Allow to change concurrent merge scheduling setting dynamically

Allow to change the concurrent merge scheduler settings dynamically using the update settings API
closes #6098
This commit is contained in:
Shay Banon 2014-05-08 19:13:35 +02:00
parent 29ab31b351
commit 78e39882ee
5 changed files with 54 additions and 6 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
@ -44,18 +45,23 @@ import java.util.concurrent.CopyOnWriteArraySet;
*/
public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
private final int maxThreadCount;
private final int maxMergeCount;
private final IndexSettingsService indexSettingsService;
private final ApplySettings applySettings = new ApplySettings();
private volatile int maxThreadCount;
private volatile int maxMergeCount;
private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<>();
@Inject
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) {
super(shardId, indexSettings, threadPool);
this.indexSettingsService = indexSettingsService;
this.maxThreadCount = componentSettings.getAsInt("max_thread_count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT);
this.maxMergeCount = componentSettings.getAsInt("max_merge_count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT);
logger.debug("using [concurrent] merge scheduler with max_thread_count[{}]", maxThreadCount);
logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}]", maxThreadCount, maxMergeCount);
indexSettingsService.addListener(applySettings);
}
@Override
@ -84,6 +90,11 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
return ImmutableSet.of();
}
@Override
public void close() {
indexSettingsService.removeListener(applySettings);
}
public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
private final ShardId shardId;
@ -128,4 +139,27 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
provider.afterMerge(merge);
}
}
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
int maxThreadCount = settings.getAsInt("index.merge.scheduler.max_thread_count", ConcurrentMergeSchedulerProvider.this.maxThreadCount);
if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
logger.info("updating [max_thread_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
}
}
int maxMergeCount = settings.getAsInt("index.merge.scheduler.max_merge_count", ConcurrentMergeSchedulerProvider.this.maxMergeCount);
if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
logger.info("updating [max_merge_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
}
}
}
}
}

View File

@ -127,4 +127,6 @@ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent
public abstract MergeStats stats();
public abstract Set<OnGoingMerge> onGoingMerges();
public abstract void close();
}

View File

@ -74,6 +74,11 @@ public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {
return ImmutableSet.of();
}
@Override
public void close() {
}
public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler {
private final SerialMergeSchedulerProvider provider;

View File

@ -49,6 +49,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.PercolatorShardModule;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -403,6 +404,12 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
logger.debug("failed to close engine", e);
// ignore
}
try {
shardInjector.getInstance(MergeSchedulerProvider.class).close();
} catch (Throwable e) {
logger.debug("failed to close merge policy scheduler", e);
// ignore
}
try {
shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Throwable e) {

View File

@ -324,7 +324,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test
public void testSegmentsWithMergeFlag() throws Exception {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<>();
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<>();
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {