Do not mutate engine during planning step (#39571)

This cleans up the Engine implementation by separating sequence number generation from the
planning step in the engine, so that the planning step no longer has any side effects. This makes it
easier to see that every sequence number is properly accounted for.
This commit is contained in:
Yannick Welsch 2019-03-04 10:10:25 +01:00
parent 0550ead176
commit 0f65390c29
2 changed files with 108 additions and 92 deletions

View File

@ -799,11 +799,17 @@ public class InternalEngine extends Engine {
return true; return true;
} }
private long generateSeqNoForOperation(final Operation operation) { protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
assert operation.origin() == Operation.Origin.PRIMARY; assert operation.origin() == Operation.Origin.PRIMARY;
assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO :
"ops should not have an assigned seq no. but was: " + operation.seqNo();
return doGenerateSeqNoForOperation(operation); return doGenerateSeqNoForOperation(operation);
} }
protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
}
/** /**
* Generate the sequence number for the specified operation. * Generate the sequence number for the specified operation.
* *
@ -860,11 +866,29 @@ public class InternalEngine extends Engine {
if (plan.earlyResultOnPreFlightError.isPresent()) { if (plan.earlyResultOnPreFlightError.isPresent()) {
indexResult = plan.earlyResultOnPreFlightError.get(); indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType(); assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan);
} else { } else {
indexResult = new IndexResult( // generate or register sequence number
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); if (index.origin() == Operation.Origin.PRIMARY) {
index = new Index(index.uid(), index.parsedDoc(), generateSeqNoForOperationOnPrimary(index), index.primaryTerm(),
index.version(), index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(),
index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo());
}
} else {
markSeqNoAsSeen(index.seqNo());
}
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
}
} }
if (index.origin().isFromTranslog() == false) { if (index.origin().isFromTranslog() == false) {
final Translog.Location location; final Translog.Location location;
@ -883,11 +907,9 @@ public class InternalEngine extends Engine {
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(index.uid().bytes(), versionMap.maybePutIndexUnderLock(index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()));
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
} }
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze(); indexResult.freeze();
return indexResult; return indexResult;
@ -916,7 +938,7 @@ public class InternalEngine extends Engine {
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/ */
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L); plan = IndexingStrategy.optimizedAppendOnly(1L);
} else { } else {
if (appendOnlyRequest == false) { if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
@ -935,18 +957,17 @@ public class InternalEngine extends Engine {
// question may have been deleted in an out of order op that is not replayed. // question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); plan = IndexingStrategy.processButSkipLucene(false, index.version());
} else { } else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
} else { } else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
index.seqNo(), index.version()); index.version());
} }
} }
} }
markSeqNoAsSeen(index.seqNo());
return plan; return plan;
} }
@ -966,10 +987,10 @@ public class InternalEngine extends Engine {
// resolve an external operation into an internal one which is safe to replay // resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) { if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) { if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); plan = IndexingStrategy.overrideExistingAsIfNotThere(1L);
versionMap.enforceSafeAccess(); versionMap.enforceSafeAccess();
} else { } else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L); plan = IndexingStrategy.optimizedAppendOnly(1L);
} }
} else { } else {
versionMap.enforceSafeAccess(); versionMap.enforceSafeAccess();
@ -1002,41 +1023,36 @@ public class InternalEngine extends Engine {
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm()); plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
} else { } else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
generateSeqNoForOperation(index),
index.versionType().updateVersion(currentVersion, index.version()) index.versionType().updateVersion(currentVersion, index.version())
); );
} }
} }
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
}
return plan; return plan;
} }
private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
throws IOException { throws IOException {
assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
assert plan.indexIntoLucene || plan.addStaleOpToLucene; assert plan.indexIntoLucene || plan.addStaleOpToLucene;
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/ */
index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm()); index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
index.parsedDoc().version().setLongValue(plan.versionForIndexing); index.parsedDoc().version().setLongValue(plan.versionForIndexing);
try { try {
if (plan.addStaleOpToLucene) { if (plan.addStaleOpToLucene) {
addStaleDocs(index.docs(), indexWriter); addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) { } else if (plan.useLuceneUpdateDocument) {
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true); assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
updateDocs(index.uid(), index.docs(), indexWriter); updateDocs(index.uid(), index.docs(), indexWriter);
} else { } else {
// document does not exists, we can optimize for create, but double check if assertions are running // document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
addDocs(index.docs(), indexWriter); addDocs(index.docs(), indexWriter);
} }
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
} catch (Exception ex) { } catch (Exception ex) {
if (indexWriter.getTragicException() == null) { if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure. /* There is no tragic event recorded so this must be a document failure.
@ -1052,7 +1068,7 @@ public class InternalEngine extends Engine {
* we return a `MATCH_ANY` version to indicate no document was index. The value is * we return a `MATCH_ANY` version to indicate no document was index. The value is
* not used anyway * not used anyway
*/ */
return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), plan.seqNoForIndexing); return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo());
} else { } else {
throw ex; throw ex;
} }
@ -1109,14 +1125,13 @@ public class InternalEngine extends Engine {
protected static final class IndexingStrategy { protected static final class IndexingStrategy {
final boolean currentNotFoundOrDeleted; final boolean currentNotFoundOrDeleted;
final boolean useLuceneUpdateDocument; final boolean useLuceneUpdateDocument;
final long seqNoForIndexing;
final long versionForIndexing; final long versionForIndexing;
final boolean indexIntoLucene; final boolean indexIntoLucene;
final boolean addStaleOpToLucene; final boolean addStaleOpToLucene;
final Optional<IndexResult> earlyResultOnPreFlightError; final Optional<IndexResult> earlyResultOnPreFlightError;
private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument,
boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, boolean indexIntoLucene, boolean addStaleOpToLucene,
long versionForIndexing, IndexResult earlyResultOnPreFlightError) { long versionForIndexing, IndexResult earlyResultOnPreFlightError) {
assert useLuceneUpdateDocument == false || indexIntoLucene : assert useLuceneUpdateDocument == false || indexIntoLucene :
"use lucene update is set to true, but we're not indexing into lucene"; "use lucene update is set to true, but we're not indexing into lucene";
@ -1126,7 +1141,6 @@ public class InternalEngine extends Engine {
+ " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError;
this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted;
this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.useLuceneUpdateDocument = useLuceneUpdateDocument;
this.seqNoForIndexing = seqNoForIndexing;
this.versionForIndexing = versionForIndexing; this.versionForIndexing = versionForIndexing;
this.indexIntoLucene = indexIntoLucene; this.indexIntoLucene = indexIntoLucene;
this.addStaleOpToLucene = addStaleOpToLucene; this.addStaleOpToLucene = addStaleOpToLucene;
@ -1135,9 +1149,8 @@ public class InternalEngine extends Engine {
Optional.of(earlyResultOnPreFlightError); Optional.of(earlyResultOnPreFlightError);
} }
public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) { public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
return new IndexingStrategy(true, false, true, return new IndexingStrategy(true, false, true, false, versionForIndexing, null);
false, seqNoForIndexing, versionForIndexing, null);
} }
public static IndexingStrategy skipDueToVersionConflict( public static IndexingStrategy skipDueToVersionConflict(
@ -1145,30 +1158,28 @@ public class InternalEngine extends Engine {
final IndexResult result = new IndexResult(e, currentVersion, term); final IndexResult result = new IndexResult(e, currentVersion, term);
return new IndexingStrategy( return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, false, currentNotFoundOrDeleted, false, false, false,
SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); Versions.NOT_FOUND, result);
} }
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
long seqNoForIndexing, long versionForIndexing) { long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false,
true, false, seqNoForIndexing, versionForIndexing, null); true, false, versionForIndexing, null);
} }
static IndexingStrategy overrideExistingAsIfNotThere( static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) {
long seqNoForIndexing, long versionForIndexing) {
return new IndexingStrategy(true, true, true, return new IndexingStrategy(true, true, true,
false, seqNoForIndexing, versionForIndexing, null); false, versionForIndexing, null);
} }
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, return new IndexingStrategy(currentNotFoundOrDeleted, false, false,
false, seqNoForIndexing, versionForIndexing, null); false, versionForIndexing, null);
} }
static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) { static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionForIndexing) {
return new IndexingStrategy(false, false, false, return new IndexingStrategy(false, false, false,
addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null); addStaleOpToLucene, versionForIndexing, null);
} }
} }
@ -1227,11 +1238,26 @@ public class InternalEngine extends Engine {
if (plan.earlyResultOnPreflightError.isPresent()) { if (plan.earlyResultOnPreflightError.isPresent()) {
deleteResult = plan.earlyResultOnPreflightError.get(); deleteResult = plan.earlyResultOnPreflightError.get();
} else if (plan.deleteFromLucene || plan.addStaleOpToLucene) {
deleteResult = deleteInLucene(delete, plan);
} else { } else {
deleteResult = new DeleteResult( // generate or register sequence number
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); if (delete.origin() == Operation.Origin.PRIMARY) {
delete = new Delete(delete.type(), delete.id(), delete.uid(), generateSeqNoForOperationOnPrimary(delete),
delete.primaryTerm(), delete.version(), delete.versionType(), delete.origin(), delete.startTime(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm());
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(delete.seqNo());
} else {
markSeqNoAsSeen(delete.seqNo());
}
assert delete.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + delete.origin();
if (plan.deleteFromLucene || plan.addStaleOpToLucene) {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
}
} }
if (delete.origin().isFromTranslog() == false) { if (delete.origin().isFromTranslog() == false) {
final Translog.Location location; final Translog.Location location;
@ -1247,9 +1273,7 @@ public class InternalEngine extends Engine {
} }
deleteResult.setTranslogLocation(location); deleteResult.setTranslogLocation(location);
} }
if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
}
deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze(); deleteResult.freeze();
} catch (RuntimeException | IOException e) { } catch (RuntimeException | IOException e) {
@ -1290,17 +1314,15 @@ public class InternalEngine extends Engine {
// question may have been deleted in an out of order op that is not replayed. // question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else { } else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.version());
} else { } else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version());
delete.seqNo(), delete.version());
} }
} }
markSeqNoAsSeen(delete.seqNo());
return plan; return plan;
} }
@ -1339,22 +1361,18 @@ public class InternalEngine extends Engine {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
} else { } else {
plan = DeletionStrategy.processNormally( plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version()));
currentlyDeleted,
generateSeqNoForOperation(delete),
delete.versionType().updateVersion(currentVersion, delete.version()));
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion);
} }
return plan; return plan;
} }
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false); assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false);
try { try {
if (softDeleteEnabled) { if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion); tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0); final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null :
@ -1373,16 +1391,16 @@ public class InternalEngine extends Engine {
if (plan.deleteFromLucene) { if (plan.deleteFromLucene) {
numDocDeletes.inc(); numDocDeletes.inc();
versionMap.putDeleteUnderLock(delete.uid().bytes(), versionMap.putDeleteUnderLock(delete.uid().bytes(),
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), new DeleteVersionValue(plan.versionOfDeletion, delete.seqNo(), delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis())); engineConfig.getThreadPool().relativeTimeInMillis()));
} }
return new DeleteResult( return new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} catch (Exception ex) { } catch (Exception ex) {
if (indexWriter.getTragicException() == null) { if (indexWriter.getTragicException() == null) {
// there is no tragic event and such it must be a document level failure // there is no tragic event and such it must be a document level failure
return new DeleteResult( return new DeleteResult(
ex, plan.versionOfDeletion, delete.primaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); ex, plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} else { } else {
throw ex; throw ex;
} }
@ -1394,13 +1412,11 @@ public class InternalEngine extends Engine {
final boolean deleteFromLucene; final boolean deleteFromLucene;
final boolean addStaleOpToLucene; final boolean addStaleOpToLucene;
final boolean currentlyDeleted; final boolean currentlyDeleted;
final long seqNoOfDeletion;
final long versionOfDeletion; final long versionOfDeletion;
final Optional<DeleteResult> earlyResultOnPreflightError; final Optional<DeleteResult> earlyResultOnPreflightError;
private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted,
long seqNoOfDeletion, long versionOfDeletion, long versionOfDeletion, DeleteResult earlyResultOnPreflightError) {
DeleteResult earlyResultOnPreflightError) {
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
"can only delete from lucene or have a preflight result but not both." + "can only delete from lucene or have a preflight result but not both." +
"deleteFromLucene: " + deleteFromLucene "deleteFromLucene: " + deleteFromLucene
@ -1408,7 +1424,6 @@ public class InternalEngine extends Engine {
this.deleteFromLucene = deleteFromLucene; this.deleteFromLucene = deleteFromLucene;
this.addStaleOpToLucene = addStaleOpToLucene; this.addStaleOpToLucene = addStaleOpToLucene;
this.currentlyDeleted = currentlyDeleted; this.currentlyDeleted = currentlyDeleted;
this.seqNoOfDeletion = seqNoOfDeletion;
this.versionOfDeletion = versionOfDeletion; this.versionOfDeletion = versionOfDeletion;
this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ?
Optional.empty() : Optional.of(earlyResultOnPreflightError); Optional.empty() : Optional.of(earlyResultOnPreflightError);
@ -1418,26 +1433,20 @@ public class InternalEngine extends Engine {
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) { VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false); final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult);
Versions.NOT_FOUND, deleteResult);
} }
static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion) {
return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, null);
versionOfDeletion, null);
} }
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) {
long seqNoOfDeletion, long versionOfDeletion) { return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null);
return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion,
versionOfDeletion, null);
} }
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, long versionOfDeletion) {
long seqNoOfDeletion, long versionOfDeletion) { return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, versionOfDeletion, null);
return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion,
versionOfDeletion, null);
} }
} }
@ -1456,7 +1465,6 @@ public class InternalEngine extends Engine {
final NoOpResult noOpResult; final NoOpResult noOpResult;
try (ReleasableLock ignored = readLock.acquire()) { try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen(); ensureOpen();
markSeqNoAsSeen(noOp.seqNo());
noOpResult = innerNoOp(noOp); noOpResult = innerNoOp(noOp);
} catch (final Exception e) { } catch (final Exception e) {
try { try {
@ -1480,6 +1488,7 @@ public class InternalEngine extends Engine {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get()); noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get());
} else { } else {
Exception failure = null; Exception failure = null;
markSeqNoAsSeen(noOp.seqNo());
if (softDeleteEnabled) { if (softDeleteEnabled) {
try { try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
@ -1511,13 +1520,10 @@ public class InternalEngine extends Engine {
noOpResult.setTranslogLocation(location); noOpResult.setTranslogLocation(location);
} }
} }
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze(); noOpResult.freeze();
return noOpResult; return noOpResult;
} finally {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
} }
} }

