diff --git a/docs/reference/cluster/nodes-stats.asciidoc b/docs/reference/cluster/nodes-stats.asciidoc
index dbb63c7e9fd..81d027f8779 100644
--- a/docs/reference/cluster/nodes-stats.asciidoc
+++ b/docs/reference/cluster/nodes-stats.asciidoc
@@ -25,7 +25,7 @@ of `indices`, `os`, `process`, `jvm`, `network`, `transport`, `http`,
 [horizontal]
 `indices`::
 	Indices stats about size, document count, indexing and
-	deletion times, search times, field cache size , merges and flushes
+	deletion times, search times, field cache size, merges and flushes
 
 `fs`::
 	File system information, data path, free disk space, read/write
diff --git a/docs/reference/cluster/update-settings.asciidoc b/docs/reference/cluster/update-settings.asciidoc
index 9962cf2bb23..aded0fda24b 100644
--- a/docs/reference/cluster/update-settings.asciidoc
+++ b/docs/reference/cluster/update-settings.asciidoc
@@ -183,15 +183,6 @@ due to forced awareness or allocation filtering.
 `indices.recovery.max_bytes_per_sec`::
     See <<modules-indices>>
 
-[float]
-==== Store level throttling
-
-`indices.store.throttle.type`::
-    See <<index-modules-store>>
-
-`indices.store.throttle.max_bytes_per_sec`::
-    See <<index-modules-store>>
-
 [float]
 [[logger]]
 === Logger
diff --git a/docs/reference/index-modules/merge.asciidoc b/docs/reference/index-modules/merge.asciidoc
index aaf42a57429..4d276f00037 100644
--- a/docs/reference/index-modules/merge.asciidoc
+++ b/docs/reference/index-modules/merge.asciidoc
@@ -7,12 +7,6 @@ where the index data is stored, and are immutable up to delete markers.
 Segments are, periodically, merged into larger segments to keep the
 index size at bay and expunge deletes.
 
-The more segments one has in the Lucene index means slower searches and
-more memory used. Segment merging is used to reduce the number of segments,
-however merges can be expensive to perform, especially on low IO environments.
-Merges can be throttled using <<store-throttling,store level throttling>>.
-
-
 [float]
 [[policy]]
 === Policy
@@ -194,10 +188,21 @@ scheduler supports this setting:
 
 `index.merge.scheduler.max_thread_count`::
 
 The maximum number of threads that may be merging at once. Defaults to
-`Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))`
+`Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))`
 which works well for a good solid-state-disk (SSD). If your index is on
 spinning platter drives instead, decrease this to 1.
 
+`index.merge.scheduler.auto_throttle`::
+
+If this is true (the default), then the merge scheduler will
+rate-limit IO (writes) for merges to an adaptive value depending on
+how many merges are requested over time. An application with a low
+indexing rate that unluckily suddenly requires a large merge will see
+that merge aggressively throttled, while an application doing heavy
+indexing will see the throttle move higher to allow merges to keep up
+with ongoing indexing. This is a dynamic setting (you can <<indices-update-settings,change it
+at any time on a running index>>).
+
 [float]
 ==== SerialMergeScheduler
diff --git a/docs/reference/index-modules/store.asciidoc b/docs/reference/index-modules/store.asciidoc
index a56c9315c56..4a0a9de86a9 100644
--- a/docs/reference/index-modules/store.asciidoc
+++ b/docs/reference/index-modules/store.asciidoc
@@ -19,37 +19,6 @@ to the fact that there is no need for extra large JVM heaps (with their
 own consequences) for storing the index in memory.
 
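(A usage aside for the `index.merge.scheduler.auto_throttle` documentation added above: the sketch below shows the setting being applied at index-creation time through the Java API. It mirrors the client calls used by the tests at the end of this patch; the index name `test`, the helper class, and the single-shard layout are illustrative assumptions, not part of the patch.)

[source,java]
--------------------------------------------------
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class CreateIndexSketch {
    // Create an index whose merge scheduler starts with Lucene's adaptive
    // merge IO throttle disabled (the new setting defaults to true):
    public static void createUnthrottledIndex(Client client) { // hypothetical helper
        client.admin().indices().prepareCreate("test")
                .setSettings(ImmutableSettings.builder()
                        .put("index.number_of_shards", 1)
                        .put("index.number_of_replicas", 0)
                        .put("index.merge.scheduler.auto_throttle", false))
                .get();
    }
}
--------------------------------------------------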
-[float]
-[[store-throttling]]
-=== Store Level Throttling
-
-The way Lucene, the IR library elasticsearch uses under the covers,
-works is by creating immutable segments (up to deletes) and constantly
-merging them (the merge policy settings allow to control how those
-merges happen). The merge process happens in an asynchronous manner
-without affecting the indexing / search speed. The problem though,
-especially on systems with low IO, is that the merge process can be
-expensive and affect search / index operation simply by the fact that
-the box is now taxed with more IO happening.
-
-The store module allows to have throttling configured for merges (or
-all) either on the node level, or on the index level. The node level
-throttling will make sure that out of all the shards allocated on that
-node, the merge process won't pass the specific setting bytes per
-second. It can be set by setting `indices.store.throttle.type` to
-`merge`, and setting `indices.store.throttle.max_bytes_per_sec` to
-something like `5mb`. The node level settings can be changed dynamically
-using the cluster update settings API. The default is set
-to `20mb` with type `merge`.
-
-If specific index level configuration is needed, regardless of the node
-level settings, it can be set as well using the
-`index.store.throttle.type`, and
-`index.store.throttle.max_bytes_per_sec`. The default value for the type
-is `node`, meaning it will throttle based on the node level settings and
-participate in the global throttling happening. Both settings can be set
-using the index update settings API dynamically.
-
 [float]
 [[file-system]]
 === File system storage types
diff --git a/docs/reference/indices/update-settings.asciidoc b/docs/reference/indices/update-settings.asciidoc
index 036de02c7a4..626a4e00870 100644
--- a/docs/reference/indices/update-settings.asciidoc
+++ b/docs/reference/indices/update-settings.asciidoc
@@ -90,6 +90,9 @@ settings API:
     All the settings for the merge policy currently configured.
     A different merge policy can't be set.
 
+`index.merge.scheduler.*`::
+    All the settings for the merge scheduler.
+
 `index.routing.allocation.include.*`::
     A node matching any rule will be allowed to host shards from the
     index.
diff --git a/docs/reference/modules/indices.asciidoc b/docs/reference/modules/indices.asciidoc
index 16c22a9dba7..a27b26b846b 100644
--- a/docs/reference/modules/indices.asciidoc
+++ b/docs/reference/modules/indices.asciidoc
@@ -61,16 +61,3 @@ The following settings can be set to manage the recovery policy:
 
 `indices.recovery.max_bytes_per_sec`::
 	defaults to `20mb`.
 
-[float]
-[[throttling]]
-=== Store level throttling
-
-The following settings can be set to control the store throttling:
-
-[horizontal]
-`indices.store.throttle.type`::
-	could be `merge` (default), `none` or `all`. See <<index-modules-store>>.
-
-`indices.store.throttle.max_bytes_per_sec`::
-	defaults to `20mb`.
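(Likewise, since `index.merge.scheduler.*` is now listed as dynamically updatable, toggling it needs no index close or reopen. This sketch mirrors the `prepareUpdateSettings` calls in the tests below; the index name, helper class, and chosen values are illustrative assumptions.)

[source,java]
--------------------------------------------------
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class UpdateMergeSettingsSketch {
    // Disable the adaptive merge IO throttle and cap merging to one thread on a
    // live index; ConcurrentMergeSchedulerProvider.ApplySettings picks both
    // values up on the next settings refresh:
    public static void relaxMerging(Client client) { // hypothetical helper
        client.admin().indices().prepareUpdateSettings("test")
                .setSettings(ImmutableSettings.builder()
                        .put("index.merge.scheduler.auto_throttle", false)
                        .put("index.merge.scheduler.max_thread_count", 1))
                .get();
    }
}
--------------------------------------------------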
-
diff --git a/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java b/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java
index ec2c0a003f5..94d86087f9c 100644
--- a/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java
+++ b/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java
@@ -46,6 +46,8 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
     private final CounterMetric currentMerges = new CounterMetric();
     private final CounterMetric currentMergesNumDocs = new CounterMetric();
     private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
+    private final CounterMetric totalMergeStoppedTime = new CounterMetric();
+    private final CounterMetric totalMergeThrottledTime = new CounterMetric();
 
     private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
     private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
@@ -83,6 +85,14 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         return currentMergesSizeInBytes.count();
     }
 
+    public long totalMergeStoppedTimeMillis() {
+        return totalMergeStoppedTime.count();
+    }
+
+    public long totalMergeThrottledTimeMillis() {
+        return totalMergeThrottledTime.count();
+    }
+
     public Set<OnGoingMerge> onGoingMerges() {
         return readOnlyOnGoingMerges;
     }
@@ -118,12 +128,23 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         totalMergesNumDocs.inc(totalNumDocs);
         totalMergesSizeInBytes.inc(totalSizeInBytes);
         totalMerges.inc(took);
+
+        long stoppedMS = merge.rateLimiter.getTotalStoppedNS()/1000000;
+        long throttledMS = merge.rateLimiter.getTotalPausedNS()/1000000;
+
+        totalMergeStoppedTime.inc(stoppedMS);
+        totalMergeThrottledTime.inc(throttledMS);
+
         String message = String.format(Locale.ROOT,
-                                       "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs]",
+                                       "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
                                        merge.info == null ?
"_na_" : merge.info.info.name, TimeValue.timeValueMillis(took), totalSizeInBytes/1024f/1024f, - totalNumDocs); + totalNumDocs, + TimeValue.timeValueMillis(stoppedMS), + TimeValue.timeValueMillis(throttledMS), + merge.rateLimiter.getTotalBytesWritten()/1024f/1024f, + merge.rateLimiter.getMBPerSec()); if (took > 20000) { // if more than 20 seconds, DEBUG log it logger.debug(message); diff --git a/src/main/java/org/apache/lucene/store/StoreUtils.java b/src/main/java/org/apache/lucene/store/StoreUtils.java index 7bf85604f34..b7de08b1ec1 100644 --- a/src/main/java/org/apache/lucene/store/StoreUtils.java +++ b/src/main/java/org/apache/lucene/store/StoreUtils.java @@ -18,6 +18,8 @@ */ package org.apache.lucene.store; +import java.util.Arrays; + /** */ public final class StoreUtils { @@ -46,4 +48,12 @@ public final class StoreUtils { return directory.toString(); } + + public static String toString(Directory[] directories) { + String[] strings = new String[directories.length]; + for(int i=0;i schedulers = new CopyOnWriteArraySet<>(); @@ -64,10 +63,10 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) { super(shardId, indexSettings, threadPool); this.indexSettingsService = indexSettingsService; - // TODO LUCENE MONITOR this will change in Lucene 4.0 - this.maxThreadCount = componentSettings.getAsInt(MAX_THREAD_COUNT_KEY, Math.max(1, Math.min(3, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2))); - this.maxMergeCount = componentSettings.getAsInt(MAX_MERGE_COUNT_KEY, maxThreadCount + 2); - logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}]", maxThreadCount, maxMergeCount); + this.maxThreadCount = indexSettings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2))); + this.maxMergeCount = indexSettings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5); + this.autoThrottle = indexSettings.getAsBoolean(AUTO_THROTTLE, true); + logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}], auto_throttle[{}]", maxThreadCount, maxMergeCount, autoThrottle); indexSettingsService.addListener(applySettings); } @@ -75,10 +74,14 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { @Override public MergeScheduler newMergeScheduler() { CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this); - // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows // InternalEngine.IndexThrottle to detect too-many-merges and throttle: concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount+1, maxThreadCount); + if (autoThrottle) { + concurrentMergeScheduler.enableAutoIOThrottle(); + } else { + concurrentMergeScheduler.disableAutoIOThrottle(); + } schedulers.add(concurrentMergeScheduler); return concurrentMergeScheduler; } @@ -86,9 +89,13 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { @Override public MergeStats stats() { MergeStats mergeStats = new MergeStats(); + // TODO: why would there be more than one CMS for a single shard...? 
        for (CustomConcurrentMergeScheduler scheduler : schedulers) {
             mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
-                           scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes());
+                           scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes(),
+                           scheduler.totalMergeStoppedTimeMillis(),
+                           scheduler.totalMergeThrottledTimeMillis(),
+                           autoThrottle ? scheduler.getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
         }
         return mergeStats;
     }
@@ -165,7 +172,7 @@
         public void onRefreshSettings(Settings settings) {
             int maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
             if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
-                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
+                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
                 ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
                 for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                     scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
@@ -174,12 +181,25 @@
             int maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount);
             if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
-                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
+                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
                 ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
                 for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                     scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
                 }
             }
+
+            boolean autoThrottle = settings.getAsBoolean(AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle);
+            if (autoThrottle != ConcurrentMergeSchedulerProvider.this.autoThrottle) {
+                logger.info("updating [{}] from [{}] to [{}]", AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle, autoThrottle);
+                ConcurrentMergeSchedulerProvider.this.autoThrottle = autoThrottle;
+                for (CustomConcurrentMergeScheduler scheduler : schedulers) {
+                    if (autoThrottle) {
+                        scheduler.enableAutoIOThrottle();
+                    } else {
+                        scheduler.disableAutoIOThrottle();
+                    }
+                }
+            }
         }
     }
 }
diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
index 42c55b9722b..7a17bcd46ff 100644
--- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
@@ -55,6 +55,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
         indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
         indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
         indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
+        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE);
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
diff --git a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java
index 36eae00e88b..18dac8b6d92 100644
--- a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java
+++ b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java
@@ -21,8 +21,9 @@ package org.elasticsearch.index.store.distributor;
 
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
-import org.elasticsearch.index.store.DirectoryUtils;
+import org.apache.lucene.store.StoreUtils;
 import org.elasticsearch.index.store.DirectoryService;
+import org.elasticsearch.index.store.DirectoryUtils;
 
 import java.io.IOException;
 import java.nio.file.FileStore;
@@ -68,7 +69,7 @@ public abstract class AbstractDistributor implements Distributor {
 
     @Override
     public String toString() {
-        return name() + Arrays.toString(delegates);
+        return name() + StoreUtils.toString(delegates);
     }
 
     protected abstract Directory doAny() throws IOException;
diff --git a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java
index f829885d8a4..bb73c669047 100644
--- a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java
+++ b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java
@@ -95,7 +95,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
         this.indexService = indexService;
         this.indicesStore = indicesStore;
 
-        this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "node");
+        this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "none");
         if (rateLimitingType.equalsIgnoreCase("node")) {
             nodeRateLimiting = true;
         } else {
diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index 1496799aa12..558ccdfebbe 100644
--- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -114,10 +114,10 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
         this.transportService = transportService;
         transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler());
 
-        // we limit with 20MB / sec by default with a default type set to merge sice 0.90.1
-        this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
+        // we don't limit by default (we default to CMS's auto throttle instead):
+        this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.NONE.name());
         rateLimiting.setType(rateLimitingType);
-        this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
+        this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB));
         rateLimiting.setMaxRate(rateLimitingThrottle);
         logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
 
@@ -458,4 +458,4 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
             node.writeTo(out);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java
index febb3f97091..e9e5af52ebd 100644
--- a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java
+++ b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java
@@ -23,7 +23,6 @@ import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -264,7 +263,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
         public boolean sawIndexWriterMessage;
         public boolean sawFlushDeletes;
         public boolean sawMergeThreadPaused;
-        public boolean sawUpdateSetting;
+        public boolean sawUpdateMaxThreadCount;
+        public boolean sawUpdateAutoThrottle;
 
         @Override
         protected void append(LoggingEvent event) {
@@ -274,8 +274,11 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
                 sawFlushDeletes |= message.contains("IW: apply all deletes during flush");
                 sawMergeThreadPaused |= message.contains("CMS: pause thread");
             }
-            if (event.getLevel() == Level.INFO && message.contains("updating [max_thread_count] from [10000] to [1]")) {
-                sawUpdateSetting = true;
+            if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
+                sawUpdateMaxThreadCount = true;
+            }
+            if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
+                sawUpdateAutoThrottle = true;
             }
         }
@@ -289,10 +292,50 @@
         }
     }
 
+    @Test
+    public void testUpdateAutoThrottleSettings() {
+
+        MockAppender mockAppender = new MockAppender();
+        Logger rootLogger = Logger.getRootLogger();
+        Level savedLevel = rootLogger.getLevel();
+        rootLogger.addAppender(mockAppender);
+        rootLogger.setLevel(Level.TRACE);
+
+        try {
+            // No throttling at first, only 1 non-replicated shard, force lots of merging:
+            assertAcked(prepareCreate("test")
+                        .setSettings(ImmutableSettings.builder()
+                                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
+                                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
+                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
+                                     .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
+                                     .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
+                                     ));
+
+            // Disable auto throttle:
+            client()
+                .admin()
+                .indices()
+                .prepareUpdateSettings("test")
+                .setSettings(ImmutableSettings.builder()
+                             .put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "no"))
+                .get();
+
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateAutoThrottle);
+
+            // Make sure setting says it is in fact changed:
+            GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
+            assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.AUTO_THROTTLE), equalTo("no"));
+        } finally {
+            rootLogger.removeAppender(mockAppender);
+            rootLogger.setLevel(savedLevel);
+        }
+    }
+
     // #6882: make sure we can change index.merge.scheduler.max_thread_count live
     @Test
-    @Slow
-    @AwaitsFix(bugUrl="Super slow because of LUCENE-6119. Muted until we clean up merge throttling.")
     public void testUpdateMergeMaxThreadCount() {
 
         MockAppender mockAppender = new MockAppender();
@@ -303,11 +346,8 @@
 
         try {
 
-            // Tons of merge threads allowed, only 1 non-replicated shard, force lots of merging, throttle so they fall behind:
             assertAcked(prepareCreate("test")
                         .setSettings(ImmutableSettings.builder()
-                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
-                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")
                                      .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
                                      .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
                                      .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
@@ -316,79 +356,25 @@
                                      .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
                                      .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
                                      ));
-            ensureGreen();
 
-            long termUpto = 0;
-            for(int i=0;i<100;i++) {
-                // Provoke slowish merging by making many unique terms:
-                StringBuilder sb = new StringBuilder();
-                for(int j=0;j<100;j++) {
-                    sb.append(' ');
-                    sb.append(termUpto++);
-                }
-                client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
-                if (i % 2 == 0) {
-                    refresh();
-                }
-            }
-
-            assertTrue(mockAppender.sawFlushDeletes);
-            assertFalse(mockAppender.sawMergeThreadPaused);
-            mockAppender.sawFlushDeletes = false;
-            mockAppender.sawMergeThreadPaused = false;
+            assertFalse(mockAppender.sawUpdateMaxThreadCount);
 
-            assertFalse(mockAppender.sawUpdateSetting);
-
-            // Now make a live change to reduce allowed merge threads, and waaay over-throttle merging so they fall behind:
+            // Now make a live change to reduce allowed merge threads:
             client()
                 .admin()
                 .indices()
                 .prepareUpdateSettings("test")
                 .setSettings(ImmutableSettings.builder()
                              .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
-                             .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "10kb")
                              )
                 .get();
 
-            try {
-
-                // Make sure we log the change:
-                assertTrue(mockAppender.sawUpdateSetting);
-
-                int i = 0;
-                while (true) {
-                    // Provoke slowish merging by making many unique terms:
-                    StringBuilder sb = new StringBuilder();
-                    for(int j=0;j<100;j++) {
-                        sb.append(' ');
-                        sb.append(termUpto++);
-                    }
-                    client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
-                    if (i % 2 == 0) {
-                        refresh();
-                    }
-                    // This time we should see some merges were in fact paused:
-                    if (mockAppender.sawMergeThreadPaused) {
-                        break;
-                    }
-                    i++;
-                }
-            } finally {
-                // Make merges fast again & finish merges before we try to close; else we sometimes get a "Delete Index failed - not acked"
-                // when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
-                client()
-                    .admin()
-                    .indices()
-                    .prepareUpdateSettings("test")
-                    .setSettings(ImmutableSettings.builder()
-                                 .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "3")
-                                 .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "20mb")
-                                 )
-                    .get();
-
-                // Wait for merges to finish
-                client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
-            }
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateMaxThreadCount);
 
+            // Make sure setting says it is in fact changed:
+            GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
+            assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT), equalTo("1"));
         } finally {
             rootLogger.removeAppender(mockAppender);
diff --git a/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java b/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java
index 9fe7a037850..bfe2876c8dc 100644
--- a/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java
+++ b/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java
@@ -121,6 +121,8 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
                 .put("index.store.type", storeType.name())
                 .put("index.number_of_replicas", 0)
                 .put("index.number_of_shards", 1)
+                .put("index.store.throttle.type", "merge")
+                .put("index.store.throttle.max_bytes_per_sec", "20mb")
                 )
                 .execute().actionGet();
         assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index d37dfbadd7e..14c1aa06302 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -444,7 +444,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         if (random.nextBoolean()) {
             builder.put(StoreModule.DISTIBUTOR_KEY, random.nextBoolean() ? StoreModule.LEAST_USED_DISTRIBUTOR : StoreModule.RANDOM_WEIGHT_DISTRIBUTOR);
         }
-
+        if (random.nextBoolean()) {
+            builder.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, false);
+        }
         if (random.nextBoolean()) {
             if (random.nextInt(10) == 0) { // do something crazy slow here
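(For reference, a minimal standalone sketch of the Lucene 5.x auto-IO-throttle API (LUCENE-6119) that this patch cuts over to; the analyzer, directory path, thread counts, and writer wiring here are illustrative assumptions, not code from the patch.)

[source,java]
--------------------------------------------------
import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;

public class AutoThrottleSketch {
    public static void main(String[] args) throws Exception {
        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
        // Mirrors ConcurrentMergeSchedulerProvider.newMergeScheduler(): one merge
        // beyond max_merge_count is allowed so the engine can detect the backlog.
        cms.setMaxMergesAndThreads(3, 2); // illustrative counts
        cms.enableAutoIOThrottle();       // the new default (auto_throttle: true)
        // cms.disableAutoIOThrottle();   // equivalent of auto_throttle: false

        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setMergeScheduler(cms);
        try (IndexWriter writer = new IndexWriter(FSDirectory.open(Paths.get("sketch-index")), iwc)) {
            writer.commit();
            // The adaptive IO rate limit the scheduler currently applies to merges:
            System.out.println("merge IO limit: " + cms.getIORateLimitMBPerSec() + " MB/sec");
        }
    }
}
--------------------------------------------------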