Limit number of retaining translog files for peer recovery (#47414)
Today we control the extra translog (when soft-deletes is disabled) for peer recoveries by size and age. If users manually (force) flush many times within a short period, we can keep many small (or empty) translog files as neither the size nor the age condition is reached. We can protect the cluster from running out of file descriptors in such a situation by limiting the number of retained translog files.
This commit is contained in:
parent
f81d9a4aa6
commit
5e4732f2bb
|
@ -271,6 +271,14 @@ public final class IndexSettings {
|
|||
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
|
||||
Property.Dynamic, Property.IndexScope);
|
||||
|
||||
/**
|
||||
* Controls the maximum number of translog files that are no longer needed for persistence reasons but are kept around before being deleted.
|
||||
* This is a safeguard making sure that the translog deletion policy won't keep too many translog files especially when they're small.
|
||||
* This setting is intentionally not registered, it is only used in tests
|
||||
**/
|
||||
public static final Setting<Integer> INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING =
|
||||
Setting.intSetting("index.translog.retention.total_files", 100, 0, Setting.Property.IndexScope);
|
||||
|
||||
/**
|
||||
* Controls the maximum length of time since a retention lease is created or renewed before it is considered expired.
|
||||
*/
|
||||
|
@ -842,6 +850,14 @@ public final class IndexSettings {
|
|||
return translogRetentionAge;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the maximum number of translog files that that no longer required for persistence should be kept for peer recovery
|
||||
* when soft-deletes is disabled.
|
||||
*/
|
||||
public int getTranslogRetentionTotalFiles() {
|
||||
return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
|
||||
* be preserved for rollback purposes, we want to keep the size of individual generations from
|
||||
|
|
|
@ -193,7 +193,8 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
|
||||
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
|
||||
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
|
||||
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
|
||||
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
|
||||
);
|
||||
store.incRef();
|
||||
IndexWriter writer = null;
|
||||
|
|
|
@ -151,7 +151,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
|
|||
|
||||
if (minTranslogGeneration < lastCommitGeneration) {
|
||||
// a translog deletion policy that retains nothing but the last translog generation from safe commit
|
||||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
|
||||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
|
||||
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
|
||||
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);
|
||||
|
||||
|
|
|
@ -227,7 +227,8 @@ public class ReadOnlyEngine extends Engine {
|
|||
final TranslogConfig translogConfig = config.getTranslogConfig();
|
||||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
|
||||
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
|
||||
config.getIndexSettings().getTranslogRetentionAge().getMillis()
|
||||
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
|
||||
config.getIndexSettings().getTranslogRetentionTotalFiles()
|
||||
);
|
||||
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);
|
||||
|
||||
|
|
|
@ -63,9 +63,12 @@ public class TranslogDeletionPolicy {
|
|||
|
||||
private long retentionAgeInMillis;
|
||||
|
||||
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) {
|
||||
private int retentionTotalFiles;
|
||||
|
||||
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
|
||||
this.retentionSizeInBytes = retentionSizeInBytes;
|
||||
this.retentionAgeInMillis = retentionAgeInMillis;
|
||||
this.retentionTotalFiles = retentionTotalFiles;
|
||||
if (Assertions.ENABLED) {
|
||||
openTranslogRef = new ConcurrentHashMap<>();
|
||||
} else {
|
||||
|
@ -100,6 +103,10 @@ public class TranslogDeletionPolicy {
|
|||
retentionAgeInMillis = ageInMillis;
|
||||
}
|
||||
|
||||
synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
|
||||
this.retentionTotalFiles = retentionTotalFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
|
||||
* will not be deleted until the returned {@link Releasable} is closed.
|
||||
|
@ -164,7 +171,8 @@ public class TranslogDeletionPolicy {
|
|||
} else {
|
||||
minByAgeAndSize = Math.max(minByAge, minBySize);
|
||||
}
|
||||
return Math.min(minByAgeAndSize, Math.min(minByLocks, minTranslogGenerationForRecovery));
|
||||
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
|
||||
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery));
|
||||
}
|
||||
|
||||
static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
|
||||
|
@ -196,6 +204,16 @@ public class TranslogDeletionPolicy {
|
|||
}
|
||||
}
|
||||
|
||||
static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
|
||||
long minGen = writer.generation;
|
||||
int totalFiles = 1; // for the current writer
|
||||
for (int i = readers.size() - 1; i >= 0 && totalFiles < maxTotalFiles; i--) {
|
||||
totalFiles++;
|
||||
minGen = readers.get(i).generation;
|
||||
}
|
||||
return minGen;
|
||||
}
|
||||
|
||||
protected long currentTime() {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
|
|
@ -178,7 +178,8 @@ public class TruncateTranslogAction {
|
|||
indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
|
||||
long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id());
|
||||
// We open translog to check for corruption, do not clean anything.
|
||||
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(Long.MAX_VALUE, Long.MAX_VALUE) {
|
||||
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(
|
||||
Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE) {
|
||||
@Override
|
||||
long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) {
|
||||
long minGen = writer.generation;
|
||||
|
|
|
@ -94,6 +94,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
|
@ -128,6 +129,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class IndexShardIT extends ESSingleNodeTestCase {
|
||||
|
@ -855,6 +857,43 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testLimitNumberOfRetainingTranslogFiles() throws Exception {
|
||||
String indexName = "test";
|
||||
int translogRetentionTotalFiles = randomIntBetween(0, 50);
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
|
||||
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getKey(), translogRetentionTotalFiles);
|
||||
if (randomBoolean()) {
|
||||
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1024 * 1024)));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), TimeValue.timeValueMillis(between(1, 10_000)));
|
||||
}
|
||||
IndexService indexService = createIndex(indexName, settings.build());
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
shard.rollTranslogGeneration();
|
||||
CheckedRunnable<IOException> checkTranslog = () -> {
|
||||
try (Stream<Path> files = Files.list(getTranslog(shard).location()).sorted(Comparator.reverseOrder())) {
|
||||
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
|
||||
assertThat(totalFiles, either(lessThanOrEqualTo((long) translogRetentionTotalFiles)).or(equalTo(1L)));
|
||||
}
|
||||
};
|
||||
for (int i = 0; i < 100; i++) {
|
||||
client().prepareIndex(indexName, "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
|
||||
if (randomInt(100) < 10) {
|
||||
client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get();
|
||||
checkTranslog.run();
|
||||
}
|
||||
if (randomInt(100) < 10) {
|
||||
shard.rollTranslogGeneration();
|
||||
}
|
||||
}
|
||||
client().admin().indices().prepareFlush(indexName).get();
|
||||
checkTranslog.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that there are no files in the specified path
|
||||
*/
|
||||
|
|
|
@ -47,7 +47,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
|
|||
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
|
||||
allGens.add(readersAndWriter.v2());
|
||||
try {
|
||||
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0);
|
||||
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, 1L);
|
||||
final int committedReader = randomIntBetween(0, allGens.size() - 1);
|
||||
final long committedGen = allGens.get(committedReader).generation;
|
||||
|
@ -98,6 +98,25 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testTotalFilesRetention() throws Exception {
|
||||
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(System.currentTimeMillis());
|
||||
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
|
||||
allGens.add(readersAndWriter.v2());
|
||||
try {
|
||||
assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(),
|
||||
randomIntBetween(Integer.MIN_VALUE, 1)), equalTo(readersAndWriter.v2().generation));
|
||||
assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(),
|
||||
randomIntBetween(allGens.size(), Integer.MAX_VALUE)), equalTo(allGens.get(0).generation));
|
||||
int numFiles = randomIntBetween(1, allGens.size());
|
||||
long selectedGeneration = allGens.get(allGens.size() - numFiles).generation;
|
||||
assertThat(TranslogDeletionPolicy.getMinTranslogGenByTotalFiles(readersAndWriter.v1(), readersAndWriter.v2(), numFiles),
|
||||
equalTo(selectedGeneration));
|
||||
} finally {
|
||||
IOUtils.close(readersAndWriter.v1());
|
||||
IOUtils.close(readersAndWriter.v2());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that age trumps size but recovery trumps both.
|
||||
*/
|
||||
|
@ -107,7 +126,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
|
|||
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
|
||||
allGens.add(readersAndWriter.v2());
|
||||
try {
|
||||
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE);
|
||||
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
|
||||
deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE);
|
||||
deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE);
|
||||
int selectedReader = randomIntBetween(0, allGens.size() - 1);
|
||||
|
@ -116,33 +135,41 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
|
|||
selectedReader = randomIntBetween(0, allGens.size() - 1);
|
||||
final long selectedGenerationBySize = allGens.get(selectedReader).generation;
|
||||
long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get();
|
||||
selectedReader = randomIntBetween(0, allGens.size() - 1);
|
||||
final long selectedGenerationByTotalFiles = allGens.get(selectedReader).generation;
|
||||
deletionPolicy.setRetentionAgeInMillis(maxAge);
|
||||
deletionPolicy.setRetentionSizeInBytes(size);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize));
|
||||
final int totalFiles = allGens.size() - selectedReader;
|
||||
deletionPolicy.setRetentionTotalFiles(totalFiles);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter,
|
||||
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles));
|
||||
// make a new policy as committed gen can't go backwards (for now)
|
||||
deletionPolicy = new MockDeletionPolicy(now, size, maxAge);
|
||||
deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles);
|
||||
long committedGen = randomFrom(allGens).generation;
|
||||
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE));
|
||||
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter,
|
||||
Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize)));
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen,
|
||||
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
|
||||
long viewGen = randomFrom(allGens).generation;
|
||||
try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) {
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter,
|
||||
Math.min(
|
||||
Math.min(committedGen, viewGen),
|
||||
Math.max(selectedGenerationByAge, selectedGenerationBySize)));
|
||||
min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
|
||||
// disable age
|
||||
deletionPolicy.setRetentionAgeInMillis(-1);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize));
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter,
|
||||
min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles)));
|
||||
// disable size
|
||||
deletionPolicy.setRetentionAgeInMillis(maxAge);
|
||||
deletionPolicy.setRetentionSizeInBytes(-1);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge));
|
||||
// disable both
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter,
|
||||
min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles)));
|
||||
// disable age and size
|
||||
deletionPolicy.setRetentionAgeInMillis(-1);
|
||||
deletionPolicy.setRetentionSizeInBytes(-1);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
|
||||
// disable total files
|
||||
deletionPolicy.setRetentionTotalFiles(0);
|
||||
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(readersAndWriter.v1());
|
||||
|
@ -191,8 +218,8 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
|
|||
|
||||
long now;
|
||||
|
||||
MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) {
|
||||
super(retentionSizeInBytes, maxRetentionAgeInMillis);
|
||||
MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis, int maxRetentionTotalFiles) {
|
||||
super(retentionSizeInBytes, maxRetentionAgeInMillis, maxRetentionTotalFiles);
|
||||
this.now = now;
|
||||
}
|
||||
|
||||
|
@ -201,4 +228,12 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
|
|||
return now;
|
||||
}
|
||||
}
|
||||
|
||||
private static long max3(long x1, long x2, long x3) {
|
||||
return Math.max(Math.max(x1, x2), x3);
|
||||
}
|
||||
|
||||
private static long min3(long x1, long x2, long x3) {
|
||||
return Math.min(Math.min(x1, x2), x3);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2265,7 +2265,7 @@ public class TranslogTests extends ESTestCase {
|
|||
// engine blows up, after committing the above generation
|
||||
translog.close();
|
||||
TranslogConfig config = translog.getConfig();
|
||||
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
|
||||
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
|
||||
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
|
||||
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
|
||||
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy,
|
||||
|
@ -2324,7 +2324,7 @@ public class TranslogTests extends ESTestCase {
|
|||
// expected...
|
||||
}
|
||||
}
|
||||
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
|
||||
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
|
||||
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
|
||||
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
|
||||
try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
||||
|
|
|
@ -27,13 +27,14 @@ public class TranslogDeletionPolicies {
|
|||
public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
|
||||
return new TranslogDeletionPolicy(
|
||||
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
|
||||
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
|
||||
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis(),
|
||||
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getDefault(Settings.EMPTY)
|
||||
);
|
||||
}
|
||||
|
||||
public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
|
||||
return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
|
||||
indexSettings.getTranslogRetentionAge().getMillis());
|
||||
indexSettings.getTranslogRetentionAge().getMillis(), indexSettings.getTranslogRetentionTotalFiles());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -53,7 +53,8 @@ public final class InternalSettingsPlugin extends Plugin {
|
|||
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
|
||||
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
|
||||
IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING,
|
||||
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
|
||||
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
|
||||
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue