From e6dc74f2bfe0d54470ccdfa3fde88f0c99066aeb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 16 Jan 2017 08:08:52 -0500 Subject: [PATCH] Add replica ops with version conflict to translog An operation that completed successfully on a primary can result in a version conflict on a replica due to the asynchronous nature of operations. When a replica operation results in a version conflict, the operation is not added to the translog. This leads to gaps in the translog which is problematic as it can lead to situations where a replica shard can never advance its local checkpoint. As such operations are just normal course of business for a replica shard, these operations should be treated as if they completed successfully. This commit adds these operations to the translog. Relates #22626 --- .../action/bulk/TransportShardBulkAction.java | 13 +- .../replication/ReplicationOperation.java | 26 +--- .../index/engine/InternalEngine.java | 38 +++-- .../index/engine/InternalEngineTests.java | 145 ++++++++++++------ 4 files changed, 126 insertions(+), 96 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index bb5714d3c3a..b4c3daee08f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -29,6 +30,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; 
import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -65,9 +67,6 @@ import org.elasticsearch.transport.TransportService; import java.util.Map; -import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; -import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; - /** Performs shard-level bulk (index, delete or update) operations */ public class TransportShardBulkAction extends TransportWriteAction { @@ -235,6 +234,10 @@ public class TransportShardBulkAction extends TransportWriteAction, ReplicaRequest extends ReplicationRequest, diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a18ca7f280e..362a4c9a48c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -477,10 +477,7 @@ public class InternalEngine extends Engine { } if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (op.origin().isRecovery()) { - // version conflict, but okay - result = onSuccess.get(); - } else { + if (op.origin() == Operation.Origin.PRIMARY) { // fatal version conflict final VersionConflictEngineException e = new VersionConflictEngineException( @@ -489,8 +486,13 @@ public class InternalEngine extends Engine { op.id(), op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); result = onFailure.apply(e); + } else { + /* + * Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should 
return a + * successful result. + */ + result = onSuccess.get(); } - return Optional.of(result); } else { return Optional.empty(); @@ -672,7 +674,7 @@ public class InternalEngine extends Engine { } } final long expectedVersion = index.version(); - final Optional checkVersionConflictResult = + final Optional resultOnVersionConflict = checkVersionConflict( index, currentVersion, @@ -682,15 +684,15 @@ public class InternalEngine extends Engine { e -> new IndexResult(e, currentVersion, index.seqNo())); final IndexResult indexResult; - if (checkVersionConflictResult.isPresent()) { - indexResult = checkVersionConflictResult.get(); + if (resultOnVersionConflict.isPresent()) { + indexResult = resultOnVersionConflict.get(); } else { // no version conflict if (index.origin() == Operation.Origin.PRIMARY) { seqNo = seqNoService().generateSeqNo(); } - /** + /* * Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. @@ -707,10 +709,12 @@ public class InternalEngine extends Engine { update(index.uid(), index.docs(), indexWriter); } indexResult = new IndexResult(updatedVersion, seqNo, deleted); + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + } + if (!indexResult.hasFailure()) { location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? 
translog.add(new Translog.Index(index, indexResult)) : null; - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); indexResult.setTranslogLocation(location); } indexResult.setTook(System.nanoTime() - index.startTime()); @@ -804,7 +808,7 @@ public class InternalEngine extends Engine { final long expectedVersion = delete.version(); - final Optional result = + final Optional resultOnVersionConflict = checkVersionConflict( delete, currentVersion, @@ -812,10 +816,9 @@ public class InternalEngine extends Engine { deleted, () -> new DeleteResult(expectedVersion, delete.seqNo(), true), e -> new DeleteResult(e, expectedVersion, delete.seqNo())); - final DeleteResult deleteResult; - if (result.isPresent()) { - deleteResult = result.get(); + if (resultOnVersionConflict.isPresent()) { + deleteResult = resultOnVersionConflict.get(); } else { if (delete.origin() == Operation.Origin.PRIMARY) { seqNo = seqNoService().generateSeqNo(); @@ -824,11 +827,14 @@ public class InternalEngine extends Engine { updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); deleteResult = new DeleteResult(updatedVersion, seqNo, found); + + versionMap.putUnderLock(delete.uid().bytes(), + new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); + } + if (!deleteResult.hasFailure()) { location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? 
translog.add(new Translog.Delete(delete, deleteResult)) : null; - versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); deleteResult.setTranslogLocation(location); } deleteResult.setTook(System.nanoTime() - delete.startTime()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f0ca8292f4f..6f85d65bc91 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1478,76 +1478,121 @@ public class InternalEngineTests extends ESTestCase { } public void testVersioningReplicaConflict1() { - ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid("1"), doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); + final Engine.Index v1Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v1Result = engine.index(v1Index); + assertThat(v1Result.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid("1"), doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); + final Engine.Index v2Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v2Result = engine.index(v2Index); + assertThat(v2Result.getVersion(), equalTo(2L)); // apply the second index to the replica, should work fine - index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - 
assertThat(indexResult.getVersion(), equalTo(2L)); + final Engine.Index replicaV2Index = new Engine.Index( + newUid("1"), + doc, + v2Result.getSeqNo(), + v2Index.primaryTerm(), + v2Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index); + assertThat(replicaV2Result.getVersion(), equalTo(2L)); - long seqNo = indexResult.getSeqNo(); - long primaryTerm = index.primaryTerm(); - // now, the old one should not work - index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + // now, the old one should produce an indexing result + final Engine.Index replicaV1Index = new Engine.Index( + newUid("1"), + doc, + v1Result.getSeqNo(), + v1Index.primaryTerm(), + v1Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + final Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index); + assertFalse(replicaV1Result.hasFailure()); + assertFalse(replicaV1Result.isCreated()); + assertThat(replicaV1Result.getVersion(), equalTo(2L)); // second version on replica should fail as well - index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 2L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + final Engine.IndexResult replicaV2ReplayResult = replicaEngine.index(replicaV2Index); + assertFalse(replicaV2ReplayResult.hasFailure()); + assertFalse(replicaV2ReplayResult.isCreated()); + 
assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L)); } public void testVersioningReplicaConflict2() { - ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid("1"), doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); + final Engine.Index v1Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v1Result = engine.index(v1Index); + assertThat(v1Result.getVersion(), equalTo(1L)); // apply the first index to the replica, should work fine - index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), 1L, - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); + final Engine.Index replicaV1Index = new Engine.Index( + newUid("1"), + doc, + v1Result.getSeqNo(), + v1Index.primaryTerm(), + v1Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index); + assertThat(replicaV1Result.getVersion(), equalTo(1L)); // index it again - index = new Engine.Index(newUid("1"), doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); + final Engine.Index v2Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v2Result = engine.index(v2Index); + assertThat(v2Result.getVersion(), equalTo(2L)); // now delete it - Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")); - Engine.DeleteResult deleteResult = engine.delete(delete); + final Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")); + final Engine.DeleteResult deleteResult = engine.delete(delete); 
assertThat(deleteResult.getVersion(), equalTo(3L)); // apply the delete on the replica (skipping the second index) - delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); - deleteResult = replicaEngine.delete(delete); - assertThat(deleteResult.getVersion(), equalTo(3L)); + final Engine.Delete replicaDelete = new Engine.Delete( + "test", + "1", + newUid("1"), + deleteResult.getSeqNo(), + delete.primaryTerm(), + deleteResult.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0); + final Engine.DeleteResult replicaDeleteResult = replicaEngine.delete(replicaDelete); + assertThat(replicaDeleteResult.getVersion(), equalTo(3L)); - // second time delete with same version should fail - delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); - deleteResult = replicaEngine.delete(delete); - assertTrue(deleteResult.hasFailure()); - assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + // second time delete with same version should just produce the same version + final Engine.DeleteResult deleteReplayResult = replicaEngine.delete(replicaDelete); + assertFalse(deleteReplayResult.hasFailure()); + assertTrue(deleteReplayResult.isFound()); + assertThat(deleteReplayResult.getVersion(), equalTo(3L)); - // now do the second index on the replica, it should fail - index = new Engine.Index(newUid("1"), doc, deleteResult.getSeqNo(), delete.primaryTerm(), 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + // now do the second index on the replica, it should 
result in the current version + final Engine.Index replicaV2Index = new Engine.Index( + newUid("1"), + doc, + v2Result.getSeqNo(), + v2Index.primaryTerm(), + v2Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index); + assertFalse(replicaV2Result.hasFailure()); + assertFalse(replicaV2Result.isCreated()); + assertThat(replicaV2Result.getVersion(), equalTo(3L)); } public void testBasicCreatedFlag() {