Rename seq# powered optimistic concurrency control parameters to ifSeqNo/ifPrimaryTerm (#36757)

This PR renames the parameters previously introduce to the following:

### URL Parameters
```
PUT twitter/_doc/1?if_seq_no=501&if_primary_term=1
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}

DELETE twitter/_doc/1?if_seq_no=501&if_primary_term=1
```

### Bulk API
```
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1", "if_seq_no": 501, "if_primary_term": 1 } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2", "if_seq_no": 501, "if_primary_term": 1 } }
```

### Java API
```
IndexRequest.ifSeqNo(long seqNo)
IndexRequest.ifPrimaryTerm(long primaryTerm)
DeleteRequest.ifSeqNo(long seqNo)
DeleteRequest.ifPrimaryTerm(long primaryTerm)
```

Relates #36148
Relates #10708
This commit is contained in:
Boaz Leskes 2018-12-18 14:35:18 +01:00 committed by GitHub
parent af57575838
commit 5f76f39386
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 247 additions and 159 deletions

View File

@ -43,11 +43,11 @@
"type" : "time",
"description" : "Explicit operation timeout"
},
"if_seq_no_match" : {
"if_seq_no" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"if_primary_term" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified primary term"
},

View File

@ -57,11 +57,11 @@
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
},
"if_seq_no_match" : {
"if_seq_no" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"if_primary_term" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified primary term"
},

View File

@ -26,8 +26,8 @@
index:
index: test_1
id: 1
if_seq_no_match: 10000
if_primary_term_match: $primary_term
if_seq_no: 10000
if_primary_term: $primary_term
body: { foo: bar2 }
- do:
@ -35,16 +35,16 @@
index:
index: test_1
id: 1
if_seq_no_match: $seqno
if_primary_term_match: 1000
if_seq_no: $seqno
if_primary_term: 1000
body: { foo: bar2 }
- do:
index:
index: test_1
id: 1
if_seq_no_match: $seqno
if_primary_term_match: $primary_term
if_seq_no: $seqno
if_primary_term: $primary_term
body: { foo: bar2 }
- match: { _version: 2 }

View File

@ -78,8 +78,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match");
private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
@ -350,8 +350,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTermMatch = 0;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = 0;
int retryOnConflict = 0;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
@ -382,10 +382,10 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNoMatch = parser.longValue();
} else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTermMatch = parser.longValue();
} else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNo = parser.longValue();
} else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTerm = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
@ -414,7 +414,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).routing(routing)
.version(version).versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload);
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
@ -427,16 +427,17 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.create("create".equals(opType)).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.create("create".equals(opType)).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)

View File

@ -462,7 +462,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
executeOnPrimaryWhileHandlingMappingUpdates(context,
() ->
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
request.ifSeqNoMatch(), request.ifPrimaryTermMatch(), request.getAutoGeneratedTimestamp(), request.isRetry()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()),
e -> primary.getFailedIndexResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
@ -474,7 +474,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final IndexShard primary = context.getPrimary();
executeOnPrimaryWhileHandlingMappingUpdates(context,
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
request.ifSeqNoMatch(), request.ifPrimaryTermMatch()),
request.ifSeqNo(), request.ifPrimaryTerm()),
e -> primary.getFailedDeleteResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));

View File

@ -58,8 +58,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private String routing;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTermMatch = 0;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = 0;
public DeleteRequest() {
}
@ -116,11 +116,20 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
validationException = addValidationError("version type [force] may no longer be used", validationException);
}
if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == 0 && ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != 0 && ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}
return validationException;
}
@ -203,29 +212,52 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
return this;
}
public long ifSeqNoMatch() {
return ifSeqNoMatch;
/**
* If set, only perform this delete request if the document was last modification was assigned this sequence number.
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}
public long ifPrimaryTermMatch() {
return ifPrimaryTermMatch;
/**
* If set, only perform this delete request if the document was last modification was assigned this primary term.
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}
public DeleteRequest setIfMatch(long seqNo, long term) {
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
}
if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
}
/**
* only perform this delete request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}
/**
* only perform this delete request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different primary term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifSeqNoMatch = seqNo;
ifPrimaryTermMatch = term;
ifPrimaryTerm = term;
return this;
}
@ -251,11 +283,11 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNoMatch = in.readZLong();
ifPrimaryTermMatch = in.readVLong();
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTermMatch = 0;
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = 0;
}
}
@ -271,10 +303,10 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNoMatch);
out.writeVLong(ifPrimaryTermMatch);
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");

View File

@ -82,11 +82,26 @@ public class DeleteRequestBuilder extends ReplicationRequestBuilder<DeleteReques
}
/**
* only performs this delete request if the document was last modification was assigned the given
* sequence number and primary term
* only perform this delete request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequestBuilder setIfMatch(long seqNo, long term) {
request.setIfMatch(seqNo, term);
public DeleteRequestBuilder setIfSeqNo(long seqNo) {
request.setIfSeqNo(seqNo);
return this;
}
/**
* only perform this delete request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequestBuilder setIfPrimaryTerm(long term) {
request.setIfPrimaryTerm(term);
return this;
}

View File

@ -105,8 +105,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
private boolean isRetry = false;
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTermMatch = 0;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = 0;
public IndexRequest() {
@ -168,7 +168,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
return validationException;
}
if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
validationException = addValidationError("create operations do not support compare and set. use index instead",
validationException);
return validationException;
@ -201,11 +201,18 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}
if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == 0 && ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != 0 && ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}
return validationException;
}
@ -486,31 +493,53 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
return this;
}
public IndexRequest ifMatch(long seqNo, long term) {
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
}
if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
}
/**
* only perform this indexing request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public IndexRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifSeqNoMatch = seqNo;
ifPrimaryTermMatch = term;
ifSeqNo = seqNo;
return this;
}
public long ifSeqNoMatch() {
return ifSeqNoMatch;
/**
* only performs this indexing request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public IndexRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}
public long ifPrimaryTermMatch() {
return ifPrimaryTermMatch;
/**
* If set, only perform this indexing request if the document was last modification was assigned this sequence number.
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}
/**
* If set, only perform this indexing request if the document was last modification was assigned this primary term.
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}
@Override
@ -534,8 +563,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
// generate id if not already provided
if (id == null) {
assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO;
assert ifPrimaryTermMatch == 0;
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO;
assert ifPrimaryTerm == 0;
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
String uid;
if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
@ -578,11 +607,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
contentType = null;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNoMatch = in.readZLong();
ifPrimaryTermMatch = in.readVLong();
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTermMatch = 0;
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = 0;
}
}
@ -616,10 +645,10 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNoMatch);
out.writeVLong(ifPrimaryTermMatch);
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");

View File

@ -200,11 +200,26 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
}
/**
* only performs this indexing request if the document was last modification was assigned the given
* sequence number and primary term
* only perform this indexing request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public IndexRequestBuilder setIfMatch(long seqNo, long term) {
request.ifMatch(seqNo, term);
public IndexRequestBuilder setIfSeqNo(long seqNo) {
request.setIfSeqNo(seqNo);
return this;
}
/**
* only perform this indexing request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public IndexRequestBuilder setIfPrimaryTerm(long term) {
request.setIfPrimaryTerm(term);
return this;
}

View File

@ -1345,23 +1345,23 @@ public abstract class Engine implements Closeable {
private final ParsedDocument doc;
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;
private final long ifSeqNo;
private final long ifPrimaryTerm;
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNoMatch, long ifPrimaryTermMatch) {
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNo, long ifPrimaryTerm) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNo >=0 :
"ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTerm == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
this.ifSeqNo = ifSeqNo;
this.ifPrimaryTerm = ifPrimaryTerm;
}
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
@ -1426,12 +1426,12 @@ public abstract class Engine implements Closeable {
return isRetry;
}
public long getIfSeqNoMatch() {
return ifSeqNoMatch;
public long getIfSeqNo() {
return ifSeqNo;
}
public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}
}
@ -1439,22 +1439,22 @@ public abstract class Engine implements Closeable {
private final String type;
private final String id;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;
private final long ifSeqNo;
private final long ifPrimaryTerm;
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) {
Origin origin, long startTime, long ifSeqNo, long ifPrimaryTerm) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNo >=0 :
"ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTerm == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
this.ifSeqNo = ifSeqNo;
this.ifPrimaryTerm = ifPrimaryTerm;
}
public Delete(String type, String id, Term uid, long primaryTerm) {
@ -1487,12 +1487,12 @@ public abstract class Engine implements Closeable {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}
public long getIfSeqNoMatch() {
return ifSeqNoMatch;
public long getIfSeqNo() {
return ifSeqNo;
}
public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}
}

View File

@ -965,7 +965,7 @@ public class InternalEngine extends Engine {
versionMap.enforceSafeAccess();
// resolves incoming version
final VersionValue versionValue =
resolveDocVersion(index, index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO);
resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
final long currentVersion;
final boolean currentNotFoundOrDeleted;
if (versionValue == null) {
@ -975,15 +975,15 @@ public class InternalEngine extends Engine {
currentVersion = versionValue.version;
currentNotFoundOrDeleted = versionValue.isDelete();
}
if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.type(), index.id(),
index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
} else if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNoMatch() || versionValue.term != index.getIfPrimaryTermMatch()
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.type(), index.id(),
index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch(), versionValue.seqNo, versionValue.term);
index.getIfSeqNo(), index.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
} else if (index.versionType().isVersionConflictForWrites(
currentVersion, index.version(), currentNotFoundOrDeleted)) {
@ -1302,7 +1302,7 @@ public class InternalEngine extends Engine {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO);
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
assert incrementVersionLookup();
final long currentVersion;
final boolean currentlyDeleted;
@ -1314,15 +1314,15 @@ public class InternalEngine extends Engine {
currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.type(), delete.id(),
delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
} else if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNoMatch() || versionValue.term != delete.getIfPrimaryTermMatch()
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.type(), delete.id(),
delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch(), versionValue.seqNo, versionValue.term);
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);

View File

@ -686,12 +686,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
long ifSeqNoMatch, long ifPrimaryTermMatch, long autoGeneratedTimestamp,
long ifSeqNo, long ifPrimaryTerm, long autoGeneratedTimestamp,
boolean isRetry)
throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNoMatch,
ifPrimaryTermMatch, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNo,
ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
@ -702,7 +702,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version,
@Nullable VersionType versionType, long ifSeqNoMatch, long ifPrimaryTermMatch,
@Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
@ -712,7 +712,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
try {
operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo,
opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry,
ifSeqNoMatch, ifPrimaryTermMatch);
ifSeqNo, ifPrimaryTerm);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
return new Engine.IndexResult(update);
@ -732,7 +732,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo,
long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin,
long autoGeneratedIdTimestamp, boolean isRetry,
long ifSeqNoMatch, long ifPrimaryTermMatch) {
long ifSeqNo, long ifPrimaryTerm) {
long startTime = System.nanoTime();
ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
if (docMapper.getMapping() != null) {
@ -740,7 +740,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id()));
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry,
ifSeqNoMatch, ifPrimaryTermMatch);
ifSeqNo, ifPrimaryTerm);
}
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
@ -792,11 +792,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType,
long ifSeqNoMatch, long ifPrimaryTermMatch)
long ifSeqNo, long ifPrimaryTerm)
throws IOException {
assert versionType.validateVersionForWrites(version);
return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
ifSeqNoMatch, ifPrimaryTermMatch, Engine.Operation.Origin.PRIMARY);
ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY);
}
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
@ -805,7 +805,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
@Nullable VersionType versionType, long ifSeqNoMatch, long ifPrimaryTermMatch,
@Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+ "]";
@ -835,16 +835,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,
versionType, origin, ifSeqNoMatch, ifPrimaryTermMatch);
versionType, origin, ifSeqNo, ifPrimaryTerm);
return delete(engine, delete);
}
private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
VersionType versionType, Engine.Operation.Origin origin,
long ifSeqNoMatch, long ifPrimaryTermMatch) {
long ifSeqNo, long ifPrimaryTerm) {
long startTime = System.nanoTime();
return new Engine.Delete(resolveType(type), id, uid, seqNo, primaryTerm, version, versionType, origin, startTime,
ifSeqNoMatch, ifPrimaryTermMatch);
ifSeqNo, ifPrimaryTerm);
}
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {

View File

@ -66,10 +66,8 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.setRefreshPolicy(request.param("refresh"));
deleteRequest.version(RestActions.parseVersion(request));
deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType()));
deleteRequest.setIfMatch(
request.paramAsLong("if_seq_no_match", deleteRequest.ifSeqNoMatch()),
request.paramAsLong("if_primary_term_match", deleteRequest.ifPrimaryTermMatch())
);
deleteRequest.setIfSeqNo(request.paramAsLong("if_seq_no", deleteRequest.ifSeqNo()));
deleteRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", deleteRequest.ifPrimaryTerm()));
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {

View File

@ -93,10 +93,8 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.setRefreshPolicy(request.param("refresh"));
indexRequest.version(RestActions.parseVersion(request));
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
indexRequest.ifMatch(
request.paramAsLong("if_seq_no_match", indexRequest.ifSeqNoMatch()),
request.paramAsLong("if_primary_term_match", indexRequest.ifPrimaryTermMatch())
);
indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
String sOpType = request.param("op_type");
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {

View File

@ -295,21 +295,21 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(indexResponse.getPrimaryTerm(), equalTo(1L));
assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(10, 1).execute(),
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfSeqNo(10).setIfPrimaryTerm(1).execute(),
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(10, 2).execute(),
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfSeqNo(10).setIfPrimaryTerm(2).execute(),
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(1, 2).execute(),
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfSeqNo(1).setIfPrimaryTerm(2).execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(10, 1).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(10, 2).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 2).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(10).setIfPrimaryTerm(1), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(10).setIfPrimaryTerm(2), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(1).setIfPrimaryTerm(2), VersionConflictEngineException.class);
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
@ -331,19 +331,19 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(Versions.NOT_FOUND));
}
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setIfMatch(1, 1).execute().actionGet();
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setIfSeqNo(1).setIfPrimaryTerm(1).get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
assertThat(deleteResponse.getSeqNo(), equalTo(2L));
assertThat(deleteResponse.getPrimaryTerm(), equalTo(1L));
assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 1).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(3, 2).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 2).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(1).setIfPrimaryTerm(1), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(3).setIfPrimaryTerm(12), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(1).setIfPrimaryTerm(2), VersionConflictEngineException.class);
// This is intricate - the object was deleted but a delete transaction was with the right version. We add another one
// and thus the transaction is increased.
deleteResponse = client().prepareDelete("test", "type", "1").setIfMatch(2, 1).execute().actionGet();
deleteResponse = client().prepareDelete("test", "type", "1").setIfSeqNo(2).setIfPrimaryTerm(1).get();
assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult());
assertThat(deleteResponse.getSeqNo(), equalTo(3L));
assertThat(deleteResponse.getPrimaryTerm(), equalTo(1L));

View File

@ -318,11 +318,11 @@ public class FollowingEngineTests extends ESTestCase {
Engine.Index index = (Engine.Index) op;
result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(),
versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(),
index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch()));
index.getIfSeqNo(), index.getIfPrimaryTerm()));
} else if (op instanceof Engine.Delete) {
Engine.Delete delete = (Engine.Delete) op;
result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm,
delete.version(), versionType, origin, delete.startTime(), delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch()));
delete.version(), versionType, origin, delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm()));
} else {
Engine.NoOp noOp = (Engine.NoOp) op;
result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason()));