diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index d30f9629dc2..bc426226f7c 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1445,6 +1445,14 @@ public abstract class Engine implements Closeable {
      */
     public abstract void deactivateThrottling();
 
+    /**
+     * Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
+     * promotion.
+     *
+     * @throws IOException if an I/O exception occurred reading the translog
+     */
+    public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
+
     /**
      * Fills up the local checkpoints history with no-ops until the local checkpoint
      * and the max seen sequence ID are identical.
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index a8f0759c1bb..b32e618c6a2 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -232,6 +232,23 @@ public class InternalEngine extends Engine {
         logger.trace("created new InternalEngine");
     }
 
+    @Override
+    public void restoreLocalCheckpointFromTranslog() throws IOException {
+        try (ReleasableLock ignored = writeLock.acquire()) {
+            ensureOpen();
+            final long localCheckpoint = seqNoService().getLocalCheckpoint();
+            try (Translog.View view = getTranslog().newView()) {
+                final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1);
+                Translog.Operation operation;
+                while ((operation = snapshot.next()) != null) {
+                    if (operation.seqNo() > localCheckpoint) {
+                        seqNoService().markSeqNoAsCompleted(operation.seqNo());
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     public int fillSeqNoGaps(long primaryTerm) throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 296dae0331e..6d4dafa4a77 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -460,6 +460,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         () -> {
                             latch.await();
                             try {
+                                /*
+                                 * If this shard was serving as a replica shard when another shard was promoted to primary then the state of
+                                 * its local checkpoint tracker was reset during the primary term transition. In particular, the local
+                                 * checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
+                                 * tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
+                                 * subsequently fails before the primary/replica re-sync completes successfully and we are now being
+                                 * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
+                                 * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
+                                 * replaying the translog and marking any operations there as completed.
+                                 */
+                                getEngine().restoreLocalCheckpointFromTranslog();
                                 getEngine().fillSeqNoGaps(newPrimaryTerm);
                                 updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
                                     getEngine().seqNoService().getLocalCheckpoint());
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 14d0cbb44b7..73c9ebd6a51 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -3995,6 +3995,73 @@ public class InternalEngineTests extends ESTestCase {
         }
     }
 
+    /**
+     * Indexes a shuffled set of replica operations (possibly with gaps), resets the local checkpoint to a random value at or
+     * below its current value, and verifies that restoring from the translog marks exactly the indexed sequence numbers above
+     * the reset checkpoint as completed, brings the local checkpoint back to its previous value, and leaves the next generated
+     * sequence number equal to the number of operations.
+     */
+    public void testRestoreLocalCheckpointFromTranslog() throws IOException {
+        engine.close();
+        InternalEngine actualEngine = null;
+        try {
+            final Set<Long> completedSeqNos = new HashSet<>();
+            final SequenceNumbersService seqNoService =
+                    new SequenceNumbersService(
+                            shardId,
+                            defaultSettings,
+                            SequenceNumbersService.NO_OPS_PERFORMED,
+                            SequenceNumbersService.NO_OPS_PERFORMED,
+                            SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+                        @Override
+                        public void markSeqNoAsCompleted(long seqNo) {
+                            super.markSeqNoAsCompleted(seqNo);
+                            completedSeqNos.add(seqNo);
+                        }
+                    };
+            actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
+                @Override
+                public SequenceNumbersService seqNoService() {
+                    return seqNoService;
+                }
+            };
+            final int operations = randomIntBetween(0, 1024);
+            final Set<Long> expectedCompletedSeqNos = new HashSet<>();
+            for (int i = 0; i < operations; i++) {
+                if (rarely() && i < operations - 1) {
+                    continue;
+                }
+                expectedCompletedSeqNos.add((long) i);
+            }
+
+            final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
+            Randomness.shuffle(seqNos);
+            for (final long seqNo : seqNos) {
+                final String id = Long.toString(seqNo);
+                final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
+                final Term uid = newUid(doc);
+                final long time = System.nanoTime();
+                actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, VersionType.EXTERNAL, REPLICA, time, time, false));
+                if (rarely()) {
+                    actualEngine.rollTranslogGeneration();
+                }
+            }
+            final long currentLocalCheckpoint = actualEngine.seqNoService().getLocalCheckpoint();
+            final long resetLocalCheckpoint =
+                    randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
+            actualEngine.seqNoService().resetLocalCheckpoint(resetLocalCheckpoint);
+            completedSeqNos.clear();
+            actualEngine.restoreLocalCheckpointFromTranslog();
+            final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
+            intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
+            assertThat(completedSeqNos, equalTo(intersection));
+            assertThat(actualEngine.seqNoService().getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
+            assertThat(actualEngine.seqNoService().generateSeqNo(), equalTo((long) operations));
+        } finally {
+            IOUtils.close(actualEngine);
+        }
+    }
+
     public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
         final int docs = randomIntBetween(1, 32);
         int numDocsOnReplica = 0;
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index b8ad55b6046..59d98ae96e9 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -81,7 +81,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
@@ -700,6 +699,64 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(indexShard);
     }
 
+    /**
+     * Indexes operations on a replica, acquires a replica operation permit under the next primary term, then promotes the shard
+     * to primary and verifies that afterwards the local checkpoint equals the maximum sequence number seen before the promotion.
+     */
+    public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
+        final IndexShard indexShard = newStartedShard(false);
+        final int operations = 1024 - scaledRandomIntBetween(0, 1024);
+        indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
+
+        final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
+        final long globalCheckpointOnReplica =
+                randomIntBetween(
+                        Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
+                        Math.toIntExact(indexShard.getLocalCheckpoint()));
+        indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
+
+        final int globalCheckpoint =
+                randomIntBetween(
+                        Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
+                        Math.toIntExact(indexShard.getLocalCheckpoint()));
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        indexShard.acquireReplicaOperationPermit(
+                indexShard.getPrimaryTerm() + 1,
+                globalCheckpoint,
+                new ActionListener<Releasable>() {
+                    @Override
+                    public void onResponse(Releasable releasable) {
+                        releasable.close();
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        throw new AssertionError(e); // surface the failure; the latch is only counted down in onResponse
+                    }
+                },
+                ThreadPool.Names.SAME);
+
+        latch.await();
+
+        final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary();
+        final CountDownLatch resyncLatch = new CountDownLatch(1);
+        indexShard.updateShardState(
+                newRouting,
+                indexShard.getPrimaryTerm() + 1,
+                (s, r) -> resyncLatch.countDown(),
+                1L,
+                Collections.singleton(newRouting.allocationId().getId()),
+                Collections.emptySet(),
+                Collections.emptySet());
+        resyncLatch.await();
+        assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
+        assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
+
+        closeShards(indexShard);
+    }
+
     public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
         final IndexShard indexShard = newStartedShard(false);
 