diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2c3988fc240..3cbfbfaff83 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -60,6 +60,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -155,6 +156,7 @@ public class InternalEngine extends Engine { private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); + private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>(); @Nullable private final String historyUUID; @@ -1407,32 +1409,42 @@ public class InternalEngine extends Engine { assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED; final long seqNo = noOp.seqNo(); - try { - Exception failure = null; - if (softDeleteEnabled) { - try { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); - tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); - // A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field. - // 1L is selected to optimize the compression because it might probably be the most common value in version field.
- tombstone.version().setLongValue(1L); - assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]"; - final ParseContext.Document doc = tombstone.docs().get(0); - assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null - : "Noop tombstone document but _tombstone field is not set [" + doc + " ]"; - doc.add(softDeletesField); - indexWriter.addDocument(doc); - } catch (Exception ex) { - if (maybeFailEngine("noop", ex)) { - throw ex; + try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) { + final NoOpResult noOpResult; + final Optional preFlightError = preFlightCheckForNoOp(noOp); + if (preFlightError.isPresent()) { + noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get()); + } else { + Exception failure = null; + if (softDeleteEnabled) { + try { + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); + tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); + // A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field. + // 1L is selected to optimize the compression because it might probably be the most common value in version field. + tombstone.version().setLongValue(1L); + assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]"; + final ParseContext.Document doc = tombstone.docs().get(0); + assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null + : "Noop tombstone document but _tombstone field is not set [" + doc + " ]"; + doc.add(softDeletesField); + indexWriter.addDocument(doc); + } catch (Exception ex) { + if (maybeFailEngine("noop", ex)) { + throw ex; + } + failure = ex; } - failure = ex; } - } - final NoOpResult noOpResult = failure != null ? 
new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo()); - if (noOp.origin().isFromTranslog() == false) { - final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); - noOpResult.setTranslogLocation(location); + if (failure == null) { + noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo()); + } else { + noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure); + } + if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) { + final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + noOpResult.setTranslogLocation(location); + } } noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); @@ -1444,6 +1456,14 @@ public class InternalEngine extends Engine { } } + /** + * Executes a pre-flight check for a given NoOp. + * If this method returns a non-empty result, the engine won't process this NoOp and returns a failure. + */ + protected Optional<Exception> preFlightCheckForNoOp(final NoOp noOp) throws IOException { + return Optional.empty(); + } + @Override public void refresh(String source) throws EngineException { refresh(source, SearcherScope.EXTERNAL); @@ -2354,8 +2374,14 @@ public class InternalEngine extends Engine { * @return true if the given operation was processed; otherwise false.
*/ protected final boolean hasBeenProcessedBefore(Operation op) { - assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no"; - assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes()); + if (Assertions.ENABLED) { + assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no"; + if (op.operationType() == Operation.TYPE.NO_OP) { + assert noOpKeyedLock.isHeldByCurrentThread(op.seqNo()); + } else { + assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes()); + } + } return localCheckpointTracker.contains(op.seqNo()); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f36476a9685..efbce034de4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2978,6 +2978,7 @@ public class InternalEngineTests extends EngineTestCase { private void maybeThrowFailure() throws IOException { if (failureToThrow.get() != null) { Exception failure = failureToThrow.get().get(); + clearFailure(); // one shot if (failure instanceof RuntimeException) { throw (RuntimeException) failure; } else if (failure instanceof IOException) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 84aa141c80d..b21112ad403 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.Optional; import 
java.util.OptionalLong; /** @@ -111,9 +112,14 @@ public final class FollowingEngine extends InternalEngine { } @Override - public NoOpResult noOp(NoOp noOp) { - // TODO: Make sure we process NoOp once. - return super.noOp(noOp); + protected Optional<Exception> preFlightCheckForNoOp(NoOp noOp) throws IOException { + if (noOp.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(noOp)) { + // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. + final OptionalLong existingTerm = lookupPrimaryTerm(noOp.seqNo()); + return Optional.of(new AlreadyProcessedFollowingEngineException(shardId, noOp.seqNo(), existingTerm)); + } else { + return super.preFlightCheckForNoOp(noOp); + } } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index be5e2198554..42ba4f9ef34 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; @@ -240,6 +241,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest followerGroup.startAll(); leaderGroup.appendDocs(between(10, 100)); leaderGroup.refresh("test"); + for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) { + long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() +
1; + Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(), + Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i); + for (IndexShard shard : leaderGroup) { + getEngine(shard).noOp(noOp); + } + } for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) { BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId)); assertThat(resp.getFailure(), nullValue()); @@ -276,11 +285,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo()); - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); - assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); - }); - shardFollowTask.markAsCompleted(); + try { + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); + }); + } finally { + shardFollowTask.markAsCompleted(); + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 283cf6bf42c..1cbfe4cec5a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -103,8 +103,10 @@ public class BulkShardOperationsTests extends 
IndexShardTestCase { final Translog.Operation op; if (randomBoolean()) { op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, SOURCE, null, -1); - } else { + } else if (randomBoolean()) { op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0); + } else { + op = new Translog.NoOp(seqno++, primaryTerm, "test-" + i); } if (randomBoolean()) { firstBulk.add(op); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 5d27c786ad4..9e62eb6cfa1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -306,7 +306,7 @@ public class FollowingEngineTests extends ESTestCase { private Engine.Result applyOperation(Engine engine, Engine.Operation op, long primaryTerm, Engine.Operation.Origin origin) throws IOException { - final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : null; + final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? 
VersionType.EXTERNAL : null; final Engine.Result result; if (op instanceof Engine.Index) { Engine.Index index = (Engine.Index) op; @@ -572,9 +572,12 @@ public class FollowingEngineTests extends ESTestCase { if (randomBoolean()) { operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true)); - } else { + } else if (randomBoolean()) { operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis())); + } else { + operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY, + threadPool.relativeTimeInMillis(), "test-" + i)); } } Randomness.shuffle(operations);