Core: allow index.merge.scheduler.max_thread_count to be updated dynamically
Lucene allows the max_thread_count to be updated, but this wasn't fully exposed in Elasticsearch. Closes #6925
This commit is contained in:
parent
f14af3599a
commit
1e92f0f4ff
|
@ -156,18 +156,18 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
|
||||||
class ApplySettings implements IndexSettingsService.Listener {
|
class ApplySettings implements IndexSettingsService.Listener {
|
||||||
@Override
|
@Override
|
||||||
public void onRefreshSettings(Settings settings) {
|
public void onRefreshSettings(Settings settings) {
|
||||||
int maxThreadCount = settings.getAsInt("index.merge.scheduler.max_thread_count", ConcurrentMergeSchedulerProvider.this.maxThreadCount);
|
int maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
|
||||||
if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
|
if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
|
||||||
logger.info("updating [max_thread_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
|
logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
|
||||||
ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
|
ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
|
||||||
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
||||||
scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
|
scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int maxMergeCount = settings.getAsInt("index.merge.scheduler.max_merge_count", ConcurrentMergeSchedulerProvider.this.maxMergeCount);
|
int maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount);
|
||||||
if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
|
if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
|
||||||
logger.info("updating [max_merge_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
|
logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
|
||||||
ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
|
ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
|
||||||
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
||||||
scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
|
scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
|
||||||
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
|
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
|
||||||
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
|
import org.elasticsearch.index.merge.policy.LogDocMergePolicyProvider;
|
||||||
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
|
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
|
||||||
|
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
|
||||||
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
|
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
|
||||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||||
import org.elasticsearch.index.store.support.AbstractIndexStore;
|
import org.elasticsearch.index.store.support.AbstractIndexStore;
|
||||||
|
@ -52,6 +53,8 @@ public class IndexDynamicSettingsModule extends AbstractModule {
|
||||||
indexDynamicSettings = new DynamicSettings();
|
indexDynamicSettings = new DynamicSettings();
|
||||||
indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
|
indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
|
||||||
indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
|
indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
|
||||||
|
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
|
||||||
|
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
|
||||||
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
|
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
|
||||||
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
|
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
|
||||||
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
|
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
|
||||||
|
|
|
@ -19,6 +19,10 @@
|
||||||
|
|
||||||
package org.elasticsearch.indices.settings;
|
package org.elasticsearch.indices.settings;
|
||||||
|
|
||||||
|
import org.apache.log4j.AppenderSkeleton;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
|
@ -237,4 +241,118 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
||||||
// No additional merge IO throttling should have happened:
|
// No additional merge IO throttling should have happened:
|
||||||
assertEquals(sumThrottleTime, newSumThrottleTime);
|
assertEquals(sumThrottleTime, newSumThrottleTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class MockAppender extends AppenderSkeleton {
|
||||||
|
public boolean sawIndexWriterMessage;
|
||||||
|
public boolean sawFlushDeletes;
|
||||||
|
public boolean sawMergeThreadPaused;
|
||||||
|
public boolean sawUpdateSetting;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void append(LoggingEvent event) {
|
||||||
|
String message = event.getMessage().toString();
|
||||||
|
if (event.getLevel() == Level.TRACE &&
|
||||||
|
event.getLoggerName().endsWith("lucene.iw")) {
|
||||||
|
sawFlushDeletes |= message.contains("IW: apply all deletes during flush");
|
||||||
|
sawMergeThreadPaused |= message.contains("CMS: pause thread");
|
||||||
|
}
|
||||||
|
if (event.getLevel() == Level.INFO && message.contains("updating [max_thread_count] from [10000] to [1]")) {
|
||||||
|
sawUpdateSetting = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresLayout() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// #6882: make sure we can change index.merge.scheduler.max_thread_count live
|
||||||
|
@Test
|
||||||
|
@Slow
|
||||||
|
public void testUpdateMergeMaxThreadCount() {
|
||||||
|
|
||||||
|
MockAppender mockAppender = new MockAppender();
|
||||||
|
Logger rootLogger = Logger.getRootLogger();
|
||||||
|
Level savedLevel = rootLogger.getLevel();
|
||||||
|
rootLogger.addAppender(mockAppender);
|
||||||
|
rootLogger.setLevel(Level.TRACE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
// Tons of merge threads allowed, only 1 non-replicated shard, force lots of merging, throttle so they fall behind:
|
||||||
|
assertAcked(prepareCreate("test")
|
||||||
|
.setSettings(ImmutableSettings.builder()
|
||||||
|
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
|
||||||
|
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||||
|
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
|
||||||
|
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
|
||||||
|
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
|
||||||
|
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
|
||||||
|
));
|
||||||
|
ensureGreen();
|
||||||
|
long termUpto = 0;
|
||||||
|
for(int i=0;i<100;i++) {
|
||||||
|
// Provoke slowish merging by making many unique terms:
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for(int j=0;j<100;j++) {
|
||||||
|
sb.append(' ');
|
||||||
|
sb.append(termUpto++);
|
||||||
|
}
|
||||||
|
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
refresh();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(mockAppender.sawFlushDeletes);
|
||||||
|
assertFalse(mockAppender.sawMergeThreadPaused);
|
||||||
|
mockAppender.sawFlushDeletes = false;
|
||||||
|
mockAppender.sawMergeThreadPaused = false;
|
||||||
|
|
||||||
|
assertFalse(mockAppender.sawUpdateSetting);
|
||||||
|
|
||||||
|
// Now make a live change to reduce allowed merge threads:
|
||||||
|
client()
|
||||||
|
.admin()
|
||||||
|
.indices()
|
||||||
|
.prepareUpdateSettings("test")
|
||||||
|
.setSettings(ImmutableSettings.builder()
|
||||||
|
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1"))
|
||||||
|
.get();
|
||||||
|
|
||||||
|
// Make sure we log the change:
|
||||||
|
assertTrue(mockAppender.sawUpdateSetting);
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
while (true) {
|
||||||
|
// Provoke slowish merging by making many unique terms:
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for(int j=0;j<100;j++) {
|
||||||
|
sb.append(' ');
|
||||||
|
sb.append(termUpto++);
|
||||||
|
}
|
||||||
|
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
refresh();
|
||||||
|
}
|
||||||
|
// This time we should see some merges were in fact paused:
|
||||||
|
if (mockAppender.sawMergeThreadPaused) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
rootLogger.removeAppender(mockAppender);
|
||||||
|
rootLogger.setLevel(savedLevel);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue