Remove pre 6.0.0 support from InternalEngine (#27720)
This removes special casing for documents without a sequence ID. This code is complex enough with seq IDs we should clean up things when we can and we don't support 5.x indexing in 7.x anymore
This commit is contained in:
parent
22e294ce6d
commit
ebb93db010
|
@ -700,13 +700,9 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) && origin == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
// legacy support
|
||||
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
|
||||
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo;
|
||||
} else if (origin == Operation.Origin.PRIMARY) {
|
||||
if (origin == Operation.Origin.PRIMARY) {
|
||||
assert assertOriginPrimarySequenceNumber(seqNo);
|
||||
} else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||
} else {
|
||||
// sequence number should be set when operation origin is not primary
|
||||
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
|
||||
}
|
||||
|
@ -720,15 +716,6 @@ public class InternalEngine extends Engine {
|
|||
return true;
|
||||
}
|
||||
|
||||
private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) ||
|
||||
origin == Operation.Origin.PRIMARY) {
|
||||
// sequence number should be set when operation origin is primary or when all shards are on new nodes
|
||||
assert seqNo >= 0 : "ops should have an assigned seq no.; origin: " + origin;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private long generateSeqNoForOperation(final Operation operation) {
|
||||
assert operation.origin() == Operation.Origin.PRIMARY;
|
||||
return doGenerateSeqNoForOperation(operation);
|
||||
|
@ -842,13 +829,7 @@ public class InternalEngine extends Engine {
|
|||
// this allows to ignore the case where a document was found in the live version maps in
|
||||
// a delete state and return false for the created flag in favor of code simplicity
|
||||
final OpVsLuceneDocStatus opVsLucene;
|
||||
if (index.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
// This can happen if the primary is still on an old node and send traffic without seq# or we recover from translog
|
||||
// created by an old version.
|
||||
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
|
||||
"index is newly created but op has no sequence numbers. op: " + index;
|
||||
opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
|
||||
} else if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
|
||||
if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
|
||||
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
|
||||
// this can happen during recovery where older operations are sent from the translog that are already
|
||||
// part of the lucene commit (either from a peer recovery or a local translog)
|
||||
|
@ -910,7 +891,7 @@ public class InternalEngine extends Engine {
|
|||
|
||||
private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
|
||||
throws IOException {
|
||||
assert assertSequenceNumberBeforeIndexing(index.origin(), plan.seqNoForIndexing);
|
||||
assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
|
||||
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
|
||||
assert plan.indexIntoLucene;
|
||||
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
|
||||
|
@ -1135,11 +1116,7 @@ public class InternalEngine extends Engine {
|
|||
// this allows to ignore the case where a document was found in the live version maps in
|
||||
// a delete state and return true for the found flag in favor of code simplicity
|
||||
final OpVsLuceneDocStatus opVsLucene;
|
||||
if (delete.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
|
||||
"index is newly created but op has no sequence numbers. op: " + delete;
|
||||
opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
|
||||
} else if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
|
||||
if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
|
||||
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
|
||||
// this can happen during recovery where older operations are sent from the translog that are already
|
||||
// part of the lucene commit (either from a peer recovery or a local translog)
|
||||
|
|
|
@ -1353,24 +1353,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertOpsOnReplica(ops, replicaEngine, true);
|
||||
}
|
||||
|
||||
public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
|
||||
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
|
||||
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
|
||||
.put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName)
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_4_0)
|
||||
.put(IndexSettings.INDEX_MAPPING_SINGLE_TYPE_SETTING_KEY, true)
|
||||
.put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(),
|
||||
between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)))
|
||||
.build());
|
||||
|
||||
try (Store oldReplicaStore = createStore();
|
||||
InternalEngine replicaEngine =
|
||||
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
|
||||
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20);
|
||||
assertOpsOnReplica(ops, replicaEngine, true);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
|
||||
final Engine.Operation lastOp = ops.get(ops.size() - 1);
|
||||
final String lastFieldValue;
|
||||
|
|
Loading…
Reference in New Issue