Bootstrap

Elasticsearch 乐观锁并发控制

文字内容整理自 B 站中华石杉的 Elasticsearch 顶尖高手系列课程核心知识篇

先构造一条数据出来


PUT /test_index/test_type/7
{
  "test_field": "test 7"
}

{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "7",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 10,
  "_primary_term" : 1
}

模拟两个客户端,都获取到了同一条数据(开两个浏览器的 tab 页就可以,或者不放心的话,也可以开两个浏览器,甚至是两个不同的浏览器)

GET test_index/test_type/7

{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "7",
  "_version" : 1,
  "_seq_no" : 10,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test_field" : "test 7"
  }
}

其中一个客户端,先更新了一下这个数据

同时带上数据的版本号,确保说,Elasticsearch 中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改

注意,这里问题来了,按照教程中带 version 参数的更新方式,得到了一个报错信息。然后按照提示,使用了 if_seq_no 和 if_primary_term 两个参数,才更新成功。

PUT test_index/test_type/7?version=1
{
  "test_field":"update from client 1 for verion 1"
}

{
  "error" : {
    "root_cause" : [
      {
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
      }
    ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
  },
  "status" : 400
}

PUT test_index/test_type/7?if_seq_no=10&if_primary_term=1
{
  "test_field":"update from client 1 for verion 1"
}

{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "7",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 11,
  "_primary_term" : 1
}

另外一个客户端,尝试基于 version=1 的数据去进行修改,同样带上version 版本号,进行乐观锁的并发控制。

可以看到同样的错误提示,大概是因为 7.10.1 版本与 5.2 版本的差别。

# 7.10.1
PUT test_index/test_type/7?if_seq_no=10&if_primary_term=1
{
  "test_field":"update from client 1 for verion 1"
}

{
  "error" : {
    "root_cause" : [
      {
        "type" : "version_conflict_engine_exception",
        "reason" : "[7]: version conflict, required seqNo [10], primary term [1]. current document has seqNo [11] and primary term [1]",
        "index_uuid" : "zJ1EzrvMTIeZgBYwUX5u2A",
        "shard" : "0",
        "index" : "test_index"
      }
    ],
    "type" : "version_conflict_engine_exception",
    "reason" : "[7]: version conflict, required seqNo [10], primary term [1]. current document has seqNo [11] and primary term [1]",
    "index_uuid" : "zJ1EzrvMTIeZgBYwUX5u2A",
    "shard" : "0",
    "index" : "test_index"
  },
  "status" : 409
}

# 5.2 
PUT /test_index/test_type/7?version=1  
{ 
  "test_field": "test client 2" 
} 
{ 
  "error": { 
    "root_cause": [ 
      { 
        "type": "version_conflict_engine_exception", 
        "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]", 
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw", 
        "shard": "3", 
        "index": "test_index" 
      } 
    ], 
    "type": "version_conflict_engine_exception", 
    "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]", 
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw", 
    "shard": "3", 
    "index": "test_index" 
  }, 
  "status": 409 
}

在乐观锁成功阻止并发问题之后,尝试正确的完成更新

GET test_index/test_type/7

{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "7",
  "_version" : 2,
  "_seq_no" : 11,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test_field" : "update from client 1 for verion 1"
  }
}

基于最新的数据和版本号,去进行修改,修改后,带上最新的版本号,可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下

查了一下官方文档,大概是在 Elasticsearch 6.7 版本的时候,使用 version 关键字进行乐观锁的内部版本控制就已经取消了,转而使用 if_seq_no 和  if_primary_term 来指定版本。

Indexing changes

Deprecated usage of internal versioning for optimistic concurrency control

internal version may not uniquely identify a document’s version if an indexed document wasn’t fully replicated when a primary fails. As such it is unsafe to use for optimistic concurrency control, is deprecated and the option will no longer be available in Elasticsearch 7.0.0. Please use the if_seq_no and if_primary_term parameters instead. See Optimistic concurrency control for more details.

The sequence number is increased with each operation and thus newer operations are guaranteed to have a higher sequence number than older operations. Elasticsearch can then use the sequence number of operations to make sure a newer document version is never overridden by a change that has a smaller sequence number assigned to it.

The sequence number and the primary term uniquely identify a change.

另外需要注意的是 if_seq_no 和  if_primary_term 必须同时使用。

PUT test_index/test_type/7?if_seq_no=11
{
  "test_field":"test update from client 2"
}

{
  "error" : {
    "root_cause" : [
      {
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: ifSeqNo is set, but primary term is [0];"
      }
    ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: ifSeqNo is set, but primary term is [0];"
  },
  "status" : 400
}

PUT test_index/test_type/7?if_seq_no=11&if_primary_term=1
{
  "test_field":"test update from client 2"
}

{
  "_index" : "test_index",
  "_type" : "test_type",
  "_id" : "7",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 12,
  "_primary_term" : 1
}