Always use primary term of operation in InternalEngine (#45083)

We keep adding the current primary term to operations for which we do not assign a sequence
number. This does not make sense anymore as all operations which we care about have
sequence numbers now. The goal of this commit is to clean things up in InternalEngine and
reduce the complexity.
This commit is contained in:
Yannick Welsch 2019-08-01 17:29:31 +02:00
parent 48dc53f8d2
commit 917510d3e4
7 changed files with 29 additions and 31 deletions

View File

@ -212,7 +212,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
final Engine.Result result =
new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
new Engine.IndexResult(failure, updateRequest.version());
context.setRequestToExecute(updateRequest);
context.markOperationAsExecuted(result);
context.markAsCompleted(context.getExecutionResult());

View File

@ -518,8 +518,8 @@ public abstract class Engine implements Closeable {
/**
* use in case of the index operation failed before getting to internal engine
**/
public IndexResult(Exception failure, long version, long term) {
this(failure, version, term, UNASSIGNED_SEQ_NO);
public IndexResult(Exception failure, long version) {
this(failure, version, UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO);
}
public IndexResult(Exception failure, long version, long term, long seqNo) {

View File

@ -832,10 +832,6 @@ public class InternalEngine extends Engine {
return localCheckpointTracker.generateSeqNo();
}
private long getPrimaryTerm() {
return engineConfig.getPrimaryTermSupplier().getAsLong();
}
@Override
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
@ -899,7 +895,7 @@ public class InternalEngine extends Engine {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
}
}
if (index.origin().isFromTranslog() == false) {
@ -1017,19 +1013,20 @@ public class InternalEngine extends Engine {
}
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.id(),
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion, getPrimaryTerm());
index.getIfSeqNo(), index.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion);
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.id(),
index.getIfSeqNo(), index.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else if (index.versionType().isVersionConflictForWrites(
currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e =
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
index.versionType().updateVersion(currentVersion, index.version())
@ -1171,8 +1168,8 @@ public class InternalEngine extends Engine {
}
public static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
final IndexResult result = new IndexResult(e, currentVersion, term);
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, false,
Versions.NOT_FOUND, result);
@ -1273,7 +1270,7 @@ public class InternalEngine extends Engine {
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
@ -1354,17 +1351,17 @@ public class InternalEngine extends Engine {
final DeletionStrategy plan;
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.id(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), true);
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true);
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm()
)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.id(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version()));
}
@ -1400,7 +1397,7 @@ public class InternalEngine extends Engine {
engineConfig.getThreadPool().relativeTimeInMillis()));
}
return new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} catch (Exception ex) {
if (ex instanceof AlreadyClosedException == false && indexWriter.getTragicException() == null) {
throw new AssertionError("delete operation should never fail at document level", ex);
@ -1432,9 +1429,9 @@ public class InternalEngine extends Engine {
}
public static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
SequenceNumbers.UNASSIGNED_SEQ_NO, currentlyDeleted == false);
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult);
}
@ -1487,7 +1484,8 @@ public class InternalEngine extends Engine {
final NoOpResult noOpResult;
final Optional<Exception> preFlightError = preFlightCheckForNoOp(noOp);
if (preFlightError.isPresent()) {
noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get());
noOpResult = new NoOpResult(SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get());
} else {
markSeqNoAsSeen(noOp.seqNo());
if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) {
@ -1511,7 +1509,7 @@ public class InternalEngine extends Engine {
throw ex;
}
}
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);

View File

@ -828,7 +828,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public Engine.IndexResult getFailedIndexResult(Exception e, long version) {
return new Engine.IndexResult(e, version, getOperationPrimaryTerm());
return new Engine.IndexResult(e, version);
}
public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {

View File

@ -60,7 +60,7 @@ public class BulkPrimaryExecutionContextTests extends ESTestCase {
visitedRequests.add(context.getCurrent());
context.setRequestToExecute(context.getCurrent());
// using failures prevents caring about types
context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1));
context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1));
context.markAsCompleted(context.getExecutionResult());
}
@ -122,7 +122,7 @@ public class BulkPrimaryExecutionContextTests extends ESTestCase {
case CREATE:
context.setRequestToExecute(current);
if (failure) {
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1);
result = new Engine.IndexResult(new ElasticsearchException("bla"), 1);
} else {
result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location);
}

View File

@ -808,7 +808,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
Exception err = new VersionConflictEngineException(shardId, "id",
"I'm conflicted <(;_;)>");
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0, 0);
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);
Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation = new Translog.Location(42, 42, 42);

View File

@ -77,7 +77,7 @@ public final class FollowingEngine extends InternalEngine {
*/
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo()));
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm());
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version());
} else {
return planIndexingAsNonPrimary(index);
}
@ -90,7 +90,7 @@ public final class FollowingEngine extends InternalEngine {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
shardId, delete.seqNo(), lookupPrimaryTerm(delete.seqNo()));
return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false);
return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), false);
} else {
return planDeletionAsNonPrimary(delete);
}