diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index e5d8cacf736..ca0d93fa7c5 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -45,37 +45,72 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private final TranslogDeletionPolicy translogDeletionPolicy;
     private final EngineConfig.OpenMode openMode;
     private final LongSupplier globalCheckpointSupplier;
+    private final IndexCommit startingCommit;
     private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
     private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
     private IndexCommit lastCommit; // the most recent commit point

     CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
-                           LongSupplier globalCheckpointSupplier) {
+                           LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
         this.openMode = openMode;
         this.translogDeletionPolicy = translogDeletionPolicy;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
+        this.startingCommit = startingCommit;
         this.snapshottedCommits = new ObjectIntHashMap<>();
     }

     @Override
-    public void onInit(List<? extends IndexCommit> commits) throws IOException {
+    public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
         switch (openMode) {
             case CREATE_INDEX_AND_TRANSLOG:
+                assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
                 break;
             case OPEN_INDEX_CREATE_TRANSLOG:
-                assert commits.isEmpty() == false : "index is opened, but we have no commits";
-                // When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
-                // We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
-                break;
             case OPEN_INDEX_AND_TRANSLOG:
                 assert commits.isEmpty() == false : "index is opened, but we have no commits";
-                onCommit(commits);
+                assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+                    + "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
+                keepOnlyStartingCommitOnInit(commits);
+                // OPEN_INDEX_CREATE_TRANSLOG can open an index commit from another shard with a different translog history;
+                // we therefore should not use that index commit to update the translog deletion policy.
+                if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+                    updateTranslogDeletionPolicy();
+                }
                 break;
             default:
                 throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
         }
     }

+    /**
+     * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
+     * at recovery time, but they can suddenly become safe in the future.
+     * The following issues can happen if unsafe commits are kept on init.
+     * <p>
+     * 1. A replica can use an unsafe commit in peer recovery. This happens when a replica with a safe commit c1 (max_seqno=1)
+     * and an unsafe commit c2 (max_seqno=2) recovers from a primary with c1 (max_seqno=1). If a new document (seqno=2)
+     * is then added without flushing, the global checkpoint advances to 2; when the replica recovers again, it will use
+     * the unsafe commit c2 (max_seqno=2, at most gcp=2) as the starting commit for sequence-based recovery even though
+     * the commit c2 contains a stale operation, and the document with seqno=2 will not be replicated to the replica.
+     * <p>
+     * 2. The min translog gen for recovery can go backwards in peer recovery. This happens when a replica has a safe commit
+     * c1 (local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2, recovery_translog_gen=2).
+     * The replica recovers from a primary, keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
+     * commit on the replica will then cause an exception, as the new last commit c3 will have recovery_translog_gen=1: the
+     * recovery translog generation of a commit is calculated from the current local checkpoint, and the local checkpoint
+     * of c3 is 1 while the local checkpoint of c2 is 2.
+     * <p>
+     * 3. A commit without translog can be used in recovery. An old index, created before multiple commits were introduced
+     * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
+     * the policy can consider the snapshotted commit a safe commit for recovery even though that commit has no translog.
+     */
+    private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
+        commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
+        assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
+        lastCommit = startingCommit;
+        safeCommit = startingCommit;
+    }
+
     @Override
     public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
         final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
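Note on the javadoc above: a commit is "safe" when its max_seqno is at most the persisted global checkpoint. The sketch below illustrates the selection rule that findSafeCommitPoint applies; it is an illustration only, not the actual implementation. The "max_seq_no" user-data key matches SequenceNumbers.MAX_SEQ_NO in the surrounding code, commits are assumed ordered oldest to newest (which DirectoryReader.listCommits guarantees), and the class and method names here are made up.

import org.apache.lucene.index.IndexCommit;

import java.io.IOException;
import java.util.List;

final class SafeCommitRule {

    // Pick the newest commit whose max_seqno is at most the global checkpoint;
    // if none qualifies (e.g. a pre-6.2 index without seq-no markers), fall back
    // to the oldest commit.
    static IndexCommit selectSafeCommit(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
        IndexCommit safe = commits.get(0); // fallback: the oldest commit
        for (IndexCommit commit : commits) {
            final long maxSeqNo = Long.parseLong(commit.getUserData().get("max_seq_no"));
            if (maxSeqNo <= globalCheckpoint) {
                safe = commit; // the list is ordered, so the last match is the newest safe commit
            }
        }
        return safe;
    }
}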
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 1b7b891efd6..1efbd0706d1 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -185,7 +185,7 @@ public class InternalEngine extends Engine {
             "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
         this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
         this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
-            translog::getLastSyncedGlobalCheckpoint);
+            translog::getLastSyncedGlobalCheckpoint, startingCommit);
         writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
         updateMaxUnsafeAutoIdTimestampFromWriter(writer);
         assert engineConfig.getForceNewHistoryUUID() == false
@@ -411,28 +411,44 @@ public class InternalEngine extends Engine {
     }

     private IndexCommit getStartingCommitPoint() throws IOException {
-        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
-            final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
-            final long minRetainedTranslogGen = translog.getMinFileGeneration();
-            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
-            // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog
-            // files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
-            // To avoid this issue, we only select index commits whose translog files are fully retained.
-            if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
-                final List<IndexCommit> recoverableCommits = new ArrayList<>();
-                for (IndexCommit commit : existingCommits) {
-                    if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
-                        recoverableCommits.add(commit);
+        final IndexCommit startingIndexCommit;
+        final List<IndexCommit> existingCommits;
+        switch (openMode) {
+            case CREATE_INDEX_AND_TRANSLOG:
+                startingIndexCommit = null;
+                break;
+            case OPEN_INDEX_CREATE_TRANSLOG:
+                // Use the last commit
+                existingCommits = DirectoryReader.listCommits(store.directory());
+                startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
+                break;
+            case OPEN_INDEX_AND_TRANSLOG:
+                // Use the safe commit
+                final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
+                final long minRetainedTranslogGen = translog.getMinFileGeneration();
+                existingCommits = DirectoryReader.listCommits(store.directory());
+                // We may not have a safe commit if the index was created before v6.2; and if there is a snapshotted commit whose
+                // translog files are not retained but whose max_seqno is at most the global checkpoint, we may mistakenly select
+                // it as a starting commit. To avoid this issue, we only select index commits whose translog files are fully retained.
+                if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
+                    final List<IndexCommit> recoverableCommits = new ArrayList<>();
+                    for (IndexCommit commit : existingCommits) {
+                        if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
+                            recoverableCommits.add(commit);
+                        }
                     }
+                    assert recoverableCommits.isEmpty() == false : "No commit point with translog found; "
+                        + "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
+                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
+                } else {
+                    // TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
+                    startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
                 }
-                assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
-                    "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
-                return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
-            } else {
-                return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
-            }
+                break;
+            default:
+                throw new IllegalArgumentException("unknown mode: " + openMode);
         }
-        return null;
+        return startingIndexCommit;
     }

     private void recoverFromTranslogInternal() throws IOException {
@@ -557,9 +573,7 @@ public class InternalEngine extends Engine {
             final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
             internalSearcherManager = new SearcherManager(directoryReader,
                 new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
-            // The index commit from IndexWriterConfig is null if the engine is open with other modes
-            // rather than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit.
-            lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
+            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
             ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
                 externalSearcherFactory);
             success = true;
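Aside on the pre-6.2 branch of getStartingCommitPoint above: the filtering is driven entirely by each commit's user data. Below is a self-contained sketch of inspecting commits that way; the directory path and the minRetainedTranslogGen value are hypothetical placeholders, while DirectoryReader.listCommits, IndexCommit.getUserData, and the "max_seq_no"/"translog_generation" keys match what the patched code uses.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;

public class CommitInspector {
    public static void main(String[] args) throws IOException {
        final long minRetainedTranslogGen = 1; // hypothetical; the engine reads this from the translog
        try (Directory dir = FSDirectory.open(Paths.get("/tmp/shard0/index"))) { // hypothetical path
            final List<IndexCommit> commits = DirectoryReader.listCommits(dir); // oldest first
            for (IndexCommit commit : commits) {
                final long maxSeqNo = Long.parseLong(commit.getUserData().get("max_seq_no"));
                final long translogGen = Long.parseLong(commit.getUserData().get("translog_generation"));
                // A commit is only recoverable if every translog file it needs is still retained.
                final boolean recoverable = minRetainedTranslogGen <= translogGen;
                System.out.printf("gen=%d max_seq_no=%d translog_gen=%d recoverable=%s%n",
                    commit.getGeneration(), maxSeqNo, translogGen, recoverable);
            }
        }
    }
}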
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 3832cd0ae20..b5d28b3a9ec 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectLongMap;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.LeafReaderContext;
@@ -1290,12 +1291,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

     /** opens the engine on top of the existing lucene engine but creates an empty translog **/
     public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
-        assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
-            recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
-        SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
-        assert commitInfo.localCheckpoint >= globalCheckpoint :
-            "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+        if (Assertions.ENABLED) {
+            assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
+                recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
+            SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
+            assert commitInfo.localCheckpoint >= globalCheckpoint :
+                "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
                 + globalCheckpoint + "]";
+            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
+            assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]";
+        }
         globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
         innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
     }
diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java
index 74be98b8132..7aab2c750d1 100644
--- a/server/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -182,17 +182,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * @throws IOException if the index is corrupted or the segments file is not present
      */
     public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
-        return readCommittedSegmentsInfo(null);
-    }
-
-    /**
-     * Returns the committed segments info for the given commit point.
-     * If the commit point is not provided, this method will return the segments info of the last commit in the store.
-     */
-    public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
         failIfCorrupted();
         try {
-            return readSegmentsInfo(commit, directory());
+            return readSegmentsInfo(null, directory());
         } catch (CorruptIndexException ex) {
             markStoreCorrupted(ex);
             throw ex;
diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
index e74cde52aa4..ca6059dae00 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
@@ -54,7 +54,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
     public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);

         final LongArrayList maxSeqNoList = new LongArrayList();
         final LongArrayList translogGenList = new LongArrayList();
@@ -93,7 +94,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final AtomicLong globalCheckpoint = new AtomicLong();
         final UUID translogUUID = UUID.randomUUID();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
         long lastMaxSeqNo = between(1, 1000);
         long lastTranslogGen = between(1, 20);
         int safeIndex = 0;
@@ -156,11 +158,12 @@
         final UUID translogUUID = UUID.randomUUID();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);

         long legacyTranslogGen = randomNonNegativeLong();
         IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
-        indexPolicy.onInit(singletonList(legacyCommit));
+        indexPolicy.onCommit(singletonList(legacyCommit));
         verify(legacyCommit, never()).delete();
         assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
         assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));
@@ -188,7 +191,8 @@
     public void testDeleteInvalidCommits() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null);

         final int invalidCommits = between(1, 10);
         final List<IndexCommit> commitList = new ArrayList<>();
@@ -211,6 +215,35 @@
         }
     }

+    /**
+     * Keeping existing unsafe commits can be problematic because these commits are not safe at recovery time,
+     * but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)}.
+     */
+    public void testKeepOnlyStartingCommitOnInit() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
+        TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
+        final UUID translogUUID = UUID.randomUUID();
+        final List<IndexCommit> commitList = new ArrayList<>();
+        int totalCommits = between(2, 20);
+        for (int i = 0; i < totalCommits; i++) {
+            commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong()));
+        }
+        final IndexCommit startingCommit = randomFrom(commitList);
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
+            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit);
+        indexPolicy.onInit(commitList);
+        for (IndexCommit commit : commitList) {
+            if (commit.equals(startingCommit) == false) {
+                verify(commit, times(1)).delete();
+            }
+        }
+        verify(startingCommit, never()).delete();
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
+            equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
+            equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
+    }
+
     IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
         final Map<String, String> userData = new HashMap<>();
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 518411e59e8..db62db7e01b 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -163,6 +163,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
@@ -4010,13 +4011,15 @@
         boolean flushed = false;
+        AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
         Engine recoveringEngine = null;
         try {
             assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo());
             assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint());
             assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
             assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
-            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            recoveringEngine = new InternalEngine(copy(
+                replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
             assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
             recoveringEngine.recoverFromTranslog();
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@@ -4038,6 +4041,8 @@
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
             if ((flushed = randomBoolean())) {
+                globalCheckpoint.set(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
+                recoveringEngine.getTranslog().sync();
                 recoveringEngine.flush(true, true);
             }
         }
@@ -4047,7 +4052,8 @@

         // now do it again to make sure we preserve values etc.
         try {
-            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+            recoveringEngine = new InternalEngine(
+                copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
             if (flushed) {
                 assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
             }
@@ -4355,4 +4361,57 @@
             assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
         }
     }
+
+    public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
+        IOUtils.close(engine);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
+        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
+        final IndexCommit safeCommit;
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
+            final int numDocs = between(5, 50);
+            for (int i = 0; i < numDocs; i++) {
+                index(engine, i);
+                if (randomBoolean()) {
+                    engine.flush();
+                }
+            }
+            // Select a starting commit, then advance and persist the global checkpoint to that commit.
+            final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
+            safeCommit = randomFrom(commits);
+            globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
+            engine.getTranslog().sync();
+        }
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
+            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
+            assertThat("OPEN_INDEX_AND_TRANSLOG should keep only safe commit", existingCommits, contains(safeCommit));
+        }
+    }
+
+    public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
+        IOUtils.close(engine);
+        final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
+        final Map<String, String> lastCommit;
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
+            engine.skipTranslogRecovery();
+            final int numDocs = between(5, 50);
+            for (int i = 0; i < numDocs; i++) {
+                index(engine, i);
+                if (randomBoolean()) {
+                    engine.flush();
+                }
+            }
+            final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
+            lastCommit = commits.get(commits.size() - 1).getUserData();
+        }
+        try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
+            final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
+            assertThat("OPEN_INDEX_CREATE_TRANSLOG should keep only last commit", existingCommits, hasSize(1));
+            final Map<String, String> userData = existingCommits.get(0).getUserData();
+            assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(lastCommit.get(SequenceNumbers.MAX_SEQ_NO)));
+            assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
+            // Translog tags should be fresh.
+            assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY))));
+            assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index aa97c204991..cd948ed9f90 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -304,8 +304,52 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
         replica.store().close();
         newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
         shards.recoverReplica(newReplica);
-        shards.assertAllEqual(totalDocs);
+        // Make sure that flushing on a recovering shard is ok.
+        shards.flush();
+        shards.assertAllEqual(totalDocs);
+        }
+    }
+
+    public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception {
+        try (ReplicationGroup shards = createGroup(2)) {
+            shards.startAll();
+            IndexShard oldPrimary = shards.getPrimary();
+            IndexShard newPrimary = shards.getReplicas().get(0);
+            IndexShard replica = shards.getReplicas().get(1);
+            int goodDocs = shards.indexDocs(scaledRandomIntBetween(1, 20));
+            shards.flush();
+            // Simulate docs that were in flight when the primary failed; these will be rolled back.
+            int staleDocs = scaledRandomIntBetween(1, 10);
+            logger.info("--> indexing {} stale docs", staleDocs);
+            for (int i = 0; i < staleDocs; i++) {
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i)
+                    .source("{}", XContentType.JSON);
+                final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
+                indexOnReplica(bulkShardRequest, replica);
+            }
+            shards.flush();
+            shards.promoteReplicaToPrimary(newPrimary).get();
+            // Recovering a replica should roll back the stale documents.
+            shards.removeReplica(replica);
+            replica.close("recover replica - first time", false);
+            replica.store().close();
+            replica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+            shards.recoverReplica(replica);
+            shards.assertAllEqual(goodDocs);
+            // Index more docs so the global checkpoint moves to >= the seqno of the stale operations.
+            goodDocs += shards.indexDocs(scaledRandomIntBetween(staleDocs, staleDocs * 5));
+            shards.syncGlobalCheckpoint();
+            assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo()));
+            // Recovering the replica again should also roll back the stale documents.
+            shards.removeReplica(replica);
+            replica.close("recover replica - second time", false);
+            replica.store().close();
+            IndexShard anotherReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+            shards.recoverReplica(anotherReplica);
+            shards.assertAllEqual(goodDocs);
+            shards.flush();
+            shards.assertAllEqual(goodDocs);
         }
     }
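To make the intent of the second recovery above concrete, here is scenario 1 from the CombinedDeletionPolicy javadoc reduced to plain numbers (a toy illustration, not engine code): once the global checkpoint advances past the stale commit's max_seqno, that commit becomes indistinguishable from a safe one, which is exactly why unsafe commits are deleted at init rather than kept around.

public class UnsafeCommitBecomesSafe {
    public static void main(String[] args) {
        // Safety of a commit is simply: max_seqno <= persisted global checkpoint.
        final long c1MaxSeqNo = 1; // safe commit
        final long c2MaxSeqNo = 2; // unsafe commit holding a stale operation with seqno=2
        long globalCheckpoint = 1;

        System.out.println("c1 safe: " + (c1MaxSeqNo <= globalCheckpoint)); // true
        System.out.println("c2 safe: " + (c2MaxSeqNo <= globalCheckpoint)); // false: c2 must not be used

        // A different document with seqno=2 is indexed and the global checkpoint advances.
        globalCheckpoint = 2;
        System.out.println("c2 safe: " + (c2MaxSeqNo <= globalCheckpoint)); // true, although c2 is stale
    }
}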