Trim translog when safe commit advanced (#32967)
Since #28140 when the global checkpoint is advanced, we try to move the safe commit forward, and clean up old index commits if possible. However, we forget to trim unreferenced translog. This change makes sure that we prune both old translog and index commits when the safe commit advanced. Relates #28140 Closes #32089
This commit is contained in:
parent
815c56b677
commit
40f1bb5e5e
|
@ -478,6 +478,7 @@ public class InternalEngine extends Engine {
|
|||
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
|
||||
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
|
||||
indexWriter.deleteUnusedFiles();
|
||||
translog.trimUnreferencedReaders();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1736,6 +1737,8 @@ public class InternalEngine extends Engine {
|
|||
// Revisit the deletion policy if we can clean up the snapshotting commit.
|
||||
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
|
||||
ensureOpen();
|
||||
// Here we don't have to trim translog because snapshotting an index commit
|
||||
// does not lock translog or prevents unreferenced files from trimming.
|
||||
indexWriter.deleteUnusedFiles();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -152,6 +152,7 @@ import org.elasticsearch.index.translog.SnapshotMatchers;
|
|||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
|
@ -4342,13 +4343,18 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
|
||||
IOUtils.close(engine, store);
|
||||
store = createStore();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test",
|
||||
Settings.builder().put(defaultSettings.getSettings())
|
||||
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1)
|
||||
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1).build());
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
|
||||
try (Store store = createStore();
|
||||
InternalEngine engine =
|
||||
createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
|
||||
final int numDocs = scaledRandomIntBetween(10, 100);
|
||||
for (int docId = 0; docId < numDocs; docId++) {
|
||||
index(engine, docId);
|
||||
if (frequently()) {
|
||||
if (rarely()) {
|
||||
engine.flush(randomBoolean(), randomBoolean());
|
||||
}
|
||||
}
|
||||
|
@ -4362,6 +4368,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpoint(), Long.MAX_VALUE));
|
||||
engine.syncTranslog();
|
||||
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
|
||||
assertThat(engine.estimateTranslogOperationsFromMinSeq(0L), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -73,7 +73,6 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32089")
|
||||
public void testRetentionPolicyChangeDuringRecovery() throws Exception {
|
||||
try (ReplicationGroup shards = createGroup(0)) {
|
||||
shards.startPrimary();
|
||||
|
|
Loading…
Reference in New Issue