Expose Sequence Number based Optimistic Concurrency Control in the rest layer (#36721)

Relates #36148 
Relates #10708
This commit is contained in:
Boaz Leskes 2018-12-18 10:56:02 +01:00 committed by GitHub
parent 57e1a4bc9f
commit 9087c98a5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 76 additions and 2 deletions

View File

@ -43,6 +43,14 @@
"type" : "time", "type" : "time",
"description" : "Explicit operation timeout" "description" : "Explicit operation timeout"
}, },
"if_seq_no_match" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified primary term"
},
"version" : { "version" : {
"type" : "number", "type" : "number",
"description" : "Explicit version number for concurrency control" "description" : "Explicit version number for concurrency control"

View File

@ -57,6 +57,14 @@
"options" : ["internal", "external", "external_gte", "force"], "options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type" "description" : "Specific version type"
}, },
"if_seq_no_match" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified primary term"
},
"pipeline" : { "pipeline" : {
"type" : "string", "type" : "string",
"description" : "The pipeline id to preprocess incoming documents with" "description" : "The pipeline id to preprocess incoming documents with"

View File

@ -0,0 +1,50 @@
---
"Compare And Swap Sequence Numbers":
- skip:
version: " - 6.99.99"
reason: cas ops are introduced in 7.0.0
- do:
index:
index: test_1
id: 1
body: { foo: bar }
- match: { _version: 1}
- set: { _seq_no: seqno }
- set: { _primary_term: primary_term }
- do:
get:
index: test_1
id: 1
- match: { _seq_no: $seqno }
- match: { _primary_term: $primary_term }
- do:
catch: conflict
index:
index: test_1
id: 1
if_seq_no_match: 10000
if_primary_term_match: $primary_term
body: { foo: bar2 }
- do:
catch: conflict
index:
index: test_1
id: 1
if_seq_no_match: $seqno
if_primary_term_match: 1000
body: { foo: bar2 }
- do:
index:
index: test_1
id: 1
if_seq_no_match: $seqno
if_primary_term_match: $primary_term
body: { foo: bar2 }
- match: { _version: 2 }

View File

@ -91,7 +91,7 @@ public class GetResponse extends ActionResponse implements Iterable<DocumentFiel
} }
/** /**
* The sequence number assigned to the last operation to have changed this document, if found. * The sequence number assigned to the last operation that has changed this document, if found.
*/ */
public long getSeqNo() { public long getSeqNo() {
return getResult.getSeqNo(); return getResult.getSeqNo();

View File

@ -131,7 +131,7 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
} }
/** /**
* The sequence number assigned to the last operation to have changed this document, if found. * The sequence number assigned to the last operation that has changed this document, if found.
*/ */
public long getSeqNo() { public long getSeqNo() {
return seqNo; return seqNo;

View File

@ -66,6 +66,10 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.setRefreshPolicy(request.param("refresh")); deleteRequest.setRefreshPolicy(request.param("refresh"));
deleteRequest.version(RestActions.parseVersion(request)); deleteRequest.version(RestActions.parseVersion(request));
deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType())); deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType()));
deleteRequest.setIfMatch(
request.paramAsLong("if_seq_no_match", deleteRequest.ifSeqNoMatch()),
request.paramAsLong("if_primary_term_match", deleteRequest.ifPrimaryTermMatch())
);
String waitForActiveShards = request.param("wait_for_active_shards"); String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) { if (waitForActiveShards != null) {

View File

@ -93,6 +93,10 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.setRefreshPolicy(request.param("refresh")); indexRequest.setRefreshPolicy(request.param("refresh"));
indexRequest.version(RestActions.parseVersion(request)); indexRequest.version(RestActions.parseVersion(request));
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType())); indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
indexRequest.ifMatch(
request.paramAsLong("if_seq_no_match", indexRequest.ifSeqNoMatch()),
request.paramAsLong("if_primary_term_match", indexRequest.ifPrimaryTermMatch())
);
String sOpType = request.param("op_type"); String sOpType = request.param("op_type");
String waitForActiveShards = request.param("wait_for_active_shards"); String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) { if (waitForActiveShards != null) {