diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index addb16d58d0..313598e1d8e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -144,7 +144,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { /** * Find a safe commit point from a list of existing commits based on the supplied global checkpoint. * The max sequence number of a safe commit point should be at most the global checkpoint. - * If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit. + * If an index was created before 6.2 or recovered from remote, we might not have a safe commit. + * In this case, this method will return the oldest index commit. * * @param commits a list of existing commit points * @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path, String)} @@ -172,22 +173,13 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { return i + 1; } - // 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits. - if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) { - return Math.min(commits.size() - 1, i + 1); - } final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)); if (maxSeqNoFromCommit <= globalCheckpoint) { return i; } } - /* - * We may reach to this point in these cases: - * 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress. - * Thus, after upgrading, we may not find a safe commit until we can reserve one. - * 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary. - * However, that commit may not be a safe commit if writes are in progress in the primary. - */ + // If an index was created before 6.2 or recovered from remote, we might not have a safe commit. + // In this case, we return the oldest index commit instead. return 0; } @@ -204,11 +196,9 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { boolean hasUnreferencedCommits() throws IOException { final IndexCommit lastCommit = this.lastCommit; if (safeCommit != lastCommit) { // Race condition can happen but harmless - if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) { - final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); - // We can clean up the current safe commit if the last commit is safe - return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit; - } + final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + // We can clean up the current safe commit if the last commit is safe + return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit; } return false; } diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 617d23c44e1..110a27ff551 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,7 +37,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import static java.util.Collections.singletonList; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; @@ -180,41 +178,6 @@ public class CombinedDeletionPolicyTests extends ESTestCase { Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); } - public void testLegacyIndex() throws Exception { - final AtomicLong globalCheckpoint = new AtomicLong(); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); - final UUID translogUUID = UUID.randomUUID(); - - TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); - - long legacyTranslogGen = randomNonNegativeLong(); - IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); - assertThat(CombinedDeletionPolicy.findSafeCommitPoint(singletonList(legacyCommit), globalCheckpoint.get()), - equalTo(legacyCommit)); - - long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE); - long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE); - final IndexCommit freshCommit = mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, translogUUID, safeTranslogGen); - - globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1)); - indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); - verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit. - verify(freshCommit, times(0)).delete(); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); - - // Make the fresh commit safe. - resetDeletion(legacyCommit); - globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE)); - indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); - verify(legacyCommit, times(2)).delete(); - verify(freshCommit, times(0)).delete(); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); - assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(getLocalCheckpoint(freshCommit) + 1)); - } - public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY); @@ -317,15 +280,4 @@ public class CombinedDeletionPolicyTests extends ESTestCase { private long getLocalCheckpoint(IndexCommit commit) throws IOException { return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); } - - IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException { - final Map userData = new HashMap<>(); - userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); - final IndexCommit commit = mock(IndexCommit.class); - when(commit.getUserData()).thenReturn(userData); - resetDeletion(commit); - return commit; - } - }