Move update and delete by query to use seq# for optimistic concurrency control (#37857)

The delete by query and update by query APIs both offer protection against overwriting concurrent user changes to the documents they touch. They currently use internal versioning for this. This PR changes them to rely on sequence numbers and primary terms instead.
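
For orientation, a minimal sketch (not part of this change set) of what seq#-based optimistic concurrency control looks like from the client side; the `client` variable, the `test` index and the document id are assumptions used only for illustration:

```java
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;

public class SeqNoCasSketch {
    static void deleteOnlyIfUnchanged(Client client) {
        // Index a document and remember the seq_no / primary term assigned to the write.
        IndexResponse indexed = client.index(new IndexRequest("test", "_doc", "1")
                .source("{\"text\":\"original\"}", XContentType.JSON)).actionGet();

        // Delete only if the document has not been touched since: a concurrent write bumps
        // the seq_no, and this delete then fails with a version_conflict_engine_exception
        // (HTTP 409) instead of overwriting the concurrent change.
        DeleteRequest delete = new DeleteRequest("test", "_doc", "1");
        delete.setIfSeqNo(indexed.getSeqNo());
        delete.setIfPrimaryTerm(indexed.getPrimaryTerm());
        client.delete(delete).actionGet();
    }
}
```

The by-query actions below do the equivalent per scrolled hit: each bulk delete/index request now carries the hit's `_seq_no` and `_primary_term` (when every node in the cluster is on 6.7.0 or later) instead of its `_version`.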

Relates #37639 
Relates #36148 
Relates #10708
Boaz Leskes 2019-01-29 10:23:05 -05:00 committed by GitHub
parent 6d1693ff49
commit 218df3009a
15 changed files with 230 additions and 75 deletions

View File

@ -372,14 +372,6 @@ the current document version of 1. If the document was already updated
and its version was set to 2 or higher, the indexing command will fail
and result in a conflict (409 http status code).
WARNING: External versioning supports the value 0 as a valid version number.
This allows the version to be in sync with an external versioning system
where version numbers start from zero instead of one. It has the side effect
that documents with version number equal to zero can neither be updated
using the <<docs-update-by-query,Update-By-Query API>> nor be deleted
using the <<docs-delete-by-query,Delete By Query API>> as long as their
version number is equal to zero.
A nice side effect is that there is no need to maintain strict ordering
of async indexing operations executed as a result of changes to a source
database, as long as version numbers from the source database are used.
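
For reference, a hedged sketch of the external-versioning behaviour described above (not part of this diff; the `client`, the `db` index and the version numbers are illustrative assumptions). The write carrying the highest external version wins regardless of arrival order, so a late, lower-versioned write is rejected rather than clobbering newer data:

```java
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;

public class ExternalVersioningSketch {
    static void outOfOrderWrites(Client client) {
        // Newer change from the source database happens to be applied first.
        client.index(new IndexRequest("db", "_doc", "1")
                .source("{\"text\":\"v5\"}", XContentType.JSON)
                .versionType(VersionType.EXTERNAL)
                .version(5)).actionGet();

        // Older change arrives late: it throws a VersionConflictEngineException (409)
        // because 3 < 5, so no strict ordering of the async indexing operations is needed.
        client.index(new IndexRequest("db", "_doc", "1")
                .source("{\"text\":\"v3\"}", XContentType.JSON)
                .versionType(VersionType.EXTERNAL)
                .version(3)).actionGet();
    }
}
```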

View File

@ -35,7 +35,6 @@ import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -50,6 +49,7 @@ import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.UpdateScript;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.threadpool.ThreadPool;
@ -88,7 +88,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected final WorkerBulkByScrollTaskState worker;
protected final ThreadPool threadPool;
protected final ScriptService scriptService;
protected final ClusterState clusterState;
/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
@ -111,9 +110,10 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
*/
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService,
ActionListener<BulkByScrollResponse> listener) {
this.task = task;
if (!task.isWorker()) {
@ -125,7 +125,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.client = client;
this.threadPool = threadPool;
this.scriptService = scriptService;
this.clusterState = clusterState;
this.mainRequest = mainRequest;
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
@ -137,11 +136,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
* them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't
* another sort.
*/
List<SortBuilder<?>> sorts = mainRequest.getSearchRequest().source().sorts();
final SearchSourceBuilder sourceBuilder = mainRequest.getSearchRequest().source();
List<SortBuilder<?>> sorts = sourceBuilder.sorts();
if (sorts == null || sorts.isEmpty()) {
mainRequest.getSearchRequest().source().sort(fieldSort("_doc"));
sourceBuilder.sort(fieldSort("_doc"));
}
mainRequest.getSearchRequest().source().version(needsSourceDocumentVersions());
sourceBuilder.version(needsSourceDocumentVersions);
sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm);
}
/**
@ -153,12 +154,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
// The default script applier executes a no-op
return (request, searchHit) -> request;
}
/**
* Does this operation need the versions of the source documents?
*/
protected abstract boolean needsSourceDocumentVersions();
/**
* Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
* metadata or scripting. That will be handled by copyMetadata and

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
@ -31,20 +32,20 @@ import org.elasticsearch.threadpool.ThreadPool;
* Implementation of delete-by-query using scrolling and bulk.
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
private final boolean useSeqNoForCAS;
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, request, scriptService, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
}
@Override
protected boolean needsSourceDocumentVersions() {
/*
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has been
* changed.
*/
return true;
}
@Override
protected boolean accept(ScrollableHitSource.Hit doc) {
@ -59,7 +60,12 @@ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<De
delete.index(doc.getIndex());
delete.type(doc.getType());
delete.id(doc.getId());
delete.version(doc.getVersion());
if (useSeqNoForCAS) {
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
delete.version(doc.getVersion());
}
return wrap(delete);
}

View File

@ -259,16 +259,13 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
}
@Override
protected boolean needsSourceDocumentVersions() {
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
return mainRequest.getDestination().versionType() != VersionType.INTERNAL;
super(task,
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false, logger, client, threadPool, request, scriptService, listener);
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
@ -81,19 +82,19 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
* Simple implementation of update-by-query using scrolling and bulk.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest> {
private final boolean useSeqNoForCAS;
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
}
@Override
protected boolean needsSourceDocumentVersions() {
/*
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has
* been changed.
*/
return true;
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, request, scriptService, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
}
@Override
@ -112,8 +113,13 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
index.type(doc.getType());
index.id(doc.getId());
index.source(doc.getSource(), doc.getXContentType());
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
if (useSeqNoForCAS) {
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
}
index.setPipeline(mainRequest.getPipeline());
return wrap(index);
}

View File

@ -21,13 +21,9 @@ package org.elasticsearch.index.reindex.remote;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@ -37,6 +33,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.search.SearchHits;
import java.io.IOException;
@ -90,6 +90,8 @@ final class RemoteResponseParsers {
ParseField routingField = new ParseField("_routing");
ParseField ttlField = new ParseField("_ttl");
ParseField parentField = new ParseField("_parent");
ParseField seqNoField = new ParseField("_seq_no");
ParseField primaryTermField = new ParseField("_primary_term");
HIT_PARSER.declareString(BasicHit::setRouting, routingField);
// Pre-2.0.0 routing come back in "fields"
class Fields {

View File

@ -677,13 +677,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest> {
DummyAsyncBulkByScrollAction() {
super(testTask, AsyncBulkByScrollActionTests.this.logger, new ParentTaskAssigningClient(client, localNode, testTask),
client.threadPool(), testRequest, null, null, listener);
}
@Override
protected boolean needsSourceDocumentVersions() {
return randomBoolean();
super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, null, listener);
}
@Override

View File

@ -19,12 +19,14 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
public class UpdateByQueryMetadataTests
extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkByScrollResponse> {
public void testRoutingIsCopied() throws Exception {
extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkByScrollResponse> {
public void testRoutingIsCopied() {
IndexRequest index = new IndexRequest();
action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
assertEquals("foo", index.routing());
@ -43,12 +45,12 @@ public class UpdateByQueryMetadataTests
private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction {
TestAction() {
super(UpdateByQueryMetadataTests.this.task, UpdateByQueryMetadataTests.this.logger, null,
UpdateByQueryMetadataTests.this.threadPool, request(), null, null, listener());
UpdateByQueryMetadataTests.this.threadPool, request(), null, ClusterState.EMPTY_STATE, listener());
}
@Override
public AbstractAsyncBulkByScrollAction.RequestWrapper<?> copyMetadata(AbstractAsyncBulkByScrollAction.RequestWrapper<?> request,
Hit doc) {
Hit doc) {
return super.copyMetadata(request, doc);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.script.ScriptService;
import java.util.Date;
@ -54,6 +55,6 @@ public class UpdateByQueryWithScriptTests
@Override
protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService,
null, listener());
ClusterState.EMPTY_STATE, listener());
}
}

View File

@ -89,7 +89,11 @@
- is_false: response.task
---
"Response for version conflict":
"Response for version conflict (version powered)":
- skip:
version: "6.7.0 - "
reason: reindex moved to rely on sequence numbers for concurrency control
- do:
indices.create:
index: test
@ -143,6 +147,64 @@
- match: {count: 1}
---
"Response for version conflict (seq no powered)":
- skip:
version: " - 6.6.99"
reason: reindex moved to rely on sequence numbers for concurrency control
- do:
indices.create:
index: test
body:
settings:
index.refresh_interval: -1
- do:
index:
index: test
type: _doc
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
# Creates a new version for reindex to miss on scan.
- do:
index:
index: test
type: _doc
id: 1
body: { "text": "test2" }
- do:
catch: conflict
delete_by_query:
index: test
body:
query:
match_all: {}
- match: {deleted: 0}
- match: {version_conflicts: 1}
- match: {batches: 1}
- match: {failures.0.index: test}
- match: {failures.0.type: _doc}
- match: {failures.0.id: "1"}
- match: {failures.0.status: 409}
- match: {failures.0.cause.type: version_conflict_engine_exception}
- match: {failures.0.cause.reason: "/\\[_doc\\]\\[1\\]:.version.conflict,.required.seqNo.\\[\\d+\\]/"}
- match: {failures.0.cause.shard: /\d+/}
- match: {failures.0.cause.index: test}
- gte: { took: 0 }
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 1}
---
"Response for version conflict with conflicts=proceed":
- do:

View File

@ -1,5 +1,9 @@
---
"delete_by_query fails to delete documents with version number equal to zero":
- skip:
version: "6.7.0 - "
reason: reindex moved to rely on sequence numbers for concurrency control
- do:
index:
index: index1

View File

@ -74,7 +74,10 @@
- is_false: response.deleted
---
"Response for version conflict":
"Response for version conflict (version powered)":
- skip:
version: "6.7.0 - "
reason: reindex moved to rely on sequence numbers for concurrency control
- do:
indices.create:
index: test
@ -115,6 +118,50 @@
- match: {failures.0.cause.index: test}
- gte: { took: 0 }
---
"Response for version conflict (seq no powered)":
- skip:
version: " - 6.6.99"
reason: reindex moved to rely on sequence numbers for concurrency control
- do:
indices.create:
index: test
body:
settings:
index.refresh_interval: -1
- do:
index:
index: test
type: _doc
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
# Creates a new version for reindex to miss on scan.
- do:
index:
index: test
type: _doc
id: 1
body: { "text": "test2" }
- do:
catch: conflict
update_by_query:
index: test
- match: {updated: 0}
- match: {version_conflicts: 1}
- match: {batches: 1}
- match: {failures.0.index: test}
- match: {failures.0.type: _doc}
- match: {failures.0.id: "1"}
- match: {failures.0.status: 409}
- match: {failures.0.cause.type: version_conflict_engine_exception}
- match: {failures.0.cause.reason: "/\\[_doc\\]\\[1\\]:.version.conflict,.required.seqNo.\\[\\d+\\]/"}
- match: {failures.0.cause.shard: /\d+/}
- match: {failures.0.cause.index: test}
- gte: { took: 0 }
---
"Response for version conflict with conflicts=proceed":
- do:

View File

@ -24,6 +24,9 @@
---
"update_by_query fails to update documents with version number equal to zero":
- skip:
version: "6.7.0 - "
reason: reindex moved to rely on sequence numbers for concurrency control
- do:
index:
index: index1

View File

@ -241,6 +241,16 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
return delegate.getVersion();
}
@Override
public long getSeqNo() {
return delegate.getSeqNo();
}
@Override
public long getPrimaryTerm() {
return delegate.getPrimaryTerm();
}
@Override
public String getRouting() {
return fieldValue(RoutingFieldMapper.NAME);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
@ -190,6 +191,17 @@ public abstract class ScrollableHitSource {
* internal APIs.
*/
long getVersion();
/**
* The sequence number of the match or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if sequence numbers weren't requested.
*/
long getSeqNo();
/**
* The primary term of the match or {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM} if sequence numbers weren't requested.
*/
long getPrimaryTerm();
/**
* The source of the hit. Returns null if the source didn't come back from the search, usually because it source wasn't stored at
* all.
@ -217,6 +229,8 @@ public abstract class ScrollableHitSource {
private BytesReference source;
private XContentType xContentType;
private String routing;
private long seqNo;
private long primaryTerm;
public BasicHit(String index, String type, String id, long version) {
this.index = index;
@ -245,6 +259,16 @@ public abstract class ScrollableHitSource {
return version;
}
@Override
public long getSeqNo() {
return seqNo;
}
@Override
public long getPrimaryTerm() {
return primaryTerm;
}
@Override
public BytesReference getSource() {
return source;
@ -270,6 +294,14 @@ public abstract class ScrollableHitSource {
this.routing = routing;
return this;
}
public void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}
public void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}
}
/**