Simplify initialization of max_seq_no of updates (#41161)

Today we choose to initialize max_seq_no_of_updates on primaries only so
we can deal with a situation where a primary is on an old node (before
6.5) which does not have MUS while replicas on new nodes (6.5+).
However, this strategy is quite complex and can lead to bugs (for
example #40249) since we have to assign a correct value (not too low) to
MSU in all possible situations (before recovering from translog,
restoring history on promotion, and handing off relocation).

Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes
in the cluster should have MSU. This change simplifies the
initialization of MSU by always assigning it a correct value in the
constructor of Engine regardless of whether it's a replica or primary.

Relates #33842
This commit is contained in:
Nhat Nguyen 2019-04-29 20:57:31 -04:00
parent d46f55f013
commit 887f3f2c83
11 changed files with 40 additions and 112 deletions

View File

@ -98,7 +98,6 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -142,16 +141,6 @@ public abstract class Engine implements Closeable {
*/
protected volatile long lastWriteNanos = System.nanoTime();
/*
* This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine.
* An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
* This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized.
* The value of this marker never goes backwards, and is updated/changed differently on primary and replica:
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete.
* 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
*/
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO);
protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
@ -1961,25 +1950,13 @@ public abstract class Engine implements Closeable {
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
*
* @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}
/**
* A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
*/
public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();
public abstract long getMaxSeqNoOfUpdatesOrDeletes();
/**
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
* to advance this marker to at least the given sequence number.
*/
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
}
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
}

View File

@ -47,7 +47,6 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
@ -146,6 +145,10 @@ public class InternalEngine extends Engine {
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
// An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
// The value of this marker never goes backwards, and is tracked/updated differently on primary and replica.
private final AtomicLong maxSeqNoOfUpdatesOrDeletes;
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
// Lucene operations since this engine was opened - not include operations from existing segments.
@ -231,6 +234,7 @@ public class InternalEngine extends Engine {
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
success = true;
} finally {
if (success == false) {
@ -408,7 +412,6 @@ public class InternalEngine extends Engine {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized";
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
@ -877,7 +880,7 @@ public class InternalEngine extends Engine {
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo());
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
}
} else {
markSeqNoAsSeen(index.seqNo());
@ -984,7 +987,6 @@ public class InternalEngine extends Engine {
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
@ -1325,7 +1327,6 @@ public class InternalEngine extends Engine {
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
assert incrementVersionLookup();
@ -2727,13 +2728,22 @@ public class InternalEngine extends Engine {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}
@Override
public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}
@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
if (maxSeqNoOfUpdatesOnPrimary == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert false : "max_seq_no_of_updates on primary is unassigned";
throw new IllegalArgumentException("max_seq_no_of_updates on primary is unassigned");
}
this.maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, maxSeqNoOfUpdatesOnPrimary));
}
private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_5_0);
return true;
}
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
if (allowDeleted) {
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
@ -2751,12 +2761,6 @@ public class InternalEngine extends Engine {
return true;
}
@Override
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}
private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
final Store store = engineConfig.getStore();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);

View File

@ -456,11 +456,6 @@ public class ReadOnlyEngine extends Engine {
}
@Override
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}
protected void processReaders(IndexReader reader, IndexReader previousReader) {
searcherFactory.processReaders(reader, previousReader);
}
@ -487,4 +482,15 @@ public class ReadOnlyEngine extends Engine {
}
};
}
@Override
public long getMaxSeqNoOfUpdatesOrDeletes() {
return seqNoStats.getMaxSeqNo();
}
@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}
}

View File

@ -531,14 +531,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
*/
final Engine engine = getEngine();
if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
// If the old primary was on an old version that did not replicate the msu,
// we need to bootstrap it manually from its local history.
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
// in case we previously reset engine, we need to forward MSU before replaying translog.
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
@ -1396,9 +1388,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
final Engine engine = getEngine();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}
/**
@ -2191,12 +2181,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
// If the old primary was on an old version that did not replicate the msu,
// we need to bootstrap it manually from its local history.
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
}
}
@ -3100,7 +3084,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
newEngine = engineFactory.newReadWriteEngine(newEngineConfig());
onNewEngine(newEngine);
}
newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add a dedicate recovery stats for the reset translog
@ -3151,11 +3134,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO
|| getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO :
"replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not";
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
}
/**

View File

@ -676,7 +676,6 @@ public class InternalEngineTests extends EngineTestCase {
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@ -693,7 +692,6 @@ public class InternalEngineTests extends EngineTestCase {
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
@ -725,7 +723,6 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(engine);
}
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
@ -761,7 +758,6 @@ public class InternalEngineTests extends EngineTestCase {
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
@ -795,7 +791,6 @@ public class InternalEngineTests extends EngineTestCase {
}
initialEngine.close();
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
@ -830,14 +825,12 @@ public class InternalEngineTests extends EngineTestCase {
engine.syncTranslog();
}
try (InternalEngine engine = new InternalEngine(config)) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
}
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, upToSeqNo);
assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
@ -1253,7 +1246,6 @@ public class InternalEngineTests extends EngineTestCase {
store.associateIndexWithNewTranslog(translogUUID);
}
engine = new InternalEngine(config);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
@ -1273,7 +1265,6 @@ public class InternalEngineTests extends EngineTestCase {
EngineConfig config = engine.config();
engine.close();
engine = new InternalEngine(config);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertNull("Sync ID must be gone since we have a document to replay",
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
@ -2371,7 +2362,6 @@ public class InternalEngineTests extends EngineTestCase {
}
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
@ -2726,7 +2716,6 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@ -2744,7 +2733,6 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
@ -2758,7 +2746,6 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1",
@ -2865,7 +2852,6 @@ public class InternalEngineTests extends EngineTestCase {
}
}
}) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null,
testDocumentWithTextField(), SOURCE, null);
@ -2878,7 +2864,6 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier))) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
@ -2947,7 +2932,6 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, numDocs, false);
@ -3711,7 +3695,6 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
@ -4078,7 +4061,6 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(initialEngine);
}
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
@ -4190,7 +4172,6 @@ public class InternalEngineTests extends EngineTestCase {
throw new UnsupportedOperationException();
}
};
noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = "filling gaps";
@ -4426,7 +4407,6 @@ public class InternalEngineTests extends EngineTestCase {
totalTranslogOps = engine.getTranslog().totalOperations();
}
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
engine.restoreLocalHistoryFromTranslog(translogHandler);
assertThat(getDocIds(engine, true), equalTo(prevDocs));
@ -4473,7 +4453,6 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
@ -4509,7 +4488,6 @@ public class InternalEngineTests extends EngineTestCase {
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
@ -4704,7 +4682,6 @@ public class InternalEngineTests extends EngineTestCase {
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
@ -5478,8 +5455,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
Set<String> liveDocIds = new HashSet<>();
engine = new InternalEngine(engine.config());
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
int numOps = between(1, 500);
for (int i = 0; i < numOps; i++) {
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
@ -5549,7 +5525,6 @@ public class InternalEngineTests extends EngineTestCase {
"seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
}
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}

View File

@ -92,7 +92,6 @@ public class ReadOnlyEngineTests extends EngineTestCase {
}
// Close and reopen the main engine
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
@ -224,7 +223,6 @@ public class ReadOnlyEngineTests extends EngineTestCase {
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
assertThat(translogHandler.appliedOperations(), equalTo(0L));

View File

@ -1109,11 +1109,9 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
final CountDownLatch latch = new CountDownLatch(1);
final boolean shouldRollback = Math.max(globalCheckpointOnReplica, globalCheckpoint) < maxSeqNo;
randomReplicaOperationPermitAcquisition(indexShard,
indexShard.getPendingPrimaryTerm() + 1,
globalCheckpoint,
@ -1132,13 +1130,7 @@ public class IndexShardTests extends IndexShardTestCase {
}, "");
latch.await();
if (shouldRollback) {
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max(
Arrays.asList(maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica))
));
} else {
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(maxSeqNoOfUpdatesOrDeletes, currentMaxSeqNoOfUpdates)));
}
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary();
final CountDownLatch resyncLatch = new CountDownLatch(1);
indexShard.updateShardState(
@ -1153,7 +1145,6 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
// we conservatively roll MSU forward to maxSeqNo during restoreLocalHistory, ideally it should become just currentMaxSeqNoOfUpdates
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
closeShard(indexShard, false);
}
@ -3658,6 +3649,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testResetEngine() throws Exception {
IndexShard shard = newStartedShard(false);
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo();
final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
@ -3699,7 +3691,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint));
assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint));
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint));
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNoBeforeRollback));
done.set(true);
thread.join();
closeShard(shard, false);

View File

@ -152,7 +152,6 @@ public class RefreshListenersTests extends ESTestCase {
() -> primaryTerm,
EngineTestCase.tombstoneDocSupplier());
engine = new InternalEngine(config);
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
}

View File

@ -514,7 +514,6 @@ public abstract class EngineTestCase extends ESTestCase {
}
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
internalEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
return internalEngine;
}

View File

@ -133,7 +133,8 @@ public final class FollowingEngine extends InternalEngine {
@Override
protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
// ignore, this is not really a primary
assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes();
super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code
}
@Override

View File

@ -289,7 +289,6 @@ public class FollowingEngineTests extends ESTestCase {
store.associateIndexWithNewTranslog(translogUuid);
FollowingEngine followingEngine = new FollowingEngine(config);
TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
followingEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
return followingEngine;
}
@ -495,7 +494,6 @@ public class FollowingEngineTests extends ESTestCase {
leaderStore.associateIndexWithNewTranslog(Translog.createEmptyTranslog(
leaderConfig.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L));
try (InternalEngine leaderEngine = new InternalEngine(leaderConfig)) {
leaderEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
leaderEngine.skipTranslogRecovery();
Settings followerSettings = Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0)