Cascading primary failure led to MSU too low (#40249)

If a replica is first reset due to a primary failover and then
promoted (before resync completes), its MSU (max_seq_no_of_updates) does not
include changes above the global checkpoint, leading to errors during translog replay.

This is fixed by re-initializing the MSU before restoring local history.

Henning Andersen, 2019-03-20 13:56:38 +01:00 (committed by Henning Andersen)
commit 4c2a8638ca, parent 68d98c85c9
10 changed files with 58 additions and 50 deletions
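
Before the per-file diffs, a toy walk-through of the failure mode described above, using plain longs and made-up numbers (illustrative only, not Elasticsearch code):

// Toy model of the bug: after an engine reset, the MSU marker covered nothing above the
// global checkpoint, while the replica's local translog still held operations beyond it.
static void msuTooLowWalkthrough() {
    long globalCheckpoint = 10;            // highest seq_no confirmed on every in-sync copy
    long localMaxSeqNo = 15;               // this replica also holds operations 11..15 locally
    long msuAfterReset = globalCheckpoint; // before the fix: MSU did not cover 11..15

    // Promotion before resync completes replays operations 11..15 from the local translog;
    // an update at, say, seq_no 13 exceeds msuAfterReset and the replay errors out.
    assert 13 > msuAfterReset;

    // After the fix: the marker is re-derived from local state before any replay,
    // so it covers every operation that can be replayed.
    long msuReinitialized = Math.max(localMaxSeqNo, msuAfterReset);
    assert msuReinitialized >= localMaxSeqNo;
}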

Engine.java

@@ -1961,7 +1961,7 @@ public abstract class Engine implements Closeable {
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
@@ -1969,10 +1969,10 @@ public abstract class Engine implements Closeable {
}
/**
* A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
* A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
*/
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();
/**
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
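
The javadoc above spells out the two rules a shard must respect when applying an operation: the MSU append-only optimization (an operation with a seq_no above the MSU cannot be the update or delete of an existing document) and the already-processed check against the local checkpoint tracker. A small standalone model of that decision follows; it is a simplification (the real InternalEngine planning logic has more cases), and the set parameter merely stands in for the local checkpoint tracker:

import java.util.Set;

// Standalone model only, not InternalEngine's actual planning code.
static String planReplicaOperation(long seqNo, long maxSeqNoOfUpdatesOrDeletes, Set<Long> processedSeqNos) {
    if (processedSeqNos.contains(seqNo)) {
        return "skip";    // already applied locally; indexing it again would duplicate it in Lucene
    }
    if (seqNo > maxSeqNoOfUpdatesOrDeletes) {
        return "append";  // cannot touch an existing document, so no Lucene lookup is needed
    }
    return "resolve";     // may be an update or delete of an existing document: look it up first
}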

InternalEngine.java

@@ -2745,9 +2745,7 @@ public class InternalEngine extends Engine {
}
@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
"max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}
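
The substantive change above is the dropped assertion: the method used to insist that max_seq_no_of_updates was still unassigned, but after an engine reset the marker may legitimately have been advanced already, and re-initialization can only move it forward. A minimal standalone model of that monotonic behaviour (class and method names are made up for illustration):

// Standalone model of the monotonic MSU marker.
final class MsuMarker {
    private long msu = -2L; // SequenceNumbers.UNASSIGNED_SEQ_NO

    void advance(long seqNo) {       // mirrors advanceMaxSeqNoOfUpdatesOrDeletes(long)
        msu = Math.max(msu, seqNo);  // never moves backwards
    }

    void reinitialize(long maxSeqNoInLucene, long maxSeqNoInTranslog) {
        advance(Math.max(maxSeqNoInLucene, maxSeqNoInTranslog)); // safe to call more than once
    }

    long current() {
        return msu;
    }
}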

ReadOnlyEngine.java

@@ -455,7 +455,7 @@ public class ReadOnlyEngine extends Engine {
}
@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}

IndexShard.java

@@ -536,6 +536,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
// in case we previously reset engine, we need to forward MSU before replaying translog.
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
@@ -1394,7 +1396,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
};
innerOpenEngineAndTranslog();
final Engine engine = getEngine();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}
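
Both IndexShard call sites now use the same ordering; condensed here as two hypothetical wrappers (the wrapper names are invented, while the Engine calls and the TranslogRecoveryRunner callback type are assumed to match the code in the hunks above):

// Reset path: replay operations above the global checkpoint after the engine was swapped out.
void afterEngineReset(Engine engine, Engine.TranslogRecoveryRunner runner) throws IOException {
    engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); // forward the MSU first
    engine.restoreLocalHistoryFromTranslog(runner);  // then restore local history
}

// Recovery path: replay the whole local translog when the shard opens its engine.
void afterEngineOpen(Engine engine, Engine.TranslogRecoveryRunner runner) throws IOException {
    engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); // same ordering
    engine.recoverFromTranslog(runner, Long.MAX_VALUE);
}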

InternalEngineTests.java

@@ -677,7 +677,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@@ -695,7 +695,7 @@ public class InternalEngineTests extends EngineTestCase {
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
@@ -728,7 +728,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(engine.config());
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
@@ -765,7 +765,7 @@ public class InternalEngineTests extends EngineTestCase {
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
@@ -800,7 +800,7 @@ public class InternalEngineTests extends EngineTestCase {
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
@@ -836,7 +836,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
@@ -844,7 +844,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, upToSeqNo);
assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
@@ -1261,7 +1261,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
@@ -1282,7 +1282,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertNull("Sync ID must be gone since we have a document to replay",
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
@@ -2381,7 +2381,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(initialEngine.engineConfig);
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
@@ -2737,7 +2737,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@@ -2756,7 +2756,7 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
@@ -2771,7 +2771,7 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1",
@@ -2879,7 +2879,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null,
testDocumentWithTextField(), SOURCE, null);
@@ -2892,7 +2892,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier))) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
@@ -2963,7 +2963,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, numDocs, false);
@@ -3726,7 +3726,7 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
@@ -4094,7 +4094,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
@@ -4207,7 +4207,7 @@ public class InternalEngineTests extends EngineTestCase {
throw new UnsupportedOperationException();
}
};
noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = "filling gaps";
@@ -4444,7 +4444,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(engineConfig);
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
engine.restoreLocalHistoryFromTranslog(translogHandler);
assertThat(getDocIds(engine, true), equalTo(prevDocs));
@@ -4492,7 +4492,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
@@ -4529,7 +4529,7 @@ public class InternalEngineTests extends EngineTestCase {
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
@@ -4724,7 +4724,7 @@ public class InternalEngineTests extends EngineTestCase {
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
@@ -5500,7 +5500,7 @@ public class InternalEngineTests extends EngineTestCase {
Set<String> liveDocIds = new HashSet<>();
engine = new InternalEngine(engine.config());
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
int numOps = between(1, 500);
for (int i = 0; i < numOps; i++) {
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
@@ -5571,7 +5571,7 @@ public class InternalEngineTests extends EngineTestCase {
"seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
}
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
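
Every test change in this file follows the same shape; a test-style sketch of that pattern (fixture names config and translogHandler as used by the surrounding tests; this is not one of the real test cases, and the final assertion is only a sanity bound implied by the re-initialization):

try (InternalEngine engine = new InternalEngine(config)) {
    assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); // UNASSIGNED_SEQ_NO on a fresh engine
    engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();                  // derive MSU from Lucene + translog
    engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);      // only now is the replay safe
    assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(engine.getLocalCheckpoint()));
}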

ReadOnlyEngineTests.java

@@ -96,7 +96,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
// Close and reopen the main engine
InternalEngineTests.trimUnsafeCommits(config);
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
@@ -235,7 +235,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
assertThat(translogHandler.appliedOperations(), equalTo(0L));

IndexShardTests.java

@@ -546,7 +546,7 @@ public class IndexShardTests extends IndexShardTestCase {
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);
final int maxSeqNo = result.maxSeqNo;
@@ -1093,7 +1093,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), true);
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
@@ -1145,9 +1145,9 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
if (shouldRollback) {
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max(
Arrays.asList(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica))
));
// we conservatively roll MSU forward to maxSeqNo during restoreLocalHistory, ideally it should become just
// currentMaxSeqNoOfUpdates
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
} else {
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes)));
}
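
The adjusted assertion above records a deliberate trade-off: rolling the MSU all the way to maxSeqNo is safe, because a too-high MSU only disables the append-only optimization while a too-low one breaks correctness, but it is coarser than strictly necessary. With illustrative numbers:

static void conservativeMsuExample() {   // made-up numbers, for illustration only
    long maxSeqNo = 42;                  // highest seq_no on the shard
    long currentMaxSeqNoOfUpdates = 30;  // highest seq_no that actually was an update or delete
    long idealMsu = currentMaxSeqNoOfUpdates;            // 30: only real updates matter
    long msuAfterRestore = Math.max(idealMsu, maxSeqNo); // 42: what the test now asserts
    assert msuAfterRestore == maxSeqNo;
}
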
@@ -1159,7 +1159,9 @@ public class IndexShardTests extends IndexShardTestCase {
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
// todo: all tests should run with allowUpdates=true, but this specific test sometimes fails during lucene commit when updates are
// added (seed = F37E9647ABE5928)
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);
final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
@@ -1202,7 +1204,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates));
// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()), false);
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
closeShard(indexShard, false);
}
@@ -3137,19 +3139,25 @@ public class IndexShardTests extends IndexShardTestCase {
* @param indexShard the shard
* @param operations the number of operations
* @param offset the starting sequence number
* @param allowUpdates whether updates should be added.
* @return a pair of the maximum sequence number and whether or not a gap was introduced
* @throws IOException if an I/O exception occurs while indexing on the shard
*/
private Result indexOnReplicaWithGaps(
final IndexShard indexShard,
final int operations,
final int offset) throws IOException {
final int offset,
boolean allowUpdates) throws IOException {
int localCheckpoint = offset;
int max = offset;
boolean gap = false;
Set<String> ids = new HashSet<>();
for (int i = offset + 1; i < operations; i++) {
if (!rarely() || i == operations - 1) { // last operation can't be a gap as it's not a gap anymore
final String id = Integer.toString(i);
final String id = ids.isEmpty() || randomBoolean() ? Integer.toString(i) : randomFrom(ids);
if (allowUpdates && ids.add(id) == false) { // this is an update
indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
}
SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, 1,
@@ -3604,7 +3612,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testResetEngine() throws Exception {
IndexShard shard = newStartedShard(false);
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()), false);
final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
@@ -3644,7 +3652,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()), false);
final int nbTermUpdates = randomIntBetween(1, 5);
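
For reference, the core of the indexOnReplicaWithGaps change in the hunk at -3137 above, restated with explanatory comments (same statements as the diff, no new behaviour):

// The id set is only populated inside the short-circuited condition below, so ids are never
// reused, and therefore no updates occur, unless allowUpdates is true.
final String id = ids.isEmpty() || randomBoolean() ? Integer.toString(i) : randomFrom(ids);
if (allowUpdates && ids.add(id) == false) { // add() returns false: the id was indexed before, so this is an update
    // A real primary ships an MSU at least as high as the seq_no of every update it replicates,
    // so the test forwards the replica's marker before applying the operation.
    indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
}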

RefreshListenersTests.java

@@ -148,7 +148,7 @@ public class RefreshListenersTests extends ESTestCase {
() -> primaryTerm,
EngineTestCase.tombstoneDocSupplier());
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
}

EngineTestCase.java

@@ -514,7 +514,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
internalEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
return internalEngine;
}

FollowingEngineTests.java

@@ -289,7 +289,7 @@ public class FollowingEngineTests extends ESTestCase {
store.associateIndexWithNewTranslog(translogUuid);
FollowingEngine followingEngine = new FollowingEngine(config);
TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
followingEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
followingEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
return followingEngine;
}
@@ -495,7 +495,7 @@ public class FollowingEngineTests extends ESTestCase {
leaderStore.associateIndexWithNewTranslog(Translog.createEmptyTranslog(
leaderConfig.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L));
try (InternalEngine leaderEngine = new InternalEngine(leaderConfig)) {
leaderEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
leaderEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
leaderEngine.skipTranslogRecovery();
Settings followerSettings = Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0)