View File

@ -68,7 +68,6 @@ public final class FollowingEngine extends InternalEngine {
@Override @Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index); preFlight(index);
markSeqNoAsSeen(index.seqNo());
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
@ -89,13 +88,12 @@ public final class FollowingEngine extends InternalEngine {
shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo()));
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm());
} else { } else {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); return IndexingStrategy.processButSkipLucene(false, index.version());
} }
} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) { } else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
numOfOptimizedIndexing.inc(); numOfOptimizedIndexing.inc();
return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version()); return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.version());
} else { } else {
return planIndexingAsNonPrimary(index); return planIndexingAsNonPrimary(index);
} }
@ -104,7 +102,6 @@ public final class FollowingEngine extends InternalEngine {
@Override @Override
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
preFlight(delete); preFlight(delete);
markSeqNoAsSeen(delete.seqNo());
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
@ -126,6 +123,19 @@ public final class FollowingEngine extends InternalEngine {
} }
} }
@Override
protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
assert operation.origin() == Operation.Origin.PRIMARY;
assert operation.seqNo() >= 0 : "ops should have an assigned seq no. but was: " + operation.seqNo();
markSeqNoAsSeen(operation.seqNo()); // even though we're not generating a sequence number, we mark it as seen
return operation.seqNo();
}
@Override
protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
// ignore, this is not really a primary
}
@Override @Override
public int fillSeqNoGaps(long primaryTerm) throws IOException { public int fillSeqNoGaps(long primaryTerm) throws IOException {
// a noop implementation, because follow shard does not own the history but the leader shard does. // a noop implementation, because follow shard does not own the history but the leader shard does.