Retrying replication requests on replica doesn't call `onRetry` (#21189)

Replication request may arrive at a replica before the replica's node has processed a required mapping update. In these cases the TransportReplicationAction will retry the request once a new cluster state arrives. Sadly that retry logic failed to call `ReplicationRequest#onRetry`, causing duplicates in the append only use case.

This commit fixes this and also the test which missed the check. I also added an assertion which would have helped finding the source of the duplicates.

This was discovered by https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-unix-compatibility/os=opensuse/174/

Relates #20211
This commit is contained in:
Boaz Leskes 2016-10-31 13:43:55 +01:00 committed by GitHub
parent b5f86f6f05
commit e7cfe101e4
5 changed files with 46 additions and 4 deletions

View File

@ -481,6 +481,7 @@ public abstract class TransportReplicationAction<
transportReplicaAction,
request),
e);
request.onRetry();
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override

View File

@ -48,4 +48,12 @@ class DeleteVersionValue extends VersionValue {
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED;
}
@Override
public String toString() {
return "DeleteVersionValue{" +
"version=" + version() + ", " +
"time=" + time +
'}';
}
}

View File

@ -35,6 +35,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
@ -484,7 +485,8 @@ public class InternalEngine extends Engine {
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
// lucene index without checking the version map but we still do the version check
final boolean forceUpdateDocument;
if (canOptimizeAddDocument(index)) {
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument) {
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
if (index.isRetry()) {
forceUpdateDocument = true;
@ -523,7 +525,8 @@ public class InternalEngine extends Engine {
final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
index.setCreated(deleted);
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index, indexWriter);
} else {
update(index, indexWriter);
@ -532,6 +535,26 @@ public class InternalEngine extends Engine {
}
}
/**
* Asserts that the doc in the index operation really doesn't exist
*/
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
if (versionValue != null) {
if (versionValue.delete() == false || allowDeleted == false) {
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
}
} else {
try (final Searcher searcher = acquireSearcher("assert doc doesn't exist")) {
final long docsWithId = searcher.searcher().count(new TermQuery(index.uid()));
if (docsWithId > 0) {
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists [" + docsWithId + "] times in index");
}
}
}
return true;
}
private long updateVersion(Engine.Operation op, long currentVersion, long expectedVersion) {
final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion);
op.updateVersion(updatedVersion);

View File

@ -57,4 +57,11 @@ class VersionValue implements Accountable {
public Collection<Accountable> getChildResources() {
return Collections.emptyList();
}
@Override
public String toString() {
return "VersionValue{" +
"version=" + version +
'}';
}
}

View File

@ -867,8 +867,11 @@ public class TransportReplicationActionTests extends ESTestCase {
final CapturingTransport.CapturedRequest capturedRequest = capturedRequests.get(0);
assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]"));
assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteShardRequest.class));
assertThat(((TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest.request).getRequest(), equalTo(request));
assertThat(((TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest.request).getTargetAllocationID(),
final TransportReplicationAction.ConcreteShardRequest<Request> concreteShardRequest =
(TransportReplicationAction.ConcreteShardRequest<Request>) capturedRequest.request;
assertThat(concreteShardRequest.getRequest(), equalTo(request));
assertThat(concreteShardRequest.getRequest().isRetrySet.get(), equalTo(true));
assertThat(concreteShardRequest.getTargetAllocationID(),
equalTo(replica.allocationId().getId()));
}