From 0f65390c29a9648c9288596bb17e2f763b6d53d9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 4 Mar 2019 10:10:25 +0100 Subject: [PATCH] Do not mutate engine during planning step (#39571) This cleans up the Engine implementation by separating the sequence number generation from the planning step in the engine, to avoid for the planning step to have any side effects. This makes it easier to see that every sequence number is properly accounted for. --- .../index/engine/InternalEngine.java | 180 +++++++++--------- .../ccr/index/engine/FollowingEngine.java | 20 +- 2 files changed, 108 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2a988e15bf8..65c09f581cc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -799,11 +799,17 @@ public class InternalEngine extends Engine { return true; } - private long generateSeqNoForOperation(final Operation operation) { + protected long generateSeqNoForOperationOnPrimary(final Operation operation) { assert operation.origin() == Operation.Origin.PRIMARY; + assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : + "ops should not have an assigned seq no. but was: " + operation.seqNo(); return doGenerateSeqNoForOperation(operation); } + protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { + advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); + } + /** * Generate the sequence number for the specified operation. * @@ -860,11 +866,29 @@ public class InternalEngine extends Engine { if (plan.earlyResultOnPreFlightError.isPresent()) { indexResult = plan.earlyResultOnPreFlightError.get(); assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType(); - } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) { - indexResult = indexIntoLucene(index, plan); } else { - indexResult = new IndexResult( - plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); + // generate or register sequence number + if (index.origin() == Operation.Origin.PRIMARY) { + index = new Index(index.uid(), index.parsedDoc(), generateSeqNoForOperationOnPrimary(index), index.primaryTerm(), + index.version(), index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), + index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm()); + + final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + if (toAppend == false) { + advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo()); + } + } else { + markSeqNoAsSeen(index.seqNo()); + } + + assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); + + if (plan.indexIntoLucene || plan.addStaleOpToLucene) { + indexResult = indexIntoLucene(index, plan); + } else { + indexResult = new IndexResult( + plan.versionForIndexing, getPrimaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); + } } if (index.origin().isFromTranslog() == false) { final Translog.Location location; @@ -883,11 +907,9 @@ public class InternalEngine extends Engine { if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; versionMap.maybePutIndexUnderLock(index.uid().bytes(), - new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); - } - if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); + new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())); } + localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); return indexResult; @@ -916,7 +938,7 @@ public class InternalEngine extends Engine { * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. */ assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; - plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L); + plan = IndexingStrategy.optimizedAppendOnly(1L); } else { if (appendOnlyRequest == false) { maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); @@ -935,18 +957,17 @@ public class InternalEngine extends Engine { // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + plan = IndexingStrategy.processButSkipLucene(false, index.version()); } else { final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); } else { plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - index.seqNo(), index.version()); + index.version()); } } } - markSeqNoAsSeen(index.seqNo()); return plan; } @@ -966,10 +987,10 @@ public class InternalEngine extends Engine { // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { - plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); + plan = IndexingStrategy.overrideExistingAsIfNotThere(1L); versionMap.enforceSafeAccess(); } else { - plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L); + plan = IndexingStrategy.optimizedAppendOnly(1L); } } else { versionMap.enforceSafeAccess(); @@ -1002,41 +1023,36 @@ public class InternalEngine extends Engine { plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm()); } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, - generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); } } - final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; - if (toAppend == false) { - advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); - } return plan; } private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException { - assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); + assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; assert plan.indexIntoLucene || plan.addStaleOpToLucene; /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. */ - index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm()); + index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm()); index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { if (plan.addStaleOpToLucene) { addStaleDocs(index.docs(), indexWriter); } else if (plan.useLuceneUpdateDocument) { - assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true); + assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true); updateDocs(index.uid(), index.docs(), indexWriter); } else { // document does not exists, we can optimize for create, but double check if assertions are running assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); addDocs(index.docs(), indexWriter); } - return new IndexResult(plan.versionForIndexing, index.primaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); + return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { /* There is no tragic event recorded so this must be a document failure. @@ -1052,7 +1068,7 @@ public class InternalEngine extends Engine { * we return a `MATCH_ANY` version to indicate no document was index. The value is * not used anyway */ - return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), plan.seqNoForIndexing); + return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo()); } else { throw ex; } @@ -1109,14 +1125,13 @@ public class InternalEngine extends Engine { protected static final class IndexingStrategy { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; - final long seqNoForIndexing; final long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, - boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, + boolean indexIntoLucene, boolean addStaleOpToLucene, long versionForIndexing, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; @@ -1126,7 +1141,6 @@ public class InternalEngine extends Engine { + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; - this.seqNoForIndexing = seqNoForIndexing; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; @@ -1135,9 +1149,8 @@ public class InternalEngine extends Engine { Optional.of(earlyResultOnPreFlightError); } - public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(true, false, true, - false, seqNoForIndexing, versionForIndexing, null); + public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { + return new IndexingStrategy(true, false, true, false, versionForIndexing, null); } public static IndexingStrategy skipDueToVersionConflict( @@ -1145,30 +1158,28 @@ public class InternalEngine extends Engine { final IndexResult result = new IndexResult(e, currentVersion, term); return new IndexingStrategy( currentNotFoundOrDeleted, false, false, false, - SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); + Versions.NOT_FOUND, result); } static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, - long seqNoForIndexing, long versionForIndexing) { + long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, false, seqNoForIndexing, versionForIndexing, null); + true, false, versionForIndexing, null); } - static IndexingStrategy overrideExistingAsIfNotThere( - long seqNoForIndexing, long versionForIndexing) { + static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) { return new IndexingStrategy(true, true, true, - false, seqNoForIndexing, versionForIndexing, null); + false, versionForIndexing, null); } - public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, - long versionForIndexing) { + public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, false, false, - false, seqNoForIndexing, versionForIndexing, null); + false, versionForIndexing, null); } - static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) { + static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionForIndexing) { return new IndexingStrategy(false, false, false, - addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null); + addStaleOpToLucene, versionForIndexing, null); } } @@ -1227,11 +1238,26 @@ public class InternalEngine extends Engine { if (plan.earlyResultOnPreflightError.isPresent()) { deleteResult = plan.earlyResultOnPreflightError.get(); - } else if (plan.deleteFromLucene || plan.addStaleOpToLucene) { - deleteResult = deleteInLucene(delete, plan); } else { - deleteResult = new DeleteResult( - plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); + // generate or register sequence number + if (delete.origin() == Operation.Origin.PRIMARY) { + delete = new Delete(delete.type(), delete.id(), delete.uid(), generateSeqNoForOperationOnPrimary(delete), + delete.primaryTerm(), delete.version(), delete.versionType(), delete.origin(), delete.startTime(), + delete.getIfSeqNo(), delete.getIfPrimaryTerm()); + + advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(delete.seqNo()); + } else { + markSeqNoAsSeen(delete.seqNo()); + } + + assert delete.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + delete.origin(); + + if (plan.deleteFromLucene || plan.addStaleOpToLucene) { + deleteResult = deleteInLucene(delete, plan); + } else { + deleteResult = new DeleteResult( + plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); + } } if (delete.origin().isFromTranslog() == false) { final Translog.Location location; @@ -1247,9 +1273,7 @@ public class InternalEngine extends Engine { } deleteResult.setTranslogLocation(location); } - if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo()); - } + localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo()); deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); } catch (RuntimeException | IOException e) { @@ -1290,17 +1314,15 @@ public class InternalEngine extends Engine { // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); + plan = DeletionStrategy.processButSkipLucene(false, delete.version()); } else { final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.version()); } else { - plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - delete.seqNo(), delete.version()); + plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version()); } } - markSeqNoAsSeen(delete.seqNo()); return plan; } @@ -1339,22 +1361,18 @@ public class InternalEngine extends Engine { final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted); } else { - plan = DeletionStrategy.processNormally( - currentlyDeleted, - generateSeqNoForOperation(delete), - delete.versionType().updateVersion(currentVersion, delete.version())); - advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion); + plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version())); } return plan; } private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { - assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false); + assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false); try { if (softDeleteEnabled) { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; - tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); + tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm()); tombstone.version().setLongValue(plan.versionOfDeletion); final ParseContext.Document doc = tombstone.docs().get(0); assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : @@ -1373,16 +1391,16 @@ public class InternalEngine extends Engine { if (plan.deleteFromLucene) { numDocDeletes.inc(); versionMap.putDeleteUnderLock(delete.uid().bytes(), - new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), + new DeleteVersionValue(plan.versionOfDeletion, delete.seqNo(), delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())); } return new DeleteResult( - plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); + plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { // there is no tragic event and such it must be a document level failure return new DeleteResult( - ex, plan.versionOfDeletion, delete.primaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); + ex, plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } else { throw ex; } @@ -1394,13 +1412,11 @@ public class InternalEngine extends Engine { final boolean deleteFromLucene; final boolean addStaleOpToLucene; final boolean currentlyDeleted; - final long seqNoOfDeletion; final long versionOfDeletion; final Optional earlyResultOnPreflightError; private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion, - DeleteResult earlyResultOnPreflightError) { + long versionOfDeletion, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : "can only delete from lucene or have a preflight result but not both." + "deleteFromLucene: " + deleteFromLucene @@ -1408,7 +1424,6 @@ public class InternalEngine extends Engine { this.deleteFromLucene = deleteFromLucene; this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; - this.seqNoOfDeletion = seqNoOfDeletion; this.versionOfDeletion = versionOfDeletion; this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? Optional.empty() : Optional.of(earlyResultOnPreflightError); @@ -1418,26 +1433,20 @@ public class InternalEngine extends Engine { VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) { final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false); - return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, - Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult); } - static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, - versionOfDeletion, null); + static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion) { + return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, null); } - public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, - versionOfDeletion, null); + public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) { + return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null); } - static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, - versionOfDeletion, null); + static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, long versionOfDeletion) { + return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, versionOfDeletion, null); } } @@ -1456,7 +1465,6 @@ public class InternalEngine extends Engine { final NoOpResult noOpResult; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - markSeqNoAsSeen(noOp.seqNo()); noOpResult = innerNoOp(noOp); } catch (final Exception e) { try { @@ -1480,6 +1488,7 @@ public class InternalEngine extends Engine { noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get()); } else { Exception failure = null; + markSeqNoAsSeen(noOp.seqNo()); if (softDeleteEnabled) { try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); @@ -1511,13 +1520,10 @@ public class InternalEngine extends Engine { noOpResult.setTranslogLocation(location); } } + localCheckpointTracker.markSeqNoAsCompleted(seqNo); noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); return noOpResult; - } finally { - if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - localCheckpointTracker.markSeqNoAsCompleted(seqNo); - } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index acffacd4051..bbb0689a8a7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -68,7 +68,6 @@ public final class FollowingEngine extends InternalEngine { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - markSeqNoAsSeen(index.seqNo()); // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; @@ -89,13 +88,12 @@ public final class FollowingEngine extends InternalEngine { shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { - return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + return IndexingStrategy.processButSkipLucene(false, index.version()); } } else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) { assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; numOfOptimizedIndexing.inc(); - return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version()); - + return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.version()); } else { return planIndexingAsNonPrimary(index); } @@ -104,7 +102,6 @@ public final class FollowingEngine extends InternalEngine { @Override protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { preFlight(delete); - markSeqNoAsSeen(delete.seqNo()); if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( @@ -126,6 +123,19 @@ public final class FollowingEngine extends InternalEngine { } } + @Override + protected long generateSeqNoForOperationOnPrimary(final Operation operation) { + assert operation.origin() == Operation.Origin.PRIMARY; + assert operation.seqNo() >= 0 : "ops should have an assigned seq no. but was: " + operation.seqNo(); + markSeqNoAsSeen(operation.seqNo()); // even though we're not generating a sequence number, we mark it as seen + return operation.seqNo(); + } + + @Override + protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { + // ignore, this is not really a primary + } + @Override public int fillSeqNoGaps(long primaryTerm) throws IOException { // a noop implementation, because follow shard does not own the history but the leader shard does.