From 1e92f0f4ffd1948c4863ec409722a128426dcfc6 Mon Sep 17 00:00:00 2001
From: mikemccand
Date: Tue, 22 Jul 2014 11:23:46 -0400
Subject: [PATCH] Core: allow index.merge.scheduler.max_thread_count to be updated dynamically

Lucene allows the max_thread_count to be updated, but this wasn't fully
exposed in Elasticsearch.

Closes #6925
---
 .../ConcurrentMergeSchedulerProvider.java        |   8 +-
 .../settings/IndexDynamicSettingsModule.java     |   3 +
 .../indices/settings/UpdateSettingsTests.java    | 118 ++++++++++++++++++
 3 files changed, 125 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java
index 8601c85f2bd..5fa7ad416c6 100644
--- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java
+++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java
@@ -156,18 +156,18 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     class ApplySettings implements IndexSettingsService.Listener {
         @Override
         public void onRefreshSettings(Settings settings) {
-            int maxThreadCount = settings.getAsInt("index.merge.scheduler.max_thread_count", ConcurrentMergeSchedulerProvider.this.maxThreadCount);
+            int maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
             if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
-                logger.info("updating [max_thread_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
+                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
                 ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
                 for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                     scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
                 }
             }
 
-            int maxMergeCount = settings.getAsInt("index.merge.scheduler.max_merge_count", ConcurrentMergeSchedulerProvider.this.maxMergeCount);
+            int maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount);
             if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
-                logger.info("updating [max_merge_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
+                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
                 ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
                 for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                     scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);

diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
index 4f6d403c2b8..30ece8cad6f 100644
--- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
@@ -34,6 +34,7 @@ import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
 import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
 import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
 import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
+import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
 import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.index.store.support.AbstractIndexStore;
@@ -52,6 +53,8 @@ public class IndexDynamicSettingsModule extends AbstractModule {
         indexDynamicSettings = new DynamicSettings();
         indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
         indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
+        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
+        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");

diff --git a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java
index 74080008c76..270b106bb18 100644
--- a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java
+++ b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java
@@ -19,6 +19,10 @@ package org.elasticsearch.indices.settings;
 
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -237,4 +241,118 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
         // No additional merge IO throttling should have happened:
         assertEquals(sumThrottleTime, newSumThrottleTime);
     }
+
+    private static class MockAppender extends AppenderSkeleton {
+        public boolean sawIndexWriterMessage;
+        public boolean sawFlushDeletes;
+        public boolean sawMergeThreadPaused;
+        public boolean sawUpdateSetting;
+
+        @Override
+        protected void append(LoggingEvent event) {
+            String message = event.getMessage().toString();
+            if (event.getLevel() == Level.TRACE &&
+                event.getLoggerName().endsWith("lucene.iw")) {
+                sawFlushDeletes |= message.contains("IW: apply all deletes during flush");
+                sawMergeThreadPaused |= message.contains("CMS: pause thread");
+            }
+            if (event.getLevel() == Level.INFO && message.contains("updating [max_thread_count] from [10000] to [1]")) {
+                sawUpdateSetting = true;
+            }
+        }
+
+        @Override
+        public boolean requiresLayout() {
+            return false;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    // #6882: make sure we can change index.merge.scheduler.max_thread_count live
+    @Test
+    @Slow
+    public void testUpdateMergeMaxThreadCount() {
+
+        MockAppender mockAppender = new MockAppender();
+        Logger rootLogger = Logger.getRootLogger();
+        Level savedLevel = rootLogger.getLevel();
+        rootLogger.addAppender(mockAppender);
+        rootLogger.setLevel(Level.TRACE);
+
+        try {
+
+            // Tons of merge threads allowed, only 1 non-replicated shard, force lots of merging, throttle so they fall behind:
+            assertAcked(prepareCreate("test")
+                        .setSettings(ImmutableSettings.builder()
+                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
+                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")
+                                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
+                                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
+                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
+                                     .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
+                                     .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
+                                     ));
+            ensureGreen();
+            long termUpto = 0;
+            for(int i=0;i<100;i++) {
+                // Provoke slowish merging by making many unique terms:
+                StringBuilder sb = new StringBuilder();
+                for(int j=0;j<100;j++) {
+                    sb.append(' ');
+                    sb.append(termUpto++);
+                }
+                client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
+                if (i % 2 == 0) {
+                    refresh();
+                }
+            }
+
+            assertTrue(mockAppender.sawFlushDeletes);
+            assertFalse(mockAppender.sawMergeThreadPaused);
+            mockAppender.sawFlushDeletes = false;
+            mockAppender.sawMergeThreadPaused = false;
+
+            assertFalse(mockAppender.sawUpdateSetting);
+
+            // Now make a live change to reduce allowed merge threads:
+            client()
+                .admin()
+                .indices()
+                .prepareUpdateSettings("test")
+                .setSettings(ImmutableSettings.builder()
+                             .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1"))
+                .get();
+
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateSetting);
+
+            int i = 0;
+            while (true) {
+                // Provoke slowish merging by making many unique terms:
+                StringBuilder sb = new StringBuilder();
+                for(int j=0;j<100;j++) {
+                    sb.append(' ');
+                    sb.append(termUpto++);
+                }
+                client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
+                if (i % 2 == 0) {
+                    refresh();
+                }
+                // This time we should see some merges were in fact paused:
+                if (mockAppender.sawMergeThreadPaused) {
+                    break;
+                }
+                i++;
+            }
+
+
+        } finally {
+            rootLogger.removeAppender(mockAppender);
+            rootLogger.setLevel(savedLevel);
+        }
+    }
 }
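
Usage note (not part of the patch): with this change the merge scheduler's
thread count becomes an ordinary dynamic index setting, so it can be changed
on a live index through the update-settings API, and the running
ConcurrentMergeScheduler instances pick the new value up immediately. The
sketch below is a hypothetical standalone example against the 1.x Java
transport client; the node address and the index name "test" are assumptions,
and the same update can equally be issued via the REST update-settings
endpoint.

    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    public class UpdateMergeThreadCount {
        public static void main(String[] args) {
            // Hypothetical cluster coordinates; adjust for your deployment.
            Client client = new TransportClient()
                    .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
            try {
                // Lower the allowed merge threads on the live index "test" to 1;
                // the index does not need to be closed or the node restarted.
                client.admin().indices().prepareUpdateSettings("test")
                        .setSettings(ImmutableSettings.builder()
                                .put("index.merge.scheduler.max_thread_count", 1))
                        .get();
            } finally {
                client.close();
            }
        }
    }

The new integration test exercises the same path through the test cluster's
client() and then watches Lucene's TRACE logging to confirm that merge
threads actually get paused once only one thread is allowed.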