Add lower bound for translog flush threshold (#28382)
If the translog flush threshold is too small (e.g., smaller than the translog header), we may repeatedly flush even when there are no uncommitted operations, because the shouldFlush condition can still be true after flushing. This is currently avoided by an extra guard that checks for uncommitted operations. However, this extra guard complicates shouldFlush. This commit replaces that extra guard with a lower bound on the translog flush threshold. We keep the lower bound small for convenience in testing. Relates #28350 Relates #23606
This commit is contained in:
parent
bb97c00556
commit
1970e01782
|
@ -183,8 +183,15 @@ public final class IndexSettings {
|
||||||
Setting.timeSetting("index.refresh_interval", DEFAULT_REFRESH_INTERVAL, new TimeValue(-1, TimeUnit.MILLISECONDS),
|
Setting.timeSetting("index.refresh_interval", DEFAULT_REFRESH_INTERVAL, new TimeValue(-1, TimeUnit.MILLISECONDS),
|
||||||
Property.Dynamic, Property.IndexScope);
|
Property.Dynamic, Property.IndexScope);
|
||||||
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING =
|
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING =
|
||||||
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
|
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB),
|
||||||
Property.IndexScope);
|
/*
|
||||||
|
* An empty translog occupies 43 bytes on disk. If the flush threshold is below this, the flush thread
|
||||||
|
* can get stuck in an infinite loop as the shouldPeriodicallyFlush can still be true after flushing.
|
||||||
|
* However, small thresholds are useful for testing so we do not add a large lower bound here.
|
||||||
|
*/
|
||||||
|
new ByteSizeValue(Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1, ByteSizeUnit.BYTES),
|
||||||
|
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
|
||||||
|
Property.Dynamic, Property.IndexScope);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Controls how long translog files that are no longer needed for persistence reasons
|
* Controls how long translog files that are no longer needed for persistence reasons
|
||||||
|
@ -219,9 +226,9 @@ public final class IndexSettings {
|
||||||
* generation threshold. However, small thresholds are useful for testing so we
|
* generation threshold. However, small thresholds are useful for testing so we
|
||||||
* do not add a large lower bound here.
|
* do not add a large lower bound here.
|
||||||
*/
|
*/
|
||||||
new ByteSizeValue(64, ByteSizeUnit.BYTES),
|
new ByteSizeValue(Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1, ByteSizeUnit.BYTES),
|
||||||
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
|
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
|
||||||
new Property[]{Property.Dynamic, Property.IndexScope});
|
Property.Dynamic, Property.IndexScope);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Index setting to enable / disable deletes garbage collection.
|
* Index setting to enable / disable deletes garbage collection.
|
||||||
|
|
|
@ -1470,6 +1470,8 @@ public class InternalEngine extends Engine {
|
||||||
if (uncommittedSizeOfCurrentCommit < flushThreshold) {
|
if (uncommittedSizeOfCurrentCommit < flushThreshold) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
assert translog.uncommittedOperations() > 0 : "translog required to flush periodically but not contain any uncommitted operation; "
|
||||||
|
+ "uncommitted translog size [" + uncommittedSizeOfCurrentCommit + "], flush threshold [" + flushThreshold + "]";
|
||||||
/*
|
/*
|
||||||
* We should only flush ony if the shouldFlush condition can become false after flushing.
|
* We should only flush ony if the shouldFlush condition can become false after flushing.
|
||||||
* This condition will change if the `uncommittedSize` of the new commit is smaller than
|
* This condition will change if the `uncommittedSize` of the new commit is smaller than
|
||||||
|
@ -1477,14 +1479,7 @@ public class InternalEngine extends Engine {
|
||||||
* thus the IndexWriter#hasUncommittedChanges condition is not considered.
|
* thus the IndexWriter#hasUncommittedChanges condition is not considered.
|
||||||
*/
|
*/
|
||||||
final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1);
|
final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1);
|
||||||
/*
|
return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit;
|
||||||
* If flushThreshold is too small, we may repeatedly flush even there is no uncommitted operation
|
|
||||||
* as #sizeOfGensAboveSeqNoInByte and #uncommittedSizeInBytes can return different values.
|
|
||||||
* An empty translog file has non-zero `uncommittedSize` (the translog header), and method #sizeOfGensAboveSeqNoInBytes can
|
|
||||||
* return 0 now(no translog gen contains ops above local checkpoint) but method #uncommittedSizeInBytes will return an actual
|
|
||||||
* non-zero value after rolling a new translog generation. This can be avoided by checking the actual uncommitted operations.
|
|
||||||
*/
|
|
||||||
return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit && translog.uncommittedOperations() > 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -108,6 +108,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;
|
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;
|
||||||
|
|
||||||
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
|
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
|
||||||
|
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogWriter.getHeaderLength(UUIDs.randomBase64UUID());
|
||||||
|
|
||||||
// the list of translog readers is guaranteed to be in order of translog generation
|
// the list of translog readers is guaranteed to be in order of translog generation
|
||||||
private final List<TranslogReader> readers = new ArrayList<>();
|
private final List<TranslogReader> readers = new ArrayList<>();
|
||||||
|
@ -451,7 +452,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
* @throws IOException if creating the translog failed
|
* @throws IOException if creating the translog failed
|
||||||
*/
|
*/
|
||||||
TranslogWriter createWriter(long fileGeneration) throws IOException {
|
TranslogWriter createWriter(long fileGeneration) throws IOException {
|
||||||
return createWriter(fileGeneration, getMinFileGeneration(), globalCheckpointSupplier.getAsLong());
|
final TranslogWriter writer = createWriter(fileGeneration, getMinFileGeneration(), globalCheckpointSupplier.getAsLong());
|
||||||
|
assert writer.sizeInBytes() == DEFAULT_HEADER_SIZE_IN_BYTES : "Mismatch translog header size; " +
|
||||||
|
"empty translog size [" + writer.sizeInBytes() + ", header size [" + DEFAULT_HEADER_SIZE_IN_BYTES + "]";
|
||||||
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -350,7 +350,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
||||||
});
|
});
|
||||||
assertEquals(0, translog.uncommittedOperations());
|
assertEquals(0, translog.uncommittedOperations());
|
||||||
translog.sync();
|
translog.sync();
|
||||||
long size = translog.uncommittedSizeInBytes();
|
long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1);
|
||||||
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
|
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
|
||||||
translog.uncommittedOperations(), translog.getGeneration());
|
translog.uncommittedOperations(), translog.getGeneration());
|
||||||
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
|
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
|
||||||
|
|
Loading…
Reference in New Issue