CCR: Following primary should process NoOps once (#34408)
This is a follow-up for #34288. Relates #34412
This commit is contained in:
parent
ba87c543c0
commit
d90b6730c7
|
@ -60,6 +60,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
|
|||
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
@ -155,6 +156,7 @@ public class InternalEngine extends Engine {
|
|||
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
|
||||
|
||||
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
|
||||
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
|
||||
|
||||
@Nullable
|
||||
private final String historyUUID;
|
||||
|
@ -1407,32 +1409,42 @@ public class InternalEngine extends Engine {
|
|||
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
|
||||
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
|
||||
final long seqNo = noOp.seqNo();
|
||||
try {
|
||||
Exception failure = null;
|
||||
if (softDeleteEnabled) {
|
||||
try {
|
||||
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
|
||||
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
|
||||
// A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field.
|
||||
// 1L is selected to optimize the compression because it might probably be the most common value in version field.
|
||||
tombstone.version().setLongValue(1L);
|
||||
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
|
||||
final ParseContext.Document doc = tombstone.docs().get(0);
|
||||
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
|
||||
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
|
||||
doc.add(softDeletesField);
|
||||
indexWriter.addDocument(doc);
|
||||
} catch (Exception ex) {
|
||||
if (maybeFailEngine("noop", ex)) {
|
||||
throw ex;
|
||||
try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) {
|
||||
final NoOpResult noOpResult;
|
||||
final Optional<Exception> preFlightError = preFlightCheckForNoOp(noOp);
|
||||
if (preFlightError.isPresent()) {
|
||||
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get());
|
||||
} else {
|
||||
Exception failure = null;
|
||||
if (softDeleteEnabled) {
|
||||
try {
|
||||
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
|
||||
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
|
||||
// A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field.
|
||||
// 1L is selected to optimize the compression because it might probably be the most common value in version field.
|
||||
tombstone.version().setLongValue(1L);
|
||||
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
|
||||
final ParseContext.Document doc = tombstone.docs().get(0);
|
||||
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
|
||||
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
|
||||
doc.add(softDeletesField);
|
||||
indexWriter.addDocument(doc);
|
||||
} catch (Exception ex) {
|
||||
if (maybeFailEngine("noop", ex)) {
|
||||
throw ex;
|
||||
}
|
||||
failure = ex;
|
||||
}
|
||||
failure = ex;
|
||||
}
|
||||
}
|
||||
final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
|
||||
if (noOp.origin().isFromTranslog() == false) {
|
||||
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
|
||||
noOpResult.setTranslogLocation(location);
|
||||
if (failure == null) {
|
||||
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
|
||||
} else {
|
||||
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure);
|
||||
}
|
||||
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
|
||||
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
|
||||
noOpResult.setTranslogLocation(location);
|
||||
}
|
||||
}
|
||||
noOpResult.setTook(System.nanoTime() - noOp.startTime());
|
||||
noOpResult.freeze();
|
||||
|
@ -1444,6 +1456,14 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a pre-flight check for a given NoOp.
|
||||
* If this method returns a non-empty result, the engine won't process this NoOp and returns a failure.
|
||||
*/
|
||||
protected Optional<Exception> preFlightCheckForNoOp(final NoOp noOp) throws IOException {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh(String source) throws EngineException {
|
||||
refresh(source, SearcherScope.EXTERNAL);
|
||||
|
@ -2354,8 +2374,14 @@ public class InternalEngine extends Engine {
|
|||
* @return true if the given operation was processed; otherwise false.
|
||||
*/
|
||||
protected final boolean hasBeenProcessedBefore(Operation op) {
|
||||
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
|
||||
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
|
||||
if (Assertions.ENABLED) {
|
||||
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
|
||||
if (op.operationType() == Operation.TYPE.NO_OP) {
|
||||
assert noOpKeyedLock.isHeldByCurrentThread(op.seqNo());
|
||||
} else {
|
||||
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
|
||||
}
|
||||
}
|
||||
return localCheckpointTracker.contains(op.seqNo());
|
||||
}
|
||||
|
||||
|
|
|
@ -2978,6 +2978,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
private void maybeThrowFailure() throws IOException {
|
||||
if (failureToThrow.get() != null) {
|
||||
Exception failure = failureToThrow.get().get();
|
||||
clearFailure(); // one shot
|
||||
if (failure instanceof RuntimeException) {
|
||||
throw (RuntimeException) failure;
|
||||
} else if (failure instanceof IOException) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
|
|||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
/**
|
||||
|
@ -111,9 +112,14 @@ public final class FollowingEngine extends InternalEngine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public NoOpResult noOp(NoOp noOp) {
|
||||
// TODO: Make sure we process NoOp once.
|
||||
return super.noOp(noOp);
|
||||
protected Optional<Exception> preFlightCheckForNoOp(NoOp noOp) throws IOException {
|
||||
if (noOp.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(noOp)) {
|
||||
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
|
||||
final OptionalLong existingTerm = lookupPrimaryTerm(noOp.seqNo());
|
||||
return Optional.of(new AlreadyProcessedFollowingEngineException(shardId, noOp.seqNo(), existingTerm));
|
||||
} else {
|
||||
return super.preFlightCheckForNoOp(noOp);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
|
@ -240,6 +241,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
|||
followerGroup.startAll();
|
||||
leaderGroup.appendDocs(between(10, 100));
|
||||
leaderGroup.refresh("test");
|
||||
for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
|
||||
long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
|
||||
Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
|
||||
Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
|
||||
for (IndexShard shard : leaderGroup) {
|
||||
getEngine(shard).noOp(noOp);
|
||||
}
|
||||
}
|
||||
for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
|
||||
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
|
||||
assertThat(resp.getFailure(), nullValue());
|
||||
|
@ -276,11 +285,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
|||
SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
|
||||
shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(),
|
||||
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo());
|
||||
assertBusy(() -> {
|
||||
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
|
||||
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
|
||||
});
|
||||
shardFollowTask.markAsCompleted();
|
||||
try {
|
||||
assertBusy(() -> {
|
||||
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
|
||||
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
|
||||
});
|
||||
} finally {
|
||||
shardFollowTask.markAsCompleted();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -103,8 +103,10 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
|
|||
final Translog.Operation op;
|
||||
if (randomBoolean()) {
|
||||
op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, SOURCE, null, -1);
|
||||
} else {
|
||||
} else if (randomBoolean()) {
|
||||
op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0);
|
||||
} else {
|
||||
op = new Translog.NoOp(seqno++, primaryTerm, "test-" + i);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
firstBulk.add(op);
|
||||
|
|
|
@ -306,7 +306,7 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
|
||||
private Engine.Result applyOperation(Engine engine, Engine.Operation op,
|
||||
long primaryTerm, Engine.Operation.Origin origin) throws IOException {
|
||||
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : null;
|
||||
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? VersionType.EXTERNAL : null;
|
||||
final Engine.Result result;
|
||||
if (op instanceof Engine.Index) {
|
||||
Engine.Index index = (Engine.Index) op;
|
||||
|
@ -572,9 +572,12 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
if (randomBoolean()) {
|
||||
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
|
||||
} else {
|
||||
} else if (randomBoolean()) {
|
||||
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L,
|
||||
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis()));
|
||||
} else {
|
||||
operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY,
|
||||
threadPool.relativeTimeInMillis(), "test-" + i));
|
||||
}
|
||||
}
|
||||
Randomness.shuffle(operations);
|
||||
|
|
Loading…
Reference in New Issue