Cleanup legacy logic in CombinedDeletionPolicy (#43484)

This change removes the support for pre-v6 index commits which 
do not have sequence numbers.
This commit is contained in:
Nhat Nguyen 2019-06-23 11:29:08 -04:00
parent 5f87caa54c
commit 04bc754d8d
2 changed files with 7 additions and 65 deletions

View File

@ -144,7 +144,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
/**
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
* The max sequence number of a safe commit point should be at most the global checkpoint.
* If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit.
* If an index was created before 6.2 or recovered from remote, we might not have a safe commit.
* In this case, this method will return the oldest index commit.
*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path, String)}
@ -172,22 +173,13 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return Math.min(commits.size() - 1, i + 1);
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
/*
* We may reach to this point in these cases:
* 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress.
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
* 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary.
* However, that commit may not be a safe commit if writes are in progress in the primary.
*/
// If an index was created before 6.2 or recovered from remote, we might not have a safe commit.
// In this case, we return the oldest index commit instead.
return 0;
}
@ -204,11 +196,9 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
return false;
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -38,7 +37,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.singletonList;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
@ -180,41 +178,6 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
}
public void testLegacyIndex() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
assertThat(CombinedDeletionPolicy.findSafeCommitPoint(singletonList(legacyCommit), globalCheckpoint.get()),
equalTo(legacyCommit));
long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE);
long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE);
final IndexCommit freshCommit = mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, translogUUID, safeTranslogGen);
globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit.
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
// Make the fresh commit safe.
resetDeletion(legacyCommit);
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(2)).delete();
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(getLocalCheckpoint(freshCommit) + 1));
}
public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
@ -317,15 +280,4 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
private long getLocalCheckpoint(IndexCommit commit) throws IOException {
return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}
IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
final IndexCommit commit = mock(IndexCommit.class);
when(commit.getUserData()).thenReturn(userData);
resetDeletion(commit);
return commit;
}
}