diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index fa3f6df7844..4dc99c5f9ea 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -139,6 +139,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, + IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index a8e629e2aff..eab23ea5915 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -195,6 +195,14 @@ public final class IndexSettings { new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), Property.Dynamic, Property.IndexScope); + /** + * The minimum size of a merge that triggers a flush in order to free resources + */ + public static final Setting INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING = + Setting.byteSizeSetting("index.flush_after_merge", new ByteSizeValue(512, ByteSizeUnit.MB), + new ByteSizeValue(0, ByteSizeUnit.BYTES), // always flush after merge + new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES), // never flush after merge + Property.Dynamic, Property.IndexScope); /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -338,6 +346,7 @@ public final class IndexSettings { private volatile TimeValue translogRetentionAge; private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; + private volatile ByteSizeValue flushAfterMergeThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; private final IndexSortConfig indexSortConfig; @@ -470,6 +479,7 @@ public final class IndexSettings { refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); + flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING); @@ -530,6 +540,7 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, this::setFlushAfterMergeThresholdSize); scopedSettings.addSettingsUpdateConsumer( INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, this::setGenerationThresholdSize); @@ -555,6 +566,10 @@ public final class IndexSettings { this.flushThresholdSize = byteSizeValue; } + private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) { + this.flushAfterMergeThresholdSize = byteSizeValue; + } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) { // ignore the translog retention settings if soft-deletes enabled @@ -744,6 +759,11 @@ public final class IndexSettings { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the merge threshold size when to forcefully flush the index and free resources. + */ + public ByteSizeValue getFlushAfterMergeThresholdSize() { return flushAfterMergeThresholdSize; } + /** * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 73d8a89b930..4324adbb738 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -175,6 +175,7 @@ public class InternalEngine extends Engine { private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); private final KeyedLock noOpKeyedLock = new KeyedLock<>(); + private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); @Nullable private final String historyUUID; @@ -1698,6 +1699,9 @@ public class InternalEngine extends Engine { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); + if (shouldPeriodicallyFlushAfterBigMerge.get()) { + return true; + } final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); @@ -2345,7 +2349,7 @@ public class InternalEngine extends Engine { } @Override - protected void doRun() throws Exception { + protected void doRun() { // if we have no pending merges and we are supposed to flush once merges have finished // we try to renew a sync commit which is the case when we are having a big merge after we // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work @@ -2357,7 +2361,11 @@ public class InternalEngine extends Engine { } } }); - + } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) { + // we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change + // we should execute a flush on the next operation if that's a flush after inactive or indexing a document. + // we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events. + shouldPeriodicallyFlushAfterBigMerge.set(true); } } @@ -2425,7 +2433,7 @@ public class InternalEngine extends Engine { logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); - + shouldPeriodicallyFlushAfterBigMerge.set(false); writer.commit(); } catch (final Exception ex) { try { diff --git a/server/src/main/java/org/elasticsearch/index/merge/OnGoingMerge.java b/server/src/main/java/org/elasticsearch/index/merge/OnGoingMerge.java index e39158720c3..5c09b660777 100644 --- a/server/src/main/java/org/elasticsearch/index/merge/OnGoingMerge.java +++ b/server/src/main/java/org/elasticsearch/index/merge/OnGoingMerge.java @@ -30,11 +30,12 @@ import java.util.List; public class OnGoingMerge { private final String id; - private final List mergedSegments; + private final MergePolicy.OneMerge oneMerge; public OnGoingMerge(MergePolicy.OneMerge merge) { this.id = Integer.toString(System.identityHashCode(merge)); - this.mergedSegments = merge.segments; + this.oneMerge = merge; + } /** @@ -44,10 +45,20 @@ public class OnGoingMerge { return id; } + + /** + * Returns the total size in bytes of this merge. Note that this does not + * indicate the size of the merged segment, but the + * input total size. + */ + public long getTotalBytesSize() { + return oneMerge.totalBytesSize(); + } + /** * The list of segments that are being merged. */ public List getMergedSegments() { - return mergedSegments; + return oneMerge.segments; } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c7820078465..f91f6ee0d8c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5054,6 +5054,34 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testShouldPeriodicallyFlushAfterMerge() throws Exception { + assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); + ParsedDocument doc = + testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1)); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2)); + engine.refresh("test"); + engine.forceMerge(false, 1, false, false, false); + assertBusy(() -> { + // the merge listner runs concurrently after the force merge returned + assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); + }); + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + public void testStressShouldPeriodicallyFlush() throws Exception { final long flushThreshold = randomLongBetween(120, 5000); final long generationThreshold = randomLongBetween(1000, 5000); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index daa40c62a1c..a1e9c2aea08 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -398,6 +398,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction