Restore local checkpoint tracker on promotion
When a shard is promoted to replica, it's possible that it was previously a replica that started following a new primary. When it started following this new primary, the state of its local checkpoint tracker was reset. Upon promotion, it's possible that the state of the local checkpoint tracker has not yet restored from a successful primary-replica re-sync. To account for this, we must restore the state of the local checkpoint tracker when a replica shard is promoted to primary. To do this, we stream the operations in the translog, marking the operations that are in the translog as completed. We do this before we fill the gaps on the newly promoted primary, ensuring that we have a primary shard with a complete history up to the largest maximum sequence number it has ever seen. Relates #25553
This commit is contained in:
parent
2eafbaf759
commit
93311ab717
|
@ -1445,6 +1445,14 @@ public abstract class Engine implements Closeable {
|
|||
*/
|
||||
public abstract void deactivateThrottling();
|
||||
|
||||
/**
 * Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
 * promotion.
 *
 * @throws IOException if an I/O exception occurred reading the translog
 */
public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
|
||||
|
||||
/**
|
||||
* Fills up the local checkpoints history with no-ops until the local checkpoint
|
||||
* and the max seen sequence ID are identical.
|
||||
|
|
|
@ -232,6 +232,23 @@ public class InternalEngine extends Engine {
|
|||
logger.trace("created new InternalEngine");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreLocalCheckpointFromTranslog() throws IOException {
|
||||
try (ReleasableLock ignored = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
final long localCheckpoint = seqNoService().getLocalCheckpoint();
|
||||
try (Translog.View view = getTranslog().newView()) {
|
||||
final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1);
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
if (operation.seqNo() > localCheckpoint) {
|
||||
seqNoService().markSeqNoAsCompleted(operation.seqNo());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int fillSeqNoGaps(long primaryTerm) throws IOException {
|
||||
try (ReleasableLock ignored = writeLock.acquire()) {
|
||||
|
|
|
@ -460,6 +460,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
() -> {
|
||||
latch.await();
|
||||
try {
|
||||
/*
|
||||
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
|
||||
* its local checkpoint tracker was reset during the primary term transition. In particular, the local
|
||||
* checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
|
||||
* tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
|
||||
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
|
||||
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
|
||||
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
|
||||
 * replaying the translog and marking any operations there as completed.
|
||||
*/
|
||||
getEngine().restoreLocalCheckpointFromTranslog();
|
||||
getEngine().fillSeqNoGaps(newPrimaryTerm);
|
||||
updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
|
||||
getEngine().seqNoService().getLocalCheckpoint());
|
||||
|
|
|
@ -3995,6 +3995,67 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Verifies that {@link InternalEngine#restoreLocalCheckpointFromTranslog()} re-marks exactly the
 * translog operations above a (randomly thrown-back) local checkpoint as completed, without
 * changing the resulting local checkpoint or the next sequence number to be generated.
 */
public void testRestoreLocalCheckpointFromTranslog() throws IOException {
    engine.close();
    InternalEngine actualEngine = null;
    try {
        // Record every sequence number the tracker is asked to complete so we can assert
        // precisely which operations the translog replay touched.
        final Set<Long> completedSeqNos = new HashSet<>();
        final SequenceNumbersService seqNoService =
            new SequenceNumbersService(
                shardId,
                defaultSettings,
                SequenceNumbersService.NO_OPS_PERFORMED,
                SequenceNumbersService.NO_OPS_PERFORMED,
                SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                @Override
                public void markSeqNoAsCompleted(long seqNo) {
                    super.markSeqNoAsCompleted(seqNo);
                    completedSeqNos.add(seqNo);
                }
            };
        // Open a fresh engine over the existing translog, wired to the instrumented tracker.
        actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
            @Override
            public SequenceNumbersService seqNoService() {
                return seqNoService;
            }
        };
        final int operations = randomIntBetween(0, 1024);
        // Choose a random subset of [0, operations) to actually index, leaving gaps; the last
        // sequence number is always included so the max seq no is operations - 1.
        final Set<Long> expectedCompletedSeqNos = new HashSet<>();
        for (int i = 0; i < operations; i++) {
            if (rarely() && i < operations - 1) {
                continue;
            }
            expectedCompletedSeqNos.add((long) i);
        }

        // Index the chosen operations in shuffled order, occasionally rolling the translog
        // generation so the replay has to walk multiple generations.
        final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
        Randomness.shuffle(seqNos);
        for (final long seqNo : seqNos) {
            final String id = Long.toString(seqNo);
            final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
            final Term uid = newUid(doc);
            final long time = System.nanoTime();
            actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, VersionType.EXTERNAL, REPLICA, time, time, false));
            if (rarely()) {
                actualEngine.rollTranslogGeneration();
            }
        }
        // Throw the local checkpoint back to simulate the reset that happens on a primary
        // term transition, then restore it from the translog.
        final long currentLocalCheckpoint = actualEngine.seqNoService().getLocalCheckpoint();
        final long resetLocalCheckpoint =
            randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
        actualEngine.seqNoService().resetLocalCheckpoint(resetLocalCheckpoint);
        completedSeqNos.clear();
        actualEngine.restoreLocalCheckpointFromTranslog();
        // Only the indexed sequence numbers strictly above the reset checkpoint should have
        // been re-marked as completed.
        final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
        intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
        assertThat(completedSeqNos, equalTo(intersection));
        // The restore must bring the checkpoint back to where it was before the reset, and the
        // next generated sequence number must continue after the max seq no (operations - 1).
        assertThat(actualEngine.seqNoService().getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
        assertThat(actualEngine.seqNoService().generateSeqNo(), equalTo((long) operations));
    } finally {
        IOUtils.close(actualEngine);
    }
}
|
||||
|
||||
public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
|
||||
final int docs = randomIntBetween(1, 32);
|
||||
int numDocsOnReplica = 0;
|
||||
|
|
|
@ -81,7 +81,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
|||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
|
@ -700,6 +699,60 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
|
||||
final IndexShard indexShard = newStartedShard(false);
|
||||
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
|
||||
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
|
||||
|
||||
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
|
||||
final long globalCheckpointOnReplica = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||
randomIntBetween(
|
||||
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
|
||||
Math.toIntExact(indexShard.getLocalCheckpoint()));
|
||||
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
|
||||
|
||||
final int globalCheckpoint =
|
||||
randomIntBetween(
|
||||
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
|
||||
Math.toIntExact(indexShard.getLocalCheckpoint()));
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
indexShard.acquireReplicaOperationPermit(
|
||||
indexShard.getPrimaryTerm() + 1,
|
||||
globalCheckpoint,
|
||||
new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
releasable.close();
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.SAME);
|
||||
|
||||
latch.await();
|
||||
|
||||
final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary();
|
||||
final CountDownLatch resyncLatch = new CountDownLatch(1);
|
||||
indexShard.updateShardState(
|
||||
newRouting,
|
||||
indexShard.getPrimaryTerm() + 1,
|
||||
(s, r) -> resyncLatch.countDown(),
|
||||
1L,
|
||||
Collections.singleton(newRouting.allocationId().getId()),
|
||||
Collections.emptySet(),
|
||||
Collections.emptySet());
|
||||
resyncLatch.await();
|
||||
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
|
||||
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
|
||||
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
|
||||
final IndexShard indexShard = newStartedShard(false);
|
||||
|
||||
|
|
Loading…
Reference in New Issue