From e7cfe101e4c1e71edf9089280a86f51f8c1ceb6b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 31 Oct 2016 13:43:55 +0100 Subject: [PATCH] Retrying replication requests on replica doesn't call `onRetry` (#21189) Replication request may arrive at a replica before the replica's node has processed a required mapping update. In these cases the TransportReplicationAction will retry the request once a new cluster state arrives. Sadly that retry logic failed to call `ReplicationRequest#onRetry`, causing duplicates in the append only use case. This commit fixes this and also the test which missed the check. I also added an assertion which would have helped finding the source of the duplicates. This was discovered by https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-unix-compatibility/os=opensuse/174/ Relates #20211 --- .../TransportReplicationAction.java | 1 + .../index/engine/DeleteVersionValue.java | 8 ++++++ .../index/engine/InternalEngine.java | 27 +++++++++++++++++-- .../index/engine/VersionValue.java | 7 +++++ .../TransportReplicationActionTests.java | 7 +++-- 5 files changed, 46 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 740a003ffa8..4f44f680b46 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -481,6 +481,7 @@ public abstract class TransportReplicationAction< transportReplicaAction, request), e); + request.onRetry(); final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override diff --git a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index baacc4b240d..b145a86e43d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -48,4 +48,12 @@ class DeleteVersionValue extends VersionValue { public long ramBytesUsed() { return BASE_RAM_BYTES_USED; } + + @Override + public String toString() { + return "DeleteVersionValue{" + + "version=" + version() + ", " + + "time=" + time + + '}'; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3cc1aa26195..c450e5214c1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -35,6 +35,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; @@ -484,7 +485,8 @@ public class InternalEngine extends Engine { // if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the // lucene index without checking the version map but we still do the version check final boolean forceUpdateDocument; - if (canOptimizeAddDocument(index)) { + final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); + if (canOptimizeAddDocument) { long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get(); if (index.isRetry()) { forceUpdateDocument = true; @@ -523,7 +525,8 @@ public class InternalEngine extends Engine { final long updatedVersion = updateVersion(index, currentVersion, expectedVersion); index.setCreated(deleted); if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { - // document does not exists, we can optimize for create + // document does not exists, we can optimize for create, but double check if assertions are running + assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); index(index, indexWriter); } else { update(index, indexWriter); @@ -532,6 +535,26 @@ public class InternalEngine extends Engine { } } + /** + * Asserts that the doc in the index operation really doesn't exist + */ + private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException { + final VersionValue versionValue = versionMap.getUnderLock(index.uid()); + if (versionValue != null) { + if (versionValue.delete() == false || allowDeleted == false) { + throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")"); + } + } else { + try (final Searcher searcher = acquireSearcher("assert doc doesn't exist")) { + final long docsWithId = searcher.searcher().count(new TermQuery(index.uid())); + if (docsWithId > 0) { + throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists [" + docsWithId + "] times in index"); + } + } + } + return true; + } + private long updateVersion(Engine.Operation op, long currentVersion, long expectedVersion) { final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion); op.updateVersion(updatedVersion); diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java index 662c88df5d9..5258b270091 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/core/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -57,4 +57,11 @@ class VersionValue implements Accountable { public Collection getChildResources() { return Collections.emptyList(); } + + @Override + public String toString() { + return "VersionValue{" + + "version=" + version + + '}'; + } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 3deae74f455..1caac899005 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -867,8 +867,11 @@ public class TransportReplicationActionTests extends ESTestCase { final CapturingTransport.CapturedRequest capturedRequest = capturedRequests.get(0); assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]")); assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteShardRequest.class)); - assertThat(((TransportReplicationAction.ConcreteShardRequest) capturedRequest.request).getRequest(), equalTo(request)); - assertThat(((TransportReplicationAction.ConcreteShardRequest) capturedRequest.request).getTargetAllocationID(), + final TransportReplicationAction.ConcreteShardRequest concreteShardRequest = + (TransportReplicationAction.ConcreteShardRequest) capturedRequest.request; + assertThat(concreteShardRequest.getRequest(), equalTo(request)); + assertThat(concreteShardRequest.getRequest().isRetrySet.get(), equalTo(true)); + assertThat(concreteShardRequest.getTargetAllocationID(), equalTo(replica.allocationId().getId())); }