Remove support for internal versioning for concurrency control (#38254)

Elasticsearch has long [supported](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning) compare and set (a.k.a optimistic concurrency control) operations using internal document versioning. Sadly that approach is flawed and can sometime do the wrong thing. Here's the relevant excerpt from the resiliency status page:

> When a primary has been partitioned away from the cluster there is a short period of time until it detects this. During that time it will continue indexing writes locally, thereby updating document versions. When it tries to replicate the operation, however, it will discover that it is partitioned away. It won’t acknowledge the write and will wait until the partition is resolved to negotiate with the master on how to proceed. The master will decide to either fail any replicas which failed to index the operations on the primary or tell the primary that it has to step down because a new primary has been chosen in the meantime. Since the old primary has already written documents, clients may already have read from the old primary before it shuts itself down. The version numbers of these reads may not be unique if the new primary has already accepted writes for the same document 

We recently [introduced](https://www.elastic.co/guide/en/elasticsearch/reference/6.x/optimistic-concurrency-control.html) a new sequence number based approach that doesn't suffer from this dirty reads problem. 

This commit removes support for internal versioning as a concurrency control mechanism in favor of the sequence number approach.

Relates to #1078
This commit is contained in:
Boaz Leskes 2019-02-05 20:53:35 +01:00 committed by GitHub
parent b03d138122
commit 033ba725af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 226 additions and 806 deletions

View File

@ -70,7 +70,6 @@ final class WatcherRequestConverters {
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.client.watcher;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -43,11 +42,9 @@ public final class PutWatchRequest implements Validatable {
private final BytesReference source;
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
if (isValidId(id) == false) {
@ -95,14 +92,6 @@ public final class PutWatchRequest implements Validatable {
return xContentType;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}

View File

@ -768,8 +768,6 @@ public class RequestConvertersTests extends ESTestCase {
}
}
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
setRandomVersion(updateRequest, expectedParams);
setRandomVersionType(updateRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
if (randomBoolean()) {
int retryOnConflict = randomIntBetween(0, 5);
@ -911,14 +909,7 @@ public class RequestConvertersTests extends ESTestCase {
if (randomBoolean()) {
docWriteRequest.routing(randomAlphaOfLength(10));
}
if (randomBoolean()) {
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
}
} else if (randomBoolean()) {
if (opType != DocWriteRequest.OpType.UPDATE && randomBoolean()) {
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
}

View File

@ -29,8 +29,8 @@ import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.ExecuteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
@ -88,9 +88,12 @@ public class WatcherRequestConvertersTests extends ESTestCase {
}
if (randomBoolean()) {
long version = randomLongBetween(10, 100);
putWatchRequest.setVersion(version);
expectedParams.put("version", String.valueOf(version));
long seqNo = randomNonNegativeLong();
long ifPrimaryTerm = randomLongBetween(1, 200);
putWatchRequest.setIfSeqNo(seqNo);
putWatchRequest.setIfPrimaryTerm(ifPrimaryTerm);
expectedParams.put("if_seq_no", String.valueOf(seqNo));
expectedParams.put("if_primary_term", String.valueOf(ifPrimaryTerm));
}
Request request = WatcherRequestConverters.putWatch(putWatchRequest);

View File

@ -170,7 +170,6 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
// tag::index-response
String index = indexResponse.getIndex();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// <1>
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
@ -220,7 +219,8 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.version(1);
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
@ -432,7 +432,8 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
// tag::update-conflict
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("field", "value")
.version(1);
.setIfSeqNo(101L)
.setIfPrimaryTerm(200L);
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
@ -499,9 +500,10 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // <1>
request.setRefreshPolicy("wait_for"); // <2>
// end::update-request-refresh
// tag::update-request-version
request.version(2); // <1>
// end::update-request-version
// tag::update-request-cas
request.setIfSeqNo(2L); // <1>
request.setIfPrimaryTerm(1L); // <2>
// end::update-request-request-cas
// tag::update-request-detect-noop
request.detectNoop(false); // <1>
// end::update-request-detect-noop
@ -630,7 +632,7 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
// tag::delete-conflict
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").version(2),
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {

View File

@ -140,9 +140,10 @@ include-tagged::{doc-tests-file}[{api}-request-source-exclude]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-version]
include-tagged::{doc-tests-file}[{api}-request-cas]
--------------------------------------------------
<1> Version
<1> ifSeqNo
<2> ifPrimaryTerm
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------

View File

@ -2,6 +2,22 @@
[[breaking_70_api_changes]]
=== API changes
[float]
==== Internal Versioning is no longer supported for optimistic concurrency control
Elasticsearch maintains a numeric version field for each document it stores. That field
is incremented by one with every change to the document. Until 7.0.0 the API allowed using
that field for optimistic concurrency control, i.e., making a write operation conditional
on the current document version. Sadly, that approach is flawed because the value of the
version doesn't always uniquely represent a change to the document. If a primary fails
while handling a write operation, it may expose a version that will then be reused by the
new primary.
Due to that issue, internal versioning can no longer be used and is replaced by a new
method based on sequence numbers. See <<optimistic-concurrency-control>> for more details.
Note that the `external` versioning type is still fully supported.
[float]
==== Camel case and underscore parameters deprecated in 6.x have been removed
A number of duplicate parameters deprecated in 6.x have been removed from

View File

@ -20,11 +20,9 @@
package org.elasticsearch.index.reindex;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -33,18 +31,10 @@ import org.elasticsearch.threadpool.ThreadPool;
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
private final boolean useSeqNoForCAS;
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, action, request, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
super(task, false, true, logger, client, threadPool, action, request, listener);
}
@Override
@ -60,12 +50,8 @@ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<De
delete.index(doc.getIndex());
delete.type(doc.getType());
delete.id(doc.getId());
if (useSeqNoForCAS) {
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
delete.version(doc.getVersion());
}
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
return wrap(delete);
}

View File

@ -61,7 +61,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
ClusterState state = clusterService.state();
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService,
listener).start();
}
);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
@ -113,13 +112,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
index.type(doc.getType());
index.id(doc.getId());
index.source(doc.getSource(), doc.getXContentType());
if (useSeqNoForCAS) {
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
}
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
index.setPipeline(mainRequest.getPipeline());
return wrap(index);
}

View File

@ -67,7 +67,7 @@ public class UpdateByQueryWhileModifyingTests extends ReindexTestCase {
IndexRequestBuilder index = client().prepareIndex("test", "test", "test").setSource("test", value.get())
.setRefreshPolicy(IMMEDIATE);
/*
* Update by query increments the version number so concurrent
* Update by query changes the document so concurrent
* indexes might get version conflict exceptions so we just
* blindly retry.
*/
@ -75,7 +75,7 @@ public class UpdateByQueryWhileModifyingTests extends ReindexTestCase {
while (true) {
attempts++;
try {
index.setVersion(get.getVersion()).get();
index.setIfSeqNo(get.getSeqNo()).setIfPrimaryTerm(get.getPrimaryTerm()).get();
break;
} catch (VersionConflictEngineException e) {
if (attempts >= MAX_ATTEMPTS) {

View File

@ -70,15 +70,6 @@
"if_primary_term" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"version": {
"type": "number",
"description": "Explicit version number for concurrency control"
},
"version_type": {
"type": "enum",
"options": ["internal", "force"],
"description": "Specific version type"
}
}
},

View File

@ -1,36 +0,0 @@
---
"Internal version":
- skip:
version: " - 6.99.99"
reason: types are required in requests before 7.0.0
- do:
create:
index: test_1
id: 1
body: { foo: bar }
- match: { _version: 1}
- do:
catch: conflict
create:
index: test_1
id: 1
body: { foo: bar }
---
"Internal versioning with explicit version":
- skip:
version: " - 6.99.99"
reason: types are required in requests before 7.0.0
- do:
catch: bad_request
create:
index: test
id: 3
body: { foo: bar }
version: 5
- match: { status: 400 }
- match: { error.type: action_request_validation_exception }
- match: { error.reason: "Validation Failed: 1: create operations do not support explicit versions. use index instead;" }

View File

@ -1,35 +0,0 @@
---
"Internal version":
- do:
create:
index: test_1
type: test
id: 1
body: { foo: bar }
- match: { _version: 1}
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
---
"Internal versioning with explicit version":
- do:
catch: bad_request
create:
index: test
type: test
id: 3
body: { foo: bar }
version: 5
- match: { status: 400 }
- match: { error.type: action_request_validation_exception }
- match: { error.reason: "Validation Failed: 1: create operations do not support explicit versions. use index instead;" }

View File

@ -11,19 +11,21 @@
id: 1
body: { foo: bar }
- match: { _version: 1}
- match: { _seq_no: 0 }
- do:
catch: conflict
delete:
index: test_1
id: 1
version: 2
if_seq_no: 2
if_primary_term: 1
- do:
delete:
index: test_1
id: 1
version: 1
if_seq_no: 0
if_primary_term: 1
- match: { _version: 2 }
- match: { _seq_no: 1 }

View File

@ -0,0 +1,30 @@
---
"Internal version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
- match: { _seq_no: 0 }
- do:
catch: conflict
delete:
index: test_1
type: test
id: 1
if_seq_no: 2
if_primary_term: 1
- do:
delete:
index: test_1
type: test
id: 1
if_seq_no: 0
if_primary_term: 1
- match: { _seq_no: 1 }

View File

@ -1,28 +0,0 @@
---
"Internal version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
- match: { _version: 1}
- do:
catch: conflict
delete:
index: test_1
type: test
id: 1
version: 2
- do:
delete:
index: test_1
type: test
id: 1
version: 1
- match: { _version: 2 }

View File

@ -1,36 +0,0 @@
---
"Internal version":
- skip:
version: " - 6.99.99"
reason: types are required in requests before 7.0.0
- do:
index:
index: test_1
id: 1
body: { foo: bar }
- match: { _version: 1}
- do:
index:
index: test_1
id: 1
body: { foo: bar }
- match: { _version: 2}
- do:
catch: conflict
index:
index: test_1
id: 1
body: { foo: bar }
version: 1
- do:
index:
index: test_1
id: 1
body: { foo: bar }
version: 2
- match: { _version: 3 }

View File

@ -1,36 +0,0 @@
---
"Internal version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
- match: { _version: 1}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
- match: { _version: 2}
- do:
catch: conflict
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version: 1
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version: 2
- match: { _version: 3 }

View File

@ -1,31 +0,0 @@
---
"Internal version":
- skip:
version: " - 6.99.99"
reason: types are required in requests before 7.0.0
- do:
catch: missing
update:
index: test_1
id: 1
version: 1
body:
doc: { foo: baz }
- do:
index:
index: test_1
id: 1
body:
doc: { foo: baz }
- do:
catch: conflict
update:
index: test_1
id: 1
version: 2
body:
doc: { foo: baz }

View File

@ -1,30 +0,0 @@
---
"Internal version":
- do:
catch: missing
update:
index: test_1
type: test
id: 1
version: 1
body:
doc: { foo: baz }
- do:
index:
index: test_1
type: test
id: 1
body:
doc: { foo: baz }
- do:
catch: conflict
update:
index: test_1
type: test
id: 1
version: 2
body:
doc: { foo: baz }

View File

@ -1,29 +0,0 @@
---
"Not supported versions":
- skip:
version: " - 6.99.99"
reason: types are required in requests before 7.0.0
- do:
catch: /Validation|Invalid/
update:
index: test_1
id: 1
version: 2
version_type: external
body:
doc: { foo: baz }
upsert: { foo: bar }
- do:
catch: /Validation|Invalid/
update:
index: test_1
id: 1
version: 2
version_type: external_gte
body:
doc: { foo: baz }
upsert: { foo: bar }

View File

@ -1,27 +0,0 @@
---
"Not supported versions":
- do:
catch: /Validation|Invalid/
update:
index: test_1
type: test
id: 1
version: 2
version_type: external
body:
doc: { foo: baz }
upsert: { foo: bar }
- do:
catch: /Validation|Invalid/
update:
index: test_1
type: test
id: 1
version: 2
version_type: external_gte
body:
doc: { foo: baz }
upsert: { foo: bar }

View File

@ -257,16 +257,23 @@ public interface DocWriteRequest<T> extends IndicesRequest {
static ActionRequestValidationException validateSeqNoBasedCASParams(
DocWriteRequest request, ActionRequestValidationException validationException) {
if (request.versionType().validateVersionForWrites(request.version()) == false) {
validationException = addValidationError("illegal version value [" + request.version() + "] for version type ["
+ request.versionType().name() + "]", validationException);
final long version = request.version();
final VersionType versionType = request.versionType();
if (versionType.validateVersionForWrites(version) == false) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["
+ versionType.name() + "]", validationException);
}
if (request.versionType() == VersionType.FORCE) {
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}
if (versionType == VersionType.INTERNAL && version != Versions.MATCH_ANY && version != Versions.MATCH_DELETED) {
validationException = addValidationError("internal versioning can not be used for optimistic concurrency control. " +
"Please use `if_seq_no` and `if_primary_term` instead", validationException);
}
if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && (
request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}

View File

@ -46,9 +46,9 @@ import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
@ -501,8 +501,11 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException("Update requests do not support versioning. " +
"Please use `if_seq_no` and `if_primary_term` instead");
}
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.routing(routing);
// EMPTY is safe here because we never call namedObject
@ -516,15 +519,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.version(version);
upsertRequest.versionType(versionType);
upsertRequest.setPipeline(defaultPipeline);
}
IndexRequest doc = updateRequest.doc();
if (doc != null) {
doc.version(version);
doc.versionType(versionType);
}
internalAdd(updateRequest, payload);
}

View File

@ -224,7 +224,7 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
sourceBuilder().version(version);
return this;
}
/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.

View File

@ -70,7 +70,7 @@ public class UpdateHelper {
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
final GetResult getResult = indexShard.getService().getForUpdate(
request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm());
request.type(), request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}

View File

@ -108,8 +108,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private FetchSourceContext fetchSourceContext;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private int retryOnConflict = 0;
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
@ -150,9 +148,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (version != Versions.MATCH_ANY && upsertRequest != null) {
validationException = addValidationError("can't provide both upsert request and a version", validationException);
}
if(upsertRequest != null && upsertRequest.version() != Versions.MATCH_ANY) {
validationException = addValidationError("can't provide version in upsert request", validationException);
}
@ -163,30 +158,20 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
validationException = addValidationError("id is missing", validationException);
}
if (versionType != VersionType.INTERNAL) {
validationException = addValidationError("version type [" + versionType + "] is not supported by the update API",
validationException);
} else {
if (version != Versions.MATCH_ANY && retryOnConflict > 0) {
validationException = addValidationError("can't provide both retry_on_conflict and a specific version",
validationException);
}
if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" +
versionType.name() + "]", validationException);
}
}
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) {
validationException = addValidationError("compare and write operations can not be retried", validationException);
}
if (ifSeqNo != UNASSIGNED_SEQ_NO) {
if (retryOnConflict > 0) {
validationException = addValidationError("compare and write operations can not be retried", validationException);
}
if (ifSeqNo != UNASSIGNED_SEQ_NO && docAsUpsert) {
validationException = addValidationError("compare and write operations can not be used with upsert", validationException);
if (docAsUpsert) {
validationException = addValidationError("compare and write operations can not be used with upsert", validationException);
}
if (upsertRequest != null) {
validationException =
addValidationError("upsert requests don't support `if_seq_no` and `if_primary_term`", validationException);
}
}
if (script == null && doc == null) {
@ -530,24 +515,22 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public UpdateRequest version(long version) {
this.version = version;
return this;
throw new UnsupportedOperationException("update requests do not support versioning");
}
@Override
public long version() {
return this.version;
return Versions.MATCH_ANY;
}
@Override
public UpdateRequest versionType(VersionType versionType) {
this.versionType = versionType;
return this;
throw new UnsupportedOperationException("update requests do not support versioning");
}
@Override
public VersionType versionType() {
return this.versionType;
return VersionType.INTERNAL;
}
/**
@ -877,8 +860,14 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
upsertRequest.readFrom(in);
}
docAsUpsert = in.readBoolean();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().before(Version.V_7_0_0)) {
long version = in.readLong();
VersionType versionType = VersionType.readFromStream(in);
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new UnsupportedOperationException(
"versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term");
}
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
detectNoop = in.readBoolean();
@ -930,8 +919,10 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
upsertRequest.writeTo(out);
}
out.writeBoolean(docAsUpsert);
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().before(Version.V_7_0_0)) {
out.writeLong(Versions.MATCH_ANY);
out.writeByte(VersionType.INTERNAL.getValue());
}
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
out.writeBoolean(detectNoop);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
@ -102,9 +103,9 @@ public final class ShardGetService extends AbstractIndexShardComponent {
}
}
public GetResult getForUpdate(String type, String id, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm) {
return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, version, versionType, ifSeqNo, ifPrimaryTerm,
FetchSourceContext.FETCH_SOURCE, true);
public GetResult getForUpdate(String type, String id, long ifSeqNo, long ifPrimaryTerm) {
return get(type, id, new String[]{RoutingFieldMapper.NAME}, true,
Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true);
}
/**

View File

@ -83,8 +83,6 @@ public class RestUpdateAction extends BaseRestHandler {
}
updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict()));
updateRequest.version(RestActions.parseVersion(request));
updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType()));
updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo()));
updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm()));

View File

@ -311,7 +311,7 @@ public class BulkRequestTests extends ESTestCase {
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOException {
public void testToValidateUpsertRequestAndCASInBulkRequest() throws IOException {
XContentType xContentType = XContentType.SMILE;
BytesReference data;
try (BytesStreamOutput out = new BytesStreamOutput()) {
@ -321,7 +321,8 @@ public class BulkRequestTests extends ESTestCase {
builder.field("_index", "index");
builder.field("_type", "type");
builder.field("_id", "id");
builder.field("version", 1L);
builder.field("if_seq_no", 1L);
builder.field("if_primary_term", 100L);
builder.endObject();
builder.endObject();
}
@ -330,7 +331,8 @@ public class BulkRequestTests extends ESTestCase {
builder.startObject();
builder.startObject("doc").endObject();
Map<String,Object> values = new HashMap<>();
values.put("version", 2L);
values.put("if_seq_no", 1L);
values.put("if_primary_term", 100L);
values.put("_index", "index");
values.put("_type", "type");
builder.field("upsert", values);
@ -341,8 +343,7 @@ public class BulkRequestTests extends ESTestCase {
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(data, null, null, xContentType);
assertThat(bulkRequest.validate().validationErrors(), contains("can't provide both upsert request and a version",
"can't provide version in upsert request"));
assertThat(bulkRequest.validate().validationErrors(), contains("upsert requests don't support `if_seq_no` and `if_primary_term`"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
@ -195,8 +196,8 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L));
}
public void testBulkVersioning() throws Exception {
createIndex("test");
public void testBulkWithCAS() throws Exception {
createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build());
ensureGreen();
BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field", "1"))
@ -204,20 +205,22 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
.add(client().prepareIndex("test", "type", "1").setSource("field", "2")).get();
assertEquals(DocWriteResponse.Result.CREATED, bulkResponse.getItems()[0].getResponse().getResult());
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[0].getResponse().getSeqNo(), equalTo(0L));
assertEquals(DocWriteResponse.Result.CREATED, bulkResponse.getItems()[1].getResponse().getResult());
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[1].getResponse().getSeqNo(), equalTo(1L));
assertEquals(DocWriteResponse.Result.UPDATED, bulkResponse.getItems()[2].getResponse().getResult());
assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(2L));
assertThat(bulkResponse.getItems()[2].getResponse().getSeqNo(), equalTo(2L));
bulkResponse = client().prepareBulk()
.add(client().prepareUpdate("test", "type", "1").setVersion(4L).setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2"))
.add(client().prepareUpdate("test", "type", "1").setIfSeqNo(40L).setIfPrimaryTerm(20)
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2"))
.add(client().prepareUpdate("test", "type", "2").setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2"))
.add(client().prepareUpdate("test", "type", "1").setVersion(2L).setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3")).get();
.add(client().prepareUpdate("test", "type", "1").setIfSeqNo(2L).setIfPrimaryTerm(1)
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3")).get();
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict"));
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L));
assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(3L));
assertThat(bulkResponse.getItems()[1].getResponse().getSeqNo(), equalTo(3L));
assertThat(bulkResponse.getItems()[2].getResponse().getSeqNo(), equalTo(4L));
bulkResponse = client().prepareBulk()
.add(client().prepareIndex("test", "type", "e1")
@ -237,9 +240,9 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
bulkResponse = client().prepareBulk()
.add(client().prepareUpdate("test", "type", "e1")
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2").setVersion(10)) // INTERNAL
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2").setIfSeqNo(10L).setIfPrimaryTerm(1))
.add(client().prepareUpdate("test", "type", "e1")
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3").setVersion(13).setVersionType(VersionType.INTERNAL))
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3").setIfSeqNo(20L).setIfPrimaryTerm(1))
.get();
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict"));
@ -471,7 +474,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
return;
}
BulkRequestBuilder requestBuilder = client().prepareBulk();
requestBuilder.add(client().prepareUpdate("test", "type", "1").setVersion(1)
requestBuilder.add(client().prepareUpdate("test", "type", "1").setIfSeqNo(0L).setIfPrimaryTerm(1)
.setDoc(Requests.INDEX_CONTENT_TYPE, "field", threadID));
responses[threadID] = requestBuilder.get();

View File

@ -500,12 +500,14 @@ public class UpdateRequestTests extends ESTestCase {
assertToXContentEquivalent(originalBytes, finalBytes, xContentType);
}
public void testToValidateUpsertRequestAndVersion() {
public void testToValidateUpsertRequestAndCAS() {
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.version(1L);
updateRequest.setIfSeqNo(1L);
updateRequest.setIfPrimaryTerm(1L);
updateRequest.doc("{}", XContentType.JSON);
updateRequest.upsert(new IndexRequest("index","type", "id"));
assertThat(updateRequest.validate().validationErrors(), contains("can't provide both upsert request and a version"));
assertThat(updateRequest.validate().validationErrors(),
contains("upsert requests don't support `if_seq_no` and `if_primary_term`"));
}
public void testToValidateUpsertRequestWithVersion() {

View File

@ -22,7 +22,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
@ -31,7 +30,6 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.elasticsearch.common.lucene.uid.Versions.MATCH_ANY;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@ -51,8 +49,7 @@ public class ShardGetServiceTests extends IndexShardTestCase {
recoverShardFromStore(primary);
Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate(
"test", "0", test.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}");
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
@ -61,8 +58,7 @@ public class ShardGetServiceTests extends IndexShardTestCase {
Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet1 = primary.getService().getForUpdate(
"test", "1", test1.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult testGet1 = primary.getService().getForUpdate("test", "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
@ -77,20 +73,19 @@ public class ShardGetServiceTests extends IndexShardTestCase {
// now again from the reader
Engine.IndexResult test2 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
testGet1 = primary.getService().getForUpdate("test", "1", test2.getVersion(), VersionType.INTERNAL,
UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
testGet1 = primary.getService().getForUpdate("test", "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
final long primaryTerm = primary.getOperationPrimaryTerm();
testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm);
testGet1 = primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm);
assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}");
expectThrows(VersionConflictEngineException.class, () ->
primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo() + 1, primaryTerm));
primary.getService().getForUpdate("test", "1", test2.getSeqNo() + 1, primaryTerm));
expectThrows(VersionConflictEngineException.class, () ->
primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm + 1));
primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm + 1));
closeShards(primary);
}
@ -108,16 +103,13 @@ public class ShardGetServiceTests extends IndexShardTestCase {
Engine.IndexResult indexResult = indexDoc(shard, "some_type", "0", "{\"foo\" : \"bar\"}");
assertTrue(indexResult.isCreated());
GetResult getResult = shard.getService().getForUpdate(
"some_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult getResult = shard.getService().getForUpdate("some_type", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertTrue(getResult.isExists());
getResult = shard.getService().getForUpdate(
"some_other_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
getResult = shard.getService().getForUpdate("some_other_type", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(getResult.isExists());
getResult = shard.getService().getForUpdate(
"_doc", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
getResult = shard.getService().getForUpdate("_doc", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertTrue(getResult.isExists());
closeShards(shard);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.settings;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Setting;
@ -436,17 +437,20 @@ public class UpdateSettingsIT extends ESIntegTestCase {
public void testEngineGCDeletesSetting() throws InterruptedException {
createIndex("test");
client().prepareIndex("test", "type", "1").setSource("f", 1).get(); // set version to 1
client().prepareDelete("test", "type", "1").get(); // sets version to 2
// delete is still in cache this should work & set version to 3
client().prepareIndex("test", "type", "1").setSource("f", 2).setVersion(2).get();
client().prepareIndex("test", "type", "1").setSource("f", 1).get();
DeleteResponse response = client().prepareDelete("test", "type", "1").get();
long seqNo = response.getSeqNo();
long primaryTerm = response.getPrimaryTerm();
// delete is still in cache this should work
client().prepareIndex("test", "type", "1").setSource("f", 2).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.gc_deletes", 0)).get();
client().prepareDelete("test", "type", "1").get(); // sets version to 4
response = client().prepareDelete("test", "type", "1").get();
seqNo = response.getSeqNo();
Thread.sleep(300); // wait for cache time to change TODO: this needs to be solved better. To be discussed.
// delete is should not be in cache
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3)
.setVersion(4), VersionConflictEngineException.class);
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm),
VersionConflictEngineException.class);
}

View File

@ -213,11 +213,11 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
}
public void testInternalVersioningInitialDelete() throws Exception {
public void testCompareAndSetInitialDelete() throws Exception {
createIndex("test");
ensureGreen();
assertThrows(client().prepareDelete("test", "type", "1").setVersion(17).execute(),
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(17).setIfPrimaryTerm(10).execute(),
VersionConflictEngineException.class);
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1")
@ -225,63 +225,6 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(indexResponse.getVersion(), equalTo(1L));
}
public void testInternalVersioning() throws Exception {
createIndex("test");
ensureGreen();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(1L));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(2L));
assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L));
}
// search with versioning
for (int i = 0; i < 10; i++) {
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).execute().actionGet();
assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L));
}
// search without versioning
for (int i = 0; i < 10; i++) {
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet();
assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(Versions.NOT_FOUND));
}
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
assertThat(deleteResponse.getVersion(), equalTo(3L));
assertThrows(client().prepareDelete("test", "type", "1").setVersion(2).execute(), VersionConflictEngineException.class);
// This is intricate - the object was deleted but a delete transaction was with the right version. We add another one
// and thus the transaction is increased.
deleteResponse = client().prepareDelete("test", "type", "1").setVersion(3).execute().actionGet();
assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult());
assertThat(deleteResponse.getVersion(), equalTo(4L));
}
public void testCompareAndSet() {
createIndex("test");
ensureGreen();
@ -290,7 +233,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(indexResponse.getSeqNo(), equalTo(0L));
assertThat(indexResponse.getPrimaryTerm(), equalTo(1L));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setIfSeqNo(0L).setIfPrimaryTerm(1).get();
assertThat(indexResponse.getSeqNo(), equalTo(1L));
assertThat(indexResponse.getPrimaryTerm(), equalTo(1L));
@ -353,25 +296,21 @@ public class SimpleVersioningIT extends ESIntegTestCase {
createIndex("test");
ensureGreen();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(1L));
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").get();
assertThat(indexResponse.getSeqNo(), equalTo(0L));
client().admin().indices().prepareFlush().execute().actionGet();
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(2L));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setIfSeqNo(0).setIfPrimaryTerm(1).get();
assertThat(indexResponse.getSeqNo(), equalTo(1L));
client().admin().indices().prepareFlush().execute().actionGet();
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfSeqNo(0).setIfPrimaryTerm(1),
VersionConflictEngineException.class);
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1"),
VersionConflictEngineException.class);
assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(0).setIfPrimaryTerm(1), VersionConflictEngineException.class);
for (int i = 0; i < 10; i++) {
assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L));
@ -380,10 +319,11 @@ public class SimpleVersioningIT extends ESIntegTestCase {
client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).seqNoAndPrimaryTerm(true).
execute().actionGet();
assertHitCount(searchResponse, 1);
assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L));
assertThat(searchResponse.getHits().getAt(0).getSeqNo(), equalTo(1L));
}
}

View File

@ -495,7 +495,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);

View File

@ -87,7 +87,7 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<Upd
CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(),
jobConfigProvider::validateDatafeedJob,
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
@ -54,7 +53,6 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
private final Client client;
private final JobManager jobManager;
private final ClusterService clusterService;
@Inject
public TransportUpdateFilterAction(TransportService transportService, ActionFilters actionFilters, Client client,
@ -63,19 +61,18 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
(Supplier<UpdateFilterAction.Request>) UpdateFilterAction.Request::new);
this.client = client;
this.jobManager = jobManager;
this.clusterService = clusterService;
}
@Override
protected void doExecute(Task task, UpdateFilterAction.Request request, ActionListener<PutFilterAction.Response> listener) {
ActionListener<FilterWithVersion> filterListener = ActionListener.wrap(filterWithVersion -> {
ActionListener<FilterWithSeqNo> filterListener = ActionListener.wrap(filterWithVersion -> {
updateFilter(filterWithVersion, request, listener);
}, listener::onFailure);
getFilterWithVersion(request.getFilterId(), filterListener);
}
private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterAction.Request request,
private void updateFilter(FilterWithSeqNo filterWithVersion, UpdateFilterAction.Request request,
ActionListener<PutFilterAction.Response> listener) {
MlFilter filter = filterWithVersion.filter;
@ -100,19 +97,15 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build();
indexUpdatedFilter(
updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener);
updatedFilter, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener);
}
private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm,
private void indexUpdatedFilter(MlFilter filter, final long seqNo, final long primaryTerm,
UpdateFilterAction.Request request,
ActionListener<PutFilterAction.Response> listener) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.version(version);
}
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@ -145,7 +138,7 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
});
}
private void getFilterWithVersion(String filterId, ActionListener<FilterWithVersion> listener) {
private void getFilterWithVersion(String filterId, ActionListener<FilterWithSeqNo> listener) {
GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId));
executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener<GetResponse>() {
@Override
@ -157,7 +150,7 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build();
listener.onResponse(new FilterWithVersion(filter, getDocResponse));
listener.onResponse(new FilterWithSeqNo(filter, getDocResponse));
}
} else {
this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId)));
@ -174,16 +167,14 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
});
}
private static class FilterWithVersion {
private static class FilterWithSeqNo {
private final MlFilter filter;
private final long version;
private final long seqNo;
private final long primaryTerm;
private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) {
private FilterWithSeqNo(MlFilter filter, GetResponse getDocResponse) {
this.filter = filter;
this.version = getDocResponse.getVersion();
this.seqNo = getDocResponse.getSeqNo();
this.primaryTerm = getDocResponse.getPrimaryTerm();

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.datafeed.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -264,13 +263,11 @@ public class DatafeedConfigProvider {
* @param headers Datafeed headers applied with the update
* @param validator BiConsumer that accepts the updated config and can perform
* extra validations. {@code validator} must call the passed listener
* @param minClusterNodeVersion minimum version of nodes in cluster
* @param updatedConfigListener Updated datafeed config listener
*/
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers,
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validator,
Version minClusterNodeVersion,
ActionListener<DatafeedConfig> updatedConfigListener) {
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validator,
ActionListener<DatafeedConfig> updatedConfigListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
@ -304,7 +301,7 @@ public class DatafeedConfigProvider {
ActionListener<Boolean> validatedListener = ActionListener.wrap(
ok -> {
indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap(
indexUpdatedConfig(updatedConfig, seqNo, primaryTerm, ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedConfigListener.onResponse(updatedConfig);
@ -324,8 +321,8 @@ public class DatafeedConfigProvider {
});
}
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm,
Version minClusterNodeVersion, ActionListener<IndexResponse> listener) {
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long seqNo, long primaryTerm,
ActionListener<IndexResponse> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
@ -333,12 +330,8 @@ public class DatafeedConfigProvider {
.setSource(updatedSource)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener);

View File

@ -333,7 +333,7 @@ public class JobManager {
Runnable doUpdate = () -> {
jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
this::validate, ActionListener.wrap(
updatedJob -> postJobUpdate(request, updatedJob, actionListener),
actionListener::onFailure
));
@ -603,7 +603,7 @@ public class JobManager {
.setModelSnapshotId(modelSnapshot.getSnapshotId())
.build();
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(),
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit,
ActionListener.wrap(job -> {
auditor.info(request.getJobId(),
Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));

View File

@ -10,7 +10,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -227,11 +226,9 @@ public class JobConfigProvider {
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
* are not changed.
* @param minClusterNodeVersion the minimum version of nodes in the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
@ -266,7 +263,7 @@ public class JobConfigProvider {
return;
}
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener);
}
@Override
@ -287,18 +284,17 @@ public class JobConfigProvider {
}
/**
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but
* with an extra validation step which is called before the updated is applied.
*
* @param jobId The Id of the job to update
* @param update The job update
* @param maxModelMemoryLimit The maximum model memory allowed
* @param validator The job update validator
* @param minClusterNodeVersion the minimum version of a node ifn the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
UpdateValidator validator, Version minClusterNodeVersion, ActionListener<Job> updatedJobListener) {
UpdateValidator validator, ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
@ -334,7 +330,7 @@ public class JobConfigProvider {
return;
}
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener);
},
updatedJobListener::onFailure
));
@ -347,7 +343,7 @@ public class JobConfigProvider {
});
}
private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion,
private void indexUpdatedJob(Job updatedJob, long seqNo, long primaryTerm,
ActionListener<Job> updatedJobListener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -355,12 +351,8 @@ public class JobConfigProvider {
ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId()))
.setSource(updatedSource)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap(
indexResponse -> {

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
@ -87,7 +86,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>();
blockingCall(actionListener ->
datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders,
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener),
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener),
configHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertThat(configHolder.get().getIndices(), equalTo(updateIndices));
@ -168,7 +167,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>();
blockingCall(actionListener ->
datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(),
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener),
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener),
configHolder, exceptionHolder);
assertNull(configHolder.get());
assertNotNull(exceptionHolder.get());
@ -194,7 +193,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener ->
datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(),
validateErrorFunction, Version.CURRENT, actionListener),
validateErrorFunction, actionListener),
configHolder, exceptionHolder);
assertNull(configHolder.get());

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
@ -149,7 +148,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
blockingCall(actionListener -> jobConfigProvider.updateJob
(jobId, jobUpdate, new ByteSizeValue(32), Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder);
(jobId, jobUpdate, new ByteSizeValue(32), actionListener), updateJobResponseHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription());
@ -206,7 +205,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
.build();
AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), Version.CURRENT,
blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32),
actionListener), updateJobResponseHolder, exceptionHolder);
assertNull(updateJobResponseHolder.get());
assertNotNull(exceptionHolder.get());
@ -231,7 +230,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
// update with the no-op validator
blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(
jobId, jobUpdate, new ByteSizeValue(32), validator, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder);
jobId, jobUpdate, new ByteSizeValue(32), validator, actionListener), updateJobResponseHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertNotNull(updateJobResponseHolder.get());
@ -244,7 +243,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
updateJobResponseHolder.set(null);
// Update with a validator that errors
blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32),
validatorWithAnError, Version.CURRENT, actionListener),
validatorWithAnError, actionListener),
updateJobResponseHolder, exceptionHolder);
assertNull(updateJobResponseHolder.get());

View File

@ -750,12 +750,8 @@ public final class TokenService {
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
.setDoc("refresh_token", Collections.singletonMap("refreshed", true))
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
updateRequest.setIfSeqNo(response.getSeqNo());
updateRequest.setIfPrimaryTerm(response.getPrimaryTerm());
} else {
updateRequest.setVersion(response.getVersion());
}
updateRequest.setIfSeqNo(response.getSeqNo());
updateRequest.setIfPrimaryTerm(response.getPrimaryTerm());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
ActionListener.<UpdateResponse>wrap(
updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true),

View File

@ -115,112 +115,6 @@ setup:
}
---
"Test putting a watch with a redacted password with old version returns an error":
# version 1
- do:
xpack.watcher.put_watch:
id: "watch_old_version"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "pass"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
# version 2
- do:
xpack.watcher.put_watch:
id: "watch_old_version"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "pass"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
# using optimistic concurrency control, this one will loose
# as if two users in the watch UI tried to update the same watch
- do:
catch: conflict
xpack.watcher.put_watch:
id: "watch_old_version"
version: 1
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "::es_redacted::"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
---
"Test putting a watch with a redacted password with old seq no returns an error":
- skip:
@ -390,98 +284,3 @@ setup:
- match: { hits.hits.0._source.input.http.request.auth.basic.username: "new_user" }
- match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" }
---
"Test putting a watch with a redacted password with current version works":
- do:
xpack.watcher.put_watch:
id: "my_watch_with_version"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "pass"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
- match: { _id: "my_watch_with_version" }
- match: { _version: 1 }
# this resembles the exact update from the UI and thus should work, no password change, any change in the watch
# but correct version provided
- do:
xpack.watcher.put_watch:
id: "my_watch_with_version"
version: 1
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "::es_redacted::"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
- match: { _id: "my_watch_with_version" }
- match: { _version: 2 }
- do:
search:
rest_total_hits_as_int: true
index: .watches
body: >
{
"query": {
"term": {
"_id": {
"value": "my_watch_with_version"
}
}
}
}
- match: { hits.total: 1 }
- match: { hits.hits.0._id: "my_watch_with_version" }
- match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" }