Flush engine after big merge (#46066) (#46111)

Today we might carry on a big merge uncommitted and therefore
occupy a significant amount of diskspace for quite a long time
if for instance indexing load goes down and we are not quickly
reaching the translog size threshold. This change will cause a
flush if we hit a significant merge (512MB by default) which
frees diskspace sooner.
This commit is contained in:
Simon Willnauer 2019-08-29 17:54:15 +02:00 committed by GitHub
parent f7a91239e7
commit 9b2ea07b17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 75 additions and 6 deletions

View File

@ -139,6 +139,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,

View File

@ -195,6 +195,14 @@ public final class IndexSettings {
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
Property.Dynamic, Property.IndexScope); Property.Dynamic, Property.IndexScope);
/**
* The minimum size of a merge that triggers a flush in order to free resources
*/
public static final Setting<ByteSizeValue> INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting("index.flush_after_merge", new ByteSizeValue(512, ByteSizeUnit.MB),
new ByteSizeValue(0, ByteSizeUnit.BYTES), // always flush after merge
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), // never flush after merge
Property.Dynamic, Property.IndexScope);
/** /**
* The maximum size of a translog generation. This is independent of the maximum size of * The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed. * translog operations that have not been flushed.
@ -338,6 +346,7 @@ public final class IndexSettings {
private volatile TimeValue translogRetentionAge; private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize; private volatile ByteSizeValue generationThresholdSize;
private volatile ByteSizeValue flushAfterMergeThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig; private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig; private final MergePolicyConfig mergePolicyConfig;
private final IndexSortConfig indexSortConfig; private final IndexSortConfig indexSortConfig;
@ -470,6 +479,7 @@ public final class IndexSettings {
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this); mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING); softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
@ -530,6 +540,7 @@ public final class IndexSettings {
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, this::setFlushAfterMergeThresholdSize);
scopedSettings.addSettingsUpdateConsumer( scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize); this::setGenerationThresholdSize);
@ -555,6 +566,10 @@ public final class IndexSettings {
this.flushThresholdSize = byteSizeValue; this.flushThresholdSize = byteSizeValue;
} }
private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
this.flushAfterMergeThresholdSize = byteSizeValue;
}
private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) { if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled // ignore the translog retention settings if soft-deletes enabled
@ -744,6 +759,11 @@ public final class IndexSettings {
*/ */
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }
/**
* Returns the merge threshold size when to forcefully flush the index and free resources.
*/
public ByteSizeValue getFlushAfterMergeThresholdSize() { return flushAfterMergeThresholdSize; }
/** /**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/ */

View File

@ -175,6 +175,7 @@ public class InternalEngine extends Engine {
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>(); private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
@Nullable @Nullable
private final String historyUUID; private final String historyUUID;
@ -1698,6 +1699,9 @@ public class InternalEngine extends Engine {
@Override @Override
public boolean shouldPeriodicallyFlush() { public boolean shouldPeriodicallyFlush() {
ensureOpen(); ensureOpen();
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long translogGenerationOfLastCommit = final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
@ -2345,7 +2349,7 @@ public class InternalEngine extends Engine {
} }
@Override @Override
protected void doRun() throws Exception { protected void doRun() {
// if we have no pending merges and we are supposed to flush once merges have finished // if we have no pending merges and we are supposed to flush once merges have finished
// we try to renew a sync commit which is the case when we are having a big merge after we // we try to renew a sync commit which is the case when we are having a big merge after we
// are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work
@ -2357,7 +2361,11 @@ public class InternalEngine extends Engine {
} }
} }
}); });
} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
// we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change
// we should execute a flush on the next operation if that's a flush after inactive or indexing a document.
// we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events.
shouldPeriodicallyFlushAfterBigMerge.set(true);
} }
} }
@ -2425,7 +2433,7 @@ public class InternalEngine extends Engine {
logger.trace("committing writer with commit data [{}]", commitData); logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator(); return commitData.entrySet().iterator();
}); });
shouldPeriodicallyFlushAfterBigMerge.set(false);
writer.commit(); writer.commit();
} catch (final Exception ex) { } catch (final Exception ex) {
try { try {

View File

@ -30,11 +30,12 @@ import java.util.List;
public class OnGoingMerge { public class OnGoingMerge {
private final String id; private final String id;
private final List<SegmentCommitInfo> mergedSegments; private final MergePolicy.OneMerge oneMerge;
public OnGoingMerge(MergePolicy.OneMerge merge) { public OnGoingMerge(MergePolicy.OneMerge merge) {
this.id = Integer.toString(System.identityHashCode(merge)); this.id = Integer.toString(System.identityHashCode(merge));
this.mergedSegments = merge.segments; this.oneMerge = merge;
} }
/** /**
@ -44,10 +45,20 @@ public class OnGoingMerge {
return id; return id;
} }
/**
* Returns the total size in bytes of this merge. Note that this does not
* indicate the size of the merged segment, but the
* input total size.
*/
public long getTotalBytesSize() {
return oneMerge.totalBytesSize();
}
/** /**
* The list of segments that are being merged. * The list of segments that are being merged.
*/ */
public List<SegmentCommitInfo> getMergedSegments() { public List<SegmentCommitInfo> getMergedSegments() {
return mergedSegments; return oneMerge.segments;
} }
} }

View File

@ -5054,6 +5054,34 @@ public class InternalEngineTests extends EngineTestCase {
} }
} }
public void testShouldPeriodicallyFlushAfterMerge() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
ParsedDocument doc =
testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
engine.refresh("test");
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
engine.refresh("test");
engine.forceMerge(false, 1, false, false, false);
assertBusy(() -> {
// the merge listner runs concurrently after the force merge returned
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
});
engine.flush();
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
}
public void testStressShouldPeriodicallyFlush() throws Exception { public void testStressShouldPeriodicallyFlush() throws Exception {
final long flushThreshold = randomLongBetween(120, 5000); final long flushThreshold = randomLongBetween(120, 5000);
final long generationThreshold = randomLongBetween(1000, 5000); final long generationThreshold = randomLongBetween(1000, 5000);

View File

@ -398,6 +398,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
nonReplicatedSettings.add(IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING); nonReplicatedSettings.add(IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING);
nonReplicatedSettings.add(IndexSettings.DEFAULT_PIPELINE); nonReplicatedSettings.add(IndexSettings.DEFAULT_PIPELINE);
nonReplicatedSettings.add(IndexSettings.INDEX_SEARCH_THROTTLED); nonReplicatedSettings.add(IndexSettings.INDEX_SEARCH_THROTTLED);
nonReplicatedSettings.add(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING); nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING); nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);