diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 3f3813e8fa5..07ca15213e2 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -271,6 +271,14 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB", Property.Dynamic, Property.IndexScope); + /** + * Controls how many translog files that are no longer needed for persistence reasons will be kept around before being deleted. + * This is a safeguard making sure that the translog deletion policy won't keep too many translog files especially when they're small. + * This setting is intentionally not registered, it is only used in tests + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING = + Setting.intSetting("index.translog.retention.total_files", 100, 0, Setting.Property.IndexScope); + /** * Controls the maximum length of time since a retention lease is created or renewed before it is considered expired. */ @@ -842,6 +850,14 @@ public final class IndexSettings { return translogRetentionAge; } + /** + * Returns the maximum number of translog files that are no longer required for persistence but should be kept for peer recovery + * when soft-deletes is disabled. + */ + public int getTranslogRetentionTotalFiles() { + return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings()); + } + /** * Returns the generation threshold size. 
As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d1291a17d67..57cdd2b8123 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -193,7 +193,8 @@ public class InternalEngine extends Engine { } final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() ); store.incRef(); IndexWriter writer = null; diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 2244dd57037..149dd03a239 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -151,7 +151,7 @@ public final class NoOpEngine extends ReadOnlyEngine { if (minTranslogGeneration < lastCommitGeneration) { // a translog deletion policy that retains nothing but the last translog generation from safe commit - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java 
b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index ded39c51b37..583cec349c0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -227,7 +227,8 @@ public class ReadOnlyEngine extends Engine { final TranslogConfig translogConfig = config.getTranslogConfig(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( config.getIndexSettings().getTranslogRetentionSize().getBytes(), - config.getIndexSettings().getTranslogRetentionAge().getMillis() + config.getIndexSettings().getTranslogRetentionAge().getMillis(), + config.getIndexSettings().getTranslogRetentionTotalFiles() ); translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit); diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index eb23a415d3e..8a553aad326 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -63,9 +63,12 @@ public class TranslogDeletionPolicy { private long retentionAgeInMillis; - public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) { + private int retentionTotalFiles; + + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; + this.retentionTotalFiles = retentionTotalFiles; if (Assertions.ENABLED) { openTranslogRef = new ConcurrentHashMap<>(); } else { @@ -100,6 +103,10 @@ public class TranslogDeletionPolicy { retentionAgeInMillis = ageInMillis; } + synchronized void setRetentionTotalFiles(int retentionTotalFiles) { + this.retentionTotalFiles = retentionTotalFiles; + } 
+ /** * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * will not be deleted until the returned {@link Releasable} is closed. @@ -164,7 +171,8 @@ public class TranslogDeletionPolicy { } else { minByAgeAndSize = Math.max(minByAge, minBySize); } - return Math.min(minByAgeAndSize, Math.min(minByLocks, minTranslogGenerationForRecovery)); + long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); + return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { @@ -196,6 +204,16 @@ public class TranslogDeletionPolicy { } } + static long getMinTranslogGenByTotalFiles(List readers, TranslogWriter writer, final int maxTotalFiles) { + long minGen = writer.generation; + int totalFiles = 1; // for the current writer + for (int i = readers.size() - 1; i >= 0 && totalFiles < maxTotalFiles; i--) { + totalFiles++; + minGen = readers.get(i).generation; + } + return minGen; + } + protected long currentTime() { return System.currentTimeMillis(); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index 55a24d30991..e6581d0359d 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -178,7 +178,8 @@ public class TruncateTranslogAction { indexSettings, BigArrays.NON_RECYCLING_INSTANCE); long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id()); // We open translog to check for corruption, do not clean anything. 
- final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(Long.MAX_VALUE, Long.MAX_VALUE) { + final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy( + Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE) { @Override long minTranslogGenRequired(List readers, TranslogWriter writer) { long minGen = writer.generation; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 5ef1300b97c..a15c6c26cdd 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -94,6 +94,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.concurrent.BrokenBarrierException; @@ -128,6 +129,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public class IndexShardIT extends ESSingleNodeTestCase { @@ -855,6 +857,43 @@ public class IndexShardIT extends ESSingleNodeTestCase { } } + public void testLimitNumberOfRetainingTranslogFiles() throws Exception { + String indexName = "test"; + int translogRetentionTotalFiles = randomIntBetween(0, 50); + Settings.Builder settings = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getKey(), translogRetentionTotalFiles); + if (randomBoolean()) { + settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), new 
ByteSizeValue(between(1, 1024 * 1024))); + } + if (randomBoolean()) { + settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), TimeValue.timeValueMillis(between(1, 10_000))); + } + IndexService indexService = createIndex(indexName, settings.build()); + IndexShard shard = indexService.getShard(0); + shard.rollTranslogGeneration(); + CheckedRunnable checkTranslog = () -> { + try (Stream files = Files.list(getTranslog(shard).location()).sorted(Comparator.reverseOrder())) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertThat(totalFiles, either(lessThanOrEqualTo((long) translogRetentionTotalFiles)).or(equalTo(1L))); + } + }; + for (int i = 0; i < 100; i++) { + client().prepareIndex(indexName, "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + if (randomInt(100) < 10) { + client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get(); + checkTranslog.run(); + } + if (randomInt(100) < 10) { + shard.rollTranslogGeneration(); + } + } + client().admin().indices().prepareFlush(indexName).get(); + checkTranslog.run(); + } + /** * Asserts that there are no files in the specified path */ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index da339ff5c8e..0d296af5f0c 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -47,7 +47,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { List allGens = new ArrayList<>(readersAndWriter.v1()); allGens.add(readersAndWriter.v2()); try { - TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0); + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0); 
assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); final int committedReader = randomIntBetween(0, allGens.size() - 1); final long committedGen = allGens.get(committedReader).generation; @@ -98,6 +98,25 @@ public class TranslogDeletionPolicyTests extends ESTestCase { } } + public void testTotalFilesRetention() throws Exception { + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(System.currentTimeMillis()); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), + randomIntBetween(Integer.MIN_VALUE, 1)), equalTo(readersAndWriter.v2().generation)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), + randomIntBetween(allGens.size(), Integer.MAX_VALUE)), equalTo(allGens.get(0).generation)); + int numFiles = randomIntBetween(1, allGens.size()); + long selectedGeneration = allGens.get(allGens.size() - numFiles).generation; + assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), numFiles), + equalTo(selectedGeneration)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + /** * Tests that age trumps size but recovery trumps both. 
*/ @@ -107,7 +126,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { List allGens = new ArrayList<>(readersAndWriter.v1()); allGens.add(readersAndWriter.v2()); try { - TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE); deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE); deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); int selectedReader = randomIntBetween(0, allGens.size() - 1); @@ -116,33 +135,41 @@ public class TranslogDeletionPolicyTests extends ESTestCase { selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationBySize = allGens.get(selectedReader).generation; long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationByTotalFiles = allGens.get(selectedReader).generation; deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(size); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); + final int totalFiles = allGens.size() - selectedReader; + deletionPolicy.setRetentionTotalFiles(totalFiles); + assertMinGenRequired(deletionPolicy, readersAndWriter, + max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)); // make a new policy as committed gen can't go backwards (for now) - deletionPolicy = new MockDeletionPolicy(now, size, maxAge); + deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles); long committedGen = randomFrom(allGens).generation; deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - 
assertMinGenRequired(deletionPolicy, readersAndWriter, - Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, + max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); long viewGen = randomFrom(allGens).generation; try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) { assertMinGenRequired(deletionPolicy, readersAndWriter, - Math.min( - Math.min(committedGen, viewGen), - Math.max(selectedGenerationByAge, selectedGenerationBySize))); + min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); // disable age deletionPolicy.setRetentionAgeInMillis(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); + assertMinGenRequired(deletionPolicy, readersAndWriter, + min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles))); // disable size deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); - // disable both + assertMinGenRequired(deletionPolicy, readersAndWriter, + min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles))); + // disable age and size deletionPolicy.setRetentionAgeInMillis(-1); deletionPolicy.setRetentionSizeInBytes(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + // disable total files + deletionPolicy.setRetentionTotalFiles(0); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); } } finally { IOUtils.close(readersAndWriter.v1()); @@ -191,8 +218,8 @@ public class TranslogDeletionPolicyTests extends ESTestCase { long now; - 
MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) { - super(retentionSizeInBytes, maxRetentionAgeInMillis); + MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis, int maxRetentionTotalFiles) { + super(retentionSizeInBytes, maxRetentionAgeInMillis, maxRetentionTotalFiles); this.now = now; } @@ -201,4 +228,12 @@ public class TranslogDeletionPolicyTests extends ESTestCase { return now; } } + + private static long max3(long x1, long x2, long x3) { + return Math.max(Math.max(x1, x2), x3); + } + + private static long min3(long x1, long x2, long x3) { + return Math.min(Math.min(x1, x2), x3); + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index af48ce8dfe6..616254a16f9 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2265,7 +2265,7 @@ public class TranslogTests extends ESTestCase { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, @@ -2324,7 +2324,7 @@ public class TranslogTests extends ESTestCase { // expected... 
} } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, diff --git a/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java index 3ab55b687bd..f0921dfb6ba 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java +++ b/test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java @@ -27,13 +27,14 @@ public class TranslogDeletionPolicies { public static TranslogDeletionPolicy createTranslogDeletionPolicy() { return new TranslogDeletionPolicy( IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), - IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis(), + IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getDefault(Settings.EMPTY) ); } public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), - indexSettings.getTranslogRetentionAge().getMillis()); + indexSettings.getTranslogRetentionAge().getMillis(), indexSettings.getTranslogRetentionTotalFiles()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index 246dac18ef8..b921e392f4f 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -53,7 +53,8 @@ public final class InternalSettingsPlugin extends Plugin { IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING, IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING, IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING, - IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING + IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING ); } }