From 40f1bb5e5eab4445f0113131f415963664a1c530 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 20 Aug 2018 15:13:19 -0400 Subject: [PATCH] Trim translog when safe commit advanced (#32967) Since #28140 when the global checkpoint is advanced, we try to move the safe commit forward, and clean up old index commits if possible. However, we forget to trim unreferenced translog. This change makes sure that we prune both old translog and index commits when the safe commit advanced. Relates #28140 Closes #32089 --- .../elasticsearch/index/engine/InternalEngine.java | 3 +++ .../index/engine/InternalEngineTests.java | 13 ++++++++++--- .../indices/recovery/RecoveryTests.java | 1 - 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a30127a24ae..982b220f25b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -478,6 +478,7 @@ public class InternalEngine extends Engine { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); + translog.trimUnreferencedReaders(); } } @@ -1736,6 +1737,8 @@ public class InternalEngine extends Engine { // Revisit the deletion policy if we can clean up the snapshotting commit. if (combinedDeletionPolicy.releaseCommit(snapshot)) { ensureOpen(); + // Here we don't have to trim translog because snapshotting an index commit + // does not lock translog or prevents unreferenced files from trimming. indexWriter.deleteUnusedFiles(); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9151fa24fc9..f6df2224288 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -152,6 +152,7 @@ import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.IndexSettingsModule; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -4342,13 +4343,18 @@ public class InternalEngineTests extends EngineTestCase { public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { IOUtils.close(engine, store); - store = createStore(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", + Settings.builder().put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1).build()); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { + try (Store store = createStore(); + InternalEngine engine = + createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) { final int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { index(engine, docId); - if (frequently()) { + if (rarely()) { engine.flush(randomBoolean(), randomBoolean()); } } @@ -4362,6 +4368,7 @@ public class InternalEngineTests extends EngineTestCase { globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpoint(), Long.MAX_VALUE)); engine.syncTranslog(); assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); + assertThat(engine.estimateTranslogOperationsFromMinSeq(0L), equalTo(0)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 0f663eca75d..5547a629ab2 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -73,7 +73,6 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32089") public void testRetentionPolicyChangeDuringRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startPrimary();