Update translog policy before the next safe commit (#54839)

IndexShardIT#testMaybeFlush relies on the assumption that the safe commit
and translog deletion policy have advanced after IndexShard#sync returns .
This assumption does not hold if there's a race with the global checkpoint sync.

Closes #52223
This commit is contained in:
Nhat Nguyen 2020-04-07 11:14:25 -04:00
parent 254d1e3543
commit 65713743c2
3 changed files with 44 additions and 9 deletions

View File

@ -84,17 +84,17 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
this.safeCommitInfo = SafeCommitInfo.EMPTY;
this.lastCommit = commits.get(commits.size() - 1);
this.safeCommit = commits.get(keptPosition);
if (keptPosition == commits.size() - 1) {
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
} else {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
}
}
updateRetentionPolicy();
if (keptPosition == commits.size() - 1) {
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
} else {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
safeCommit = this.safeCommit;
}

View File

@ -1181,6 +1181,45 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
}
public void testSyncTranslogConcurrently() throws Exception {
IOUtils.close(engine, store);
final Path translogPath = createTempDir();
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get));
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 50), false, randomBoolean(), randomBoolean());
applyOperations(engine, ops);
engine.flush(true, true);
final CheckedRunnable<IOException> checker = () -> {
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get()));
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
SequenceNumbers.CommitInfo commitInfo =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getIndexCommit().getUserData().entrySet());
assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint()));
}
};
final Thread[] threads = new Thread[randomIntBetween(2, 4)];
final Phaser phaser = new Phaser(threads.length);
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
try {
engine.syncTranslog();
checker.run();
} catch (IOException e) {
throw new AssertionError(e);
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
checker.run();
}
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null)) {

View File

@ -83,7 +83,6 @@ import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.junit.Assert;
import java.io.IOException;
@ -339,9 +338,6 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertPathHasBeenCleared(newIndexDataPath.toAbsolutePath());
}
@TestIssueLogging(
value = "org.elasticsearch.index.engine:DEBUG",
issueUrl = "https://github.com/elastic/elasticsearch/issues/52223")
public void testMaybeFlush() throws Exception {
createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
.build());