Fix testKeepTranslogAfterGlobalCheckpoint (#55868)

If we advance the global checkpoint during commit and sync that
checkpoint after commit, then the assertions in the test won't hold
because the deletion policy did not see the latest global checkpoint
but only the value before committing.

Closes #55680
This commit is contained in:
Nhat Nguyen 2020-04-28 12:41:40 -04:00
parent cab7bcc156
commit ad6221c0cb
1 changed files with 5 additions and 4 deletions

View File

@ -4886,9 +4886,11 @@ public class InternalEngineTests extends EngineTestCase {
final EngineConfig engineConfig = config(indexSettings, store, translogPath, final EngineConfig engineConfig = config(indexSettings, store, translogPath,
NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get());
final AtomicLong lastSyncedGlobalCheckpointBeforeCommit = new AtomicLong(Translog.readGlobalCheckpoint(translogPath, translogUUID));
try (InternalEngine engine = new InternalEngine(engineConfig) { try (InternalEngine engine = new InternalEngine(engineConfig) {
@Override @Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
lastSyncedGlobalCheckpointBeforeCommit.set(Translog.readGlobalCheckpoint(translogPath, translogUUID));
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog // Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
// (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService. // (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService.
if (rarely()) { if (rarely()) {
@ -4909,21 +4911,20 @@ public class InternalEngineTests extends EngineTestCase {
} }
if (frequently()) { if (frequently()) {
engine.flush(randomBoolean(), true); engine.flush(randomBoolean(), true);
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory()); final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Keep only one safe commit as the oldest commit. // Keep only one safe commit as the oldest commit.
final IndexCommit safeCommit = commits.get(0); final IndexCommit safeCommit = commits.get(0);
if (lastSyncedGlobalCheckpoint == UNASSIGNED_SEQ_NO) { if (lastSyncedGlobalCheckpointBeforeCommit.get() == UNASSIGNED_SEQ_NO) {
// If the global checkpoint is still unassigned, we keep an empty(eg. initial) commit as a safe commit. // If the global checkpoint is still unassigned, we keep an empty(eg. initial) commit as a safe commit.
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
equalTo(SequenceNumbers.NO_OPS_PERFORMED)); equalTo(SequenceNumbers.NO_OPS_PERFORMED));
} else { } else {
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
lessThanOrEqualTo(lastSyncedGlobalCheckpoint)); lessThanOrEqualTo(lastSyncedGlobalCheckpointBeforeCommit.get()));
} }
for (int i = 1; i < commits.size(); i++) { for (int i = 1; i < commits.size(); i++) {
assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThan(lastSyncedGlobalCheckpoint)); greaterThan(lastSyncedGlobalCheckpointBeforeCommit.get()));
} }
// Make sure we keep all translog operations after the local checkpoint of the safe commit. // Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));