Do not keep 5.x commits once having 6.x commits (#28188)
Currently we keep a 5.x index commit as a safe commit until we have a 6.x safe commit. During that time, if peer-recovery happens, a primary will send a 5.x commit in file-based sync and the recovery will even fail as the snapshotted commit does not have sequence number tags. This commit updates the combined deletion policy to delete legacy commits if there are 6.x commits. Relates #27606 Relates #28038
This commit is contained in:
parent
99f88f15c5
commit
55a14230a7
|
@ -160,9 +160,9 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
|
||||||
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
|
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
|
||||||
return i + 1;
|
return i + 1;
|
||||||
}
|
}
|
||||||
// 5.x commits do not contain MAX_SEQ_NO.
|
// 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits.
|
||||||
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
|
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
|
||||||
return i;
|
return Math.min(commits.size() - 1, i + 1);
|
||||||
}
|
}
|
||||||
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
|
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
|
||||||
if (maxSeqNoFromCommit <= globalCheckpoint) {
|
if (maxSeqNoFromCommit <= globalCheckpoint) {
|
||||||
|
|
|
@ -171,15 +171,15 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
|
||||||
|
|
||||||
globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
|
globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
|
||||||
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
|
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
|
||||||
verify(legacyCommit, times(0)).delete();
|
verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit.
|
||||||
verify(freshCommit, times(0)).delete();
|
verify(freshCommit, times(0)).delete();
|
||||||
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
|
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
|
||||||
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
|
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
|
||||||
|
|
||||||
// Make the fresh commit safe.
|
// Make the fresh commit safe.
|
||||||
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
|
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
|
||||||
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
|
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
|
||||||
verify(legacyCommit, times(1)).delete();
|
verify(legacyCommit, times(2)).delete();
|
||||||
verify(freshCommit, times(0)).delete();
|
verify(freshCommit, times(0)).delete();
|
||||||
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
|
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
|
||||||
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
|
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
|
||||||
|
|
Loading…
Reference in New Issue