Relax maxSeqNoOfUpdates assertion in FollowingEngine (#47188)
We disable MSU optimization if the local checkpoint is smaller than max_seq_no_of_updates. Hence, we need to relax the MSU assertion in FollowingEngine for that scenario. Suppose the leader has three operations: index-0, delete-1, and index-2 for the same doc Id. MSU on the leader is 1 as index-2 is an append. If the follower applies index-0 then index-2, then the assertion is violated. Closes #47137
This commit is contained in:
parent
e01465eb88
commit
444b47ce88
|
@ -23,6 +23,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
|||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.analysis.AnalyzerScope;
|
||||
import org.elasticsearch.index.analysis.IndexAnalyzers;
|
||||
|
@ -117,20 +118,23 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
|
|||
return opsRecovered;
|
||||
}
|
||||
|
||||
private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
|
||||
public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
|
||||
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
|
||||
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
|
||||
switch (operation.opType()) {
|
||||
case INDEX:
|
||||
final Translog.Index index = (Translog.Index) operation;
|
||||
final String indexName = mapperService.index().getName();
|
||||
final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
|
||||
new SourceToParse(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source()),
|
||||
index.routing()), index.seqNo(), index.primaryTerm(),
|
||||
index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
index.routing()), index.seqNo(), index.primaryTerm(), index.version(), versionType, origin,
|
||||
index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
|
||||
return engineIndex;
|
||||
case DELETE:
|
||||
final Translog.Delete delete = (Translog.Delete) operation;
|
||||
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
|
||||
delete.primaryTerm(), delete.version(), null, origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
|
||||
delete.primaryTerm(), delete.version(), versionType, origin, System.nanoTime(),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
|
||||
return engineDelete;
|
||||
case NO_OP:
|
||||
final Translog.NoOp noOp = (Translog.NoOp) operation;
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.apache.lucene.search.DocValuesFieldExistsQuery;
|
|||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
@ -117,7 +118,12 @@ public final class FollowingEngine extends InternalEngine {
|
|||
|
||||
@Override
|
||||
protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
|
||||
assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes();
|
||||
if (Assertions.ENABLED) {
|
||||
final long localCheckpoint = getProcessedLocalCheckpoint();
|
||||
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
|
||||
assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo :
|
||||
"maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo;
|
||||
}
|
||||
super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code
|
||||
}
|
||||
|
||||
|
|
|
@ -484,7 +484,7 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
|
||||
assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
|
||||
assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
|
||||
EngineTestCase.assertConsistentHistoryBetweenTranslogAndLuceneIndex(follower, createMapperService("test"));
|
||||
EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(follower);
|
||||
|
@ -535,7 +535,12 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
try (Translog.Snapshot snapshot =
|
||||
shuffleSnapshot(leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true))) {
|
||||
follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes());
|
||||
translogHandler.run(follower, snapshot);
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null) {
|
||||
EngineTestCase.applyOperation(follower,
|
||||
translogHandler.convertToEngineOp(op, randomFrom(Engine.Operation.Origin.values())));
|
||||
}
|
||||
follower.syncTranslog();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue