Elasticsearch Document Version Conflicts: How They Arise and How to Resolve Them

When we update a document, the usual flow is: read the document -> modify it -> submit and save. The data store always holds the content of the most recent submission.

Normally this is fine, but when two or more requests modify the same document concurrently, conflicts arise easily. Processed one after another, the request handled last can overwrite the changes made by the request handled first, so those changes are silently lost (and the request handled last is not necessarily the one issued last; that depends on network transmission and other factors).

When such concurrent conflicts occur, there are two common strategies: pessimistic locking (block other writers by locking the document before changing it) and optimistic locking (assume conflicts are rare and detect them at write time). Elasticsearch takes the optimistic approach.

Optimistic locking in Elasticsearch

Elasticsearch is distributed: when a document is created or changed, the change is replicated to other nodes. Because replication is asynchronous and concurrent, there is no guarantee that these requests arrive in the same order the modifications were made. What Elasticsearch does guarantee is that an older version of a document can never overwrite a newer one.

Every document carries a _version field, returned by index, GET, and delete requests, and every change to the document increments its _version. Elasticsearch uses this field to ensure that a change carrying a version lower than the current one is ignored.

A numeric version also avoids the ABA problem: if value A is changed to B and then back to A, the data looks unchanged to the application, but the version number still records that modifications happened in between.

Create a document:

PUT /website/blog/1/_create
{
  "title": "My first blog entry",
  "text":  "Just trying this out..."
}

Retrieve the document:

GET /website/blog/1
Response:
{
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "1",
  "_version" : 1,
  "found" :    true,
  "_source" :  {
      "title": "My first blog entry",
      "text":  "Just trying this out..."
  }
}

At this point _version is 1.

Modify the document, passing the version we read (version=1) so the write only succeeds if nothing has changed it in the meantime:

PUT /website/blog/1?version=1 
{
  "title": "My first blog entry",
  "text":  "Starting to get the hang of this..."
}

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "1",
  "_version": 2
  "created":  false
}

The update succeeds, and _version is now 2.

If we then submit another change still based on version 1 (repeating the request with ?version=1), it is rejected:

{
   "error": {
      "root_cause": [
         {
            "type": "version_conflict_engine_exception",
            "reason": "[blog][1]: version conflict, current [2], provided [1]",
            "index": "website",
            "shard": "3"
         }
      ],
      "type": "version_conflict_engine_exception",
      "reason": "[blog][1]: version conflict, current [2], provided [1]",
      "index": "website",
      "shard": "3"
   },
   "status": 409
}

The response reports a version conflict (HTTP 409). The application can decide how to handle it, for example by fetching the latest version, merging its changes and retrying, or by handling the failure in some other way.
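
A minimal Java sketch of this read-merge-retry loop, assuming the 5.x TransportClient API (the same client family that produced the stack trace in the next section); the index, type, and id are the blog example from above, and updateTitle/maxRetries are illustrative names:

import java.util.Map;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.engine.VersionConflictEngineException;

public class OptimisticUpdate {

    // Read the latest copy, apply our change to it, and write it back with the
    // version we read; if someone else changed the document in the meantime,
    // the write fails with a version conflict and we re-read and try again.
    static void updateTitle(Client client, String id, String newTitle, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            GetResponse current = client.prepareGet("website", "blog", id).get();
            long version = current.getVersion();            // the version our change is based on
            Map<String, Object> source = current.getSourceAsMap();
            source.put("title", newTitle);                  // merge our change into the latest copy

            try {
                client.prepareIndex("website", "blog", id)
                      .setSource(source)
                      .setVersion(version)                  // reject the write if the version has moved on
                      .get();
                return;                                     // accepted: our change is now the latest version
            } catch (VersionConflictEngineException e) {
                // lost the race: loop, re-read the newer document, and retry
            }
        }
        throw new IllegalStateException("still conflicting after " + maxRetries + " attempts");
    }
}

The same pattern works with any client: the essential points are basing the new write on the freshly read _version and treating the 409 / VersionConflictEngineException as a signal to merge and retry rather than as a fatal error.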

Handling conflicts in practice

In most systems these conflicts are rare. But when several systems can update the same piece of data, concurrent modifications do occasionally collide, surfacing as exceptions like this one:

 org.elasticsearch.index.engine.VersionConflictEngineException: [project][1140860]: version conflict, current version [5] is different than the one provided [4]
	at org.elasticsearch.index.engine.InternalEngine.planIndexingAsPrimary(InternalEngine.java:559)
	at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:472)
	at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:560)
	at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:549)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary(TransportShardBulkAction.java:484)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:143)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:113)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:69)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:939)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:908)
	at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:113)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.onResponse(TransportReplicationAction.java:322)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.onResponse(TransportReplicationAction.java:264)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$1.onResponse(TransportReplicationAction.java:888)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$1.onResponse(TransportReplicationAction.java:885)
	at org.elasticsearch.index.shard.IndexShardOperationsLock.acquire(IndexShardOperationsLock.java:147)
	at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationLock(IndexShard.java:1658)
	at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryShardReference(TransportReplicationAction.java:897)
	at org.elasticsearch.action.support.replication.TransportReplicationAction.access$400(TransportReplicationAction.java:93)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:281)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:260)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:252)
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69)
	at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:627)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:638)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

In that case we can add the retry_on_conflict parameter to the update request, so that when a conflict occurs the update is retried, for example to increment a page-view counter:

POST /website/pageviews/1/_update?retry_on_conflict=5 
{
   "script" : "ctx._source.views+=1"
}

We can also allow more retries to make it very likely that the update eventually succeeds. The Java and Python clients offer the same option, typically set on the UpdateRequest object (the precondition for retrying is that either of the conflicting changes can validly be written as the latest data, as in, say, article writing where whichever edit lands last is acceptable).
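
A minimal sketch of that client-side setting, again assuming the 5.x TransportClient and the UpdateRequest class mentioned above, with the page-view counter from the REST example; bumpViews is an illustrative name:

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.script.Script;

public class RetryOnConflictExample {

    static void bumpViews(Client client) {
        UpdateRequest request = new UpdateRequest("website", "pageviews", "1")
                .retryOnConflict(5)                          // on version conflict, re-fetch and re-apply up to 5 times
                .script(new Script("ctx._source.views+=1")); // order-independent change, safe to retry blindly
        client.update(request).actionGet();                  // blocks until the update (including any retries) completes
    }
}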