From b7ff23ff93006ed894d663cf9e97a3fbda151522 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 27 Dec 2012 09:39:27 -0500 Subject: [PATCH] Update settings: Allow to dynamically update thread pool settings Closes #2509 --- .../util/concurrent/EsThreadPoolExecutor.java | 39 ++ .../elasticsearch/threadpool/ThreadPool.java | 420 +++++++++++++++--- .../threadpool/SimpleThreadPoolTests.java | 160 +++++++ .../UpdateThreadPoolSettingsTests.java | 311 +++++++++++++ 4 files changed, 874 insertions(+), 56 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java create mode 100644 src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 0802e79cf05..45d722e62be 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ElasticSearchIllegalStateException; + import java.util.concurrent.*; /** @@ -26,6 +28,10 @@ import java.util.concurrent.*; */ public class EsThreadPoolExecutor extends ThreadPoolExecutor { + private volatile ShutdownListener listener; + + private final Object monitor = new Object(); + public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); } @@ -33,4 +39,37 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } + + public void shutdown(ShutdownListener listener) { + synchronized (monitor) { + if (this.listener != null) { + throw new ElasticSearchIllegalStateException("Shutdown was already called on this thread pool"); + } + if (isTerminated()) { + listener.onTerminated(); + } else { + this.listener = listener; + } + shutdown(); + } + } + + @Override + protected synchronized void terminated() { + super.terminated(); + synchronized (monitor) { + if (listener != null) { + try { + listener.onTerminated(); + } finally { + listener = null; + } + } + } + } + + public static interface ShutdownListener { + public void onTerminated(); + } + } diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index eda437fe0fd..b7b8261e388 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -19,10 +19,12 @@ package org.elasticsearch.threadpool; +import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -38,13 +40,16 @@ import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.node.settings.NodeSettingsService; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.*; +import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; @@ -70,40 +75,59 @@ public class ThreadPool extends AbstractComponent { public static final String SNAPSHOT = "snapshot"; } - private final ImmutableMap executors; + static { + MetaData.addDynamicSettings( + "threadpool.*" + ); + } + + private volatile ImmutableMap executors; + + private final ImmutableMap defaultExecutorTypeSettings; + + private final Queue retiredExecutors = new ConcurrentLinkedQueue(); private final ScheduledThreadPoolExecutor scheduler; private final EstimatedTimeThread estimatedTimeThread; public ThreadPool() { - this(ImmutableSettings.Builder.EMPTY_SETTINGS); + this(ImmutableSettings.Builder.EMPTY_SETTINGS, null); } @Inject - public ThreadPool(Settings settings) { + public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsService) { super(settings); Map groupSettings = settings.getGroups("threadpool"); + defaultExecutorTypeSettings = ImmutableMap.builder() + .put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()) + .put(Names.INDEX, settingsBuilder().put("type", "cached").build()) + .put(Names.BULK, settingsBuilder().put("type", "cached").build()) + .put(Names.GET, settingsBuilder().put("type", "cached").build()) + .put(Names.SEARCH, settingsBuilder().put("type", "cached").build()) + .put(Names.PERCOLATE, settingsBuilder().put("type", "cached").build()) + .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) + .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 10).build()) + .put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 20).build()) + .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 10).build()) + .put(Names.CACHE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 4).build()) + .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) + .build(); + Map executors = Maps.newHashMap(); - executors.put(Names.GENERIC, build(Names.GENERIC, "cached", groupSettings.get(Names.GENERIC), settingsBuilder().put("keep_alive", "30s").build())); - executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS)); - executors.put(Names.BULK, build(Names.BULK, "cached", groupSettings.get(Names.BULK), ImmutableSettings.Builder.EMPTY_SETTINGS)); - executors.put(Names.GET, build(Names.GET, "cached", groupSettings.get(Names.GET), ImmutableSettings.Builder.EMPTY_SETTINGS)); - executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS)); - executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS)); - executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "5m").put("size", 5).build())); - executors.put(Names.FLUSH, build(Names.FLUSH, "scaling", groupSettings.get(Names.FLUSH), settingsBuilder().put("keep_alive", "5m").put("size", 10).build())); - executors.put(Names.MERGE, build(Names.MERGE, "scaling", groupSettings.get(Names.MERGE), settingsBuilder().put("keep_alive", "5m").put("size", 20).build())); - executors.put(Names.REFRESH, build(Names.REFRESH, "scaling", groupSettings.get(Names.REFRESH), settingsBuilder().put("keep_alive", "5m").put("size", 10).build())); - executors.put(Names.CACHE, build(Names.CACHE, "scaling", groupSettings.get(Names.CACHE), settingsBuilder().put("keep_alive", "5m").put("size", 4).build())); - executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), settingsBuilder().put("keep_alive", "5m").put("size", 5).build())); + for (Map.Entry executor : defaultExecutorTypeSettings.entrySet()) { + executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue())); + } executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same"))); this.executors = ImmutableMap.copyOf(executors); this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "scheduler")); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + if (nodeSettingsService != null) { + nodeSettingsService.addListener(new ApplySettings()); + } TimeValue estimatedTimeInterval = componentSettings.getAsTime("estimated_time_interval", TimeValue.timeValueMillis(200)); this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); @@ -205,6 +229,9 @@ public class ThreadPool extends AbstractComponent { ((ThreadPoolExecutor) executor.executor).shutdownNow(); } } + while (!retiredExecutors.isEmpty()) { + ((ThreadPoolExecutor) retiredExecutors.remove().executor).shutdownNow(); + } } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { @@ -214,79 +241,283 @@ public class ThreadPool extends AbstractComponent { result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit); } } + while (!retiredExecutors.isEmpty()) { + result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor).awaitTermination(timeout, unit); + } return result; } - private ExecutorHolder build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { + private ExecutorHolder build(String name, @Nullable Settings settings, Settings defaultSettings) { + return rebuild(name, null, settings, defaultSettings); + } + + private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolder, @Nullable Settings settings, Settings defaultSettings) { + if (Names.SAME.equals(name)) { + // Don't allow to change the "same" thread executor + return previousExecutorHolder; + } if (settings == null) { settings = ImmutableSettings.Builder.EMPTY_SETTINGS; } - String type = settings.get("type", defaultType); + Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null; + String type = settings.get("type", previousInfo != null ? previousInfo.type() : defaultSettings.get("type")); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); if ("same".equals(type)) { - logger.debug("creating thread_pool [{}], type [{}]", name, type); + if (previousExecutorHolder != null) { + logger.debug("updating thread_pool [{}], type [{}]", name, type); + } else { + logger.debug("creating thread_pool [{}], type [{}]", name, type); + } return new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(name, type)); } else if ("cached".equals(type)) { - TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); - logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); + TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); + if (previousExecutorHolder != null) { + if ("cached".equals(previousInfo.type())) { + TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.keepAlive()); + if (!previousInfo.keepAlive().equals(updatedKeepAlive)) { + logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, -1, -1, updatedKeepAlive, null)); + } + return previousExecutorHolder; + } + if (previousInfo.keepAlive() != null) { + defaultKeepAlive = previousInfo.keepAlive(); + } + } + TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); + if (previousExecutorHolder != null) { + logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); + } else { + logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); + } Executor executor = new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAlive.millis(), TimeUnit.MILLISECONDS, new SynchronousQueue(), threadFactory); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { - int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); - SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null))))); - RejectedExecutionHandler rejectedExecutionHandler; - String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort")); - if ("abort".equals(rejectSetting)) { - rejectedExecutionHandler = new EsAbortPolicy(); - } else if ("caller".equals(rejectSetting)) { - rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); - } else { - throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); - } - String queueType = settings.get("queue_type", "linked"); - BlockingQueue workQueue; - if (capacity == null) { - workQueue = ConcurrentCollections.newBlockingQueue(); - } else if ((int) capacity.singles() > 0) { - if ("linked".equals(queueType)) { - workQueue = new LinkedBlockingQueue((int) capacity.singles()); - } else if ("array".equals(queueType)) { - workQueue = new ArrayBlockingQueue((int) capacity.singles()); - } else { - throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array"); + int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5); + SizeValue defaultCapacity = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null)); + String defaultRejectSetting = defaultSettings.get("reject_policy", "abort"); + String defaultQueueType = defaultSettings.get("queue_type", "linked"); + + if (previousExecutorHolder != null) { + if ("fixed".equals(previousInfo.type())) { + SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.capacity()))); + String updatedQueueType = settings.get("queue_type", previousInfo.queueType()); + if (Objects.equal(previousInfo.capacity(), updatedCapacity) && previousInfo.queueType().equals(updatedQueueType)) { + int updatedSize = settings.getAsInt("size", previousInfo.max()); + String updatedRejectSetting = settings.get("reject_policy", previousInfo.rejectSetting()); + if (previousInfo.max() != updatedSize) { + logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType); + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize); + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType)); + } + if (!previousInfo.rejectSetting().equals(updatedRejectSetting)) { + logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType); + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setRejectedExecutionHandler(newRejectedExecutionHandler(name, updatedRejectSetting)); + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType)); + } + return previousExecutorHolder; + } + } + if (previousInfo.max() >= 0) { + defaultSize = previousInfo.max(); + } + defaultCapacity = previousInfo.capacity(); + if (previousInfo.rejectSetting != null) { + defaultRejectSetting = previousInfo.rejectSetting; + } + if (previousInfo.queueType() != null) { + defaultQueueType = previousInfo.queueType(); } - } else { - workQueue = new SynchronousQueue(); } + + int size = settings.getAsInt("size", defaultSize); + SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity))); + String rejectSetting = settings.get("reject_policy", defaultRejectSetting); + RejectedExecutionHandler rejectedExecutionHandler = newRejectedExecutionHandler(name, rejectSetting); + String queueType = settings.get("queue_type", defaultQueueType); + BlockingQueue workQueue = newQueue(capacity, queueType); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, capacity, rejectSetting, queueType); Executor executor = new EsThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectedExecutionHandler); - return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity)); + return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity, null, rejectSetting, queueType)); } else if ("scaling".equals(type)) { - TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); - int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); - int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5))); - logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); + int defaultMin = defaultSettings.getAsInt("min", 1); + int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5); + if (previousExecutorHolder != null) { + if ("scaling".equals(previousInfo.getType())) { + TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); + int updatedMin = settings.getAsInt("min", previousInfo.getMin()); + int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); + if (!previousInfo.keepAlive().equals(updatedKeepAlive) || previousInfo.min() != updatedMin || previousInfo.max() != updatedSize) { + logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); + if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); + } + if (previousInfo.getMin() != updatedMin) { + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin); + } + if (previousInfo.getMax() != updatedSize) { + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); + } + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null)); + } + return previousExecutorHolder; + } + if (previousInfo.getKeepAlive() != null) { + defaultKeepAlive = previousInfo.getKeepAlive(); + } + if (previousInfo.getMin() >= 0) { + defaultMin = previousInfo.getMin(); + } + if (previousInfo.getMax() >= 0) { + defaultSize = previousInfo.getMax(); + } + } + TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); + int min = settings.getAsInt("min", defaultMin); + int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize)); + if (previousExecutorHolder != null) { + logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + } else { + logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + } Executor executor = EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); } else if ("blocking".equals(type)) { - TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); - int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); - int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5))); - SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000)))); - TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60))); - logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); + TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); + int defaultMin = defaultSettings.getAsInt("min", 1); + int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5); + SizeValue defaultCapacity = defaultSettings.getAsSize("queue_size", new SizeValue(1000)); + TimeValue defaultWaitTime = defaultSettings.getAsTime("wait_time", timeValueSeconds(60)); + if (previousExecutorHolder != null) { + if ("blocking".equals(previousInfo.getType())) { + SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity))); + TimeValue updatedWaitTime = settings.getAsTime("wait_time", defaultWaitTime); + if (previousInfo.capacity().equals(updatedCapacity) && previousInfo.waitTime().equals(updatedWaitTime)) { + TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); + int updatedMin = settings.getAsInt("min", previousInfo.getMin()); + int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); + if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.waitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) || + previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) { + logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); + if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); + } + if (previousInfo.getMin() != updatedMin) { + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin); + } + if (previousInfo.getMax() != updatedSize) { + ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize); + } + return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedCapacity, updatedWaitTime)); + } + return previousExecutorHolder; + } + } + if (previousInfo.getKeepAlive() != null) { + defaultKeepAlive = previousInfo.getKeepAlive(); + } + if (previousInfo.getMin() >= 0) { + defaultMin = previousInfo.getMin(); + } + if (previousInfo.getMax() >= 0) { + defaultSize = previousInfo.getMax(); + } + if (previousInfo.getCapacity() != null) { + defaultCapacity = previousInfo.getCapacity(); + } + if (previousInfo.waitTime() != null) { + defaultWaitTime = previousInfo.getKeepAlive(); + } + } + TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); + int min = settings.getAsInt("min", defaultMin); + int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize)); + SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity))); + TimeValue waitTime = settings.getAsTime("wait_time", defaultWaitTime); + if (previousExecutorHolder != null) { + logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); + } else { + logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); + } Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity)); + return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity, waitTime)); } throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } + public void updateSettings(Settings settings) { + Map groupSettings = settings.getGroups("threadpool"); + if (groupSettings.isEmpty()) { + return; + } + + for (Map.Entry executor : defaultExecutorTypeSettings.entrySet()) { + Settings updatedSettings = groupSettings.get(executor.getKey()); + if (updatedSettings == null) { + continue; + } + + ExecutorHolder oldExecutorHolder = executors.get(executor.getKey()); + ExecutorHolder newExecutorHolder = rebuild(executor.getKey(), oldExecutorHolder, updatedSettings, executor.getValue()); + if (!oldExecutorHolder.equals(newExecutorHolder)) { + executors = newMapBuilder(executors).put(executor.getKey(), newExecutorHolder).immutableMap(); + if (!oldExecutorHolder.executor.equals(newExecutorHolder.executor) && oldExecutorHolder.executor instanceof EsThreadPoolExecutor) { + retiredExecutors.add(oldExecutorHolder); + ((EsThreadPoolExecutor) oldExecutorHolder.executor).shutdown(new ExecutorShutdownListener(oldExecutorHolder)); + } + } + } + } + + private BlockingQueue newQueue(SizeValue capacity, String queueType) { + if (capacity == null) { + return ConcurrentCollections.newBlockingQueue(); + } else if ((int) capacity.singles() > 0) { + if ("linked".equals(queueType)) { + return new LinkedBlockingQueue((int) capacity.singles()); + } else if ("array".equals(queueType)) { + return new ArrayBlockingQueue((int) capacity.singles()); + } else { + throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array"); + } + } else { + return new SynchronousQueue(); + } + } + + private RejectedExecutionHandler newRejectedExecutionHandler(String name, String rejectSetting) { + if ("abort".equals(rejectSetting)) { + return new EsAbortPolicy(); + } else if ("caller".equals(rejectSetting)) { + return new ThreadPoolExecutor.CallerRunsPolicy(); + } else { + throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); + } + } + + class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener { + + private ExecutorHolder holder; + + public ExecutorShutdownListener(ExecutorHolder holder) { + this.holder = holder; + } + + @Override + public void onTerminated() { + retiredExecutors.remove(holder); + } + } + class LoggingRunnable implements Runnable { private final Runnable runnable; @@ -407,6 +638,9 @@ public class ThreadPool extends AbstractComponent { private int max; private TimeValue keepAlive; private SizeValue capacity; + private TimeValue waitTime; + private String rejectSetting; + private String queueType; Info() { @@ -421,12 +655,23 @@ public class ThreadPool extends AbstractComponent { } public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) { + this(name, type, min, max, keepAlive, capacity, null); + } + + public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime) { + this(name, type, min, max, keepAlive, capacity, waitTime, null, null); + } + + public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime, String rejectSetting, String queueType) { this.name = name; this.type = type; this.min = min; this.max = max; this.keepAlive = keepAlive; this.capacity = capacity; + this.waitTime = waitTime; + this.rejectSetting = rejectSetting; + this.queueType = queueType; } public String name() { @@ -481,6 +726,37 @@ public class ThreadPool extends AbstractComponent { return this.capacity; } + @Nullable + public TimeValue waitTime() { + return this.waitTime; + } + + @Nullable + public TimeValue getWaitTime() { + return this.waitTime; + } + + @Nullable + public String rejectSetting() { + return this.rejectSetting; + } + + @Nullable + public String getRejectSetting() { + return this.rejectSetting; + } + + @Nullable + public String queueType() { + return this.queueType; + } + + @Nullable + public String getQueueType() { + return this.queueType; + } + + @Override public void readFrom(StreamInput in) throws IOException { name = in.readString(); @@ -493,6 +769,11 @@ public class ThreadPool extends AbstractComponent { if (in.readBoolean()) { capacity = SizeValue.readSizeValue(in); } + if (in.readBoolean()) { + waitTime = TimeValue.readTimeValue(in); + } + rejectSetting = in.readOptionalString(); + queueType = in.readOptionalString(); } @Override @@ -513,6 +794,14 @@ public class ThreadPool extends AbstractComponent { out.writeBoolean(true); capacity.writeTo(out); } + if (waitTime == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + waitTime.writeTo(out); + } + out.writeOptionalString(rejectSetting); + out.writeOptionalString(queueType); } @Override @@ -531,6 +820,15 @@ public class ThreadPool extends AbstractComponent { if (capacity != null) { builder.field(Fields.CAPACITY, capacity.toString()); } + if (waitTime != null) { + builder.field(Fields.WAIT_TIME, waitTime.toString()); + } + if (rejectSetting != null) { + builder.field(Fields.REJECT_POLICY, rejectSetting); + } + if (queueType != null) { + builder.field(Fields.QUEUE_TYPE, queueType); + } builder.endObject(); return builder; } @@ -541,7 +839,17 @@ public class ThreadPool extends AbstractComponent { static final XContentBuilderString MAX = new XContentBuilderString("max"); static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive"); static final XContentBuilderString CAPACITY = new XContentBuilderString("capacity"); + static final XContentBuilderString WAIT_TIME = new XContentBuilderString("wait_time"); + static final XContentBuilderString REJECT_POLICY = new XContentBuilderString("reject_policy"); + static final XContentBuilderString QUEUE_TYPE = new XContentBuilderString("queue_type"); } } + + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + updateSettings(settings); + } + } } diff --git a/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java b/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java new file mode 100644 index 00000000000..95dd3ecc3e2 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/threadpool/SimpleThreadPoolTests.java @@ -0,0 +1,160 @@ +package org.elasticsearch.test.integration.threadpool; + +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.threadpool.ThreadPoolInfo; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.*; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + */ +public class SimpleThreadPoolTests extends AbstractNodesTests { + private Client client1; + + private Client client2; + + private ThreadPool threadPool; + + @BeforeClass + public void createNodes() throws Exception { + startNode("node1"); + startNode("node2"); + client1 = client("node1"); + client2 = client("node2"); + threadPool = ((InternalNode) node("node1")).injector().getInstance(ThreadPool.class); + } + + @AfterClass + public void closeNodes() { + client1.close(); + client2.close(); + closeAllNodes(); + } + + @Test(timeOut = 20000) + public void testUpdatingThreadPoolSettings() throws Exception { + // Check that settings are changed + assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); + client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet(); + assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + + // Make sure that threads continue executing when executor is replaced + final CyclicBarrier barrier = new CyclicBarrier(2); + Executor oldExecutor = threadPool.executor(Names.SEARCH); + threadPool.executor(Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (BrokenBarrierException ex) { + // + } + } + }); + client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); + assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); + assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); + assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false)); + barrier.await(); + + // Make sure that new thread executor is functional + threadPool.executor(Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (BrokenBarrierException ex) { + // + } + } + }); + client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + barrier.await(); + + // Check that node info is correct + NodesInfoResponse nodesInfoResponse = client2.admin().cluster().prepareNodesInfo().all().execute().actionGet(); + for (int i = 0; i < 2; i++) { + NodeInfo nodeInfo = nodesInfoResponse.nodes()[i]; + boolean found = false; + for (ThreadPool.Info info : nodeInfo.getThreadPool()) { + if (info.name().equals(Names.SEARCH)) { + assertThat(info.type(), equalTo("fixed")); + assertThat(info.rejectSetting(), equalTo("abort")); + assertThat(info.queueType(), equalTo("linked")); + found = true; + break; + } + } + assertThat(found, equalTo(true)); + + Map poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH); + assertThat(poolMap.get("reject_policy").toString(), equalTo("abort")); + assertThat(poolMap.get("queue_type").toString(), equalTo("linked")); + } + + client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder() + .put("threadpool.search.type", "blocking") + .put("threadpool.search.wait_time", "10s") + .put("threadpool.search.keep_alive", "15s") + .put("threadpool.search.capacity", "100") + .build()).execute().actionGet(); + nodesInfoResponse = client2.admin().cluster().prepareNodesInfo().all().execute().actionGet(); + for (int i = 0; i < 2; i++) { + NodeInfo nodeInfo = nodesInfoResponse.nodes()[i]; + boolean found = false; + for (ThreadPool.Info info : nodeInfo.getThreadPool()) { + if (info.name().equals(Names.SEARCH)) { + assertThat(info.type(), equalTo("blocking")); + assertThat(info.capacity().singles(), equalTo(100L)); + assertThat(info.waitTime().seconds(), equalTo(10L)); + assertThat(info.keepAlive().seconds(), equalTo(15L)); + found = true; + break; + } + } + assertThat(found, equalTo(true)); + + Map poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH); + assertThat(poolMap.get("capacity").toString(), equalTo("100")); + assertThat(poolMap.get("wait_time").toString(), equalTo("10s")); + assertThat(poolMap.get("keep_alive").toString(), equalTo("15s")); + } + + } + + private Map getPoolSettingsThroughJson(ThreadPoolInfo info, String poolName) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + info.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + builder.close(); + XContentParser parser = JsonXContent.jsonXContent.createParser(builder.string()); + Map poolsMap = parser.mapAndClose(); + return (Map) ((Map) poolsMap.get("thread_pool")).get(poolName); + } + +} diff --git a/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java b/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java new file mode 100644 index 00000000000..7916987e853 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/threadpool/UpdateThreadPoolSettingsTests.java @@ -0,0 +1,311 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.unit.threadpool; + +import com.google.common.util.concurrent.ListeningExecutorService; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.util.concurrent.EsAbortPolicy; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.testng.annotations.Test; + +import java.util.concurrent.*; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + */ +public class UpdateThreadPoolSettingsTests { + + private ThreadPool.Info info(ThreadPool threadPool, String name) { + for (ThreadPool.Info info : threadPool.info()) { + if (info.name().equals(name)) { + return info; + } + } + return null; + } + + @Test + public void testCachedExecutorType() { + ThreadPool threadPool = new ThreadPool(ImmutableSettings.Builder.EMPTY_SETTINGS, null); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L)); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + + // Replace with different type + threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("same")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(ListeningExecutorService.class)); + + // Replace with different type again + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.type", "scaling") + .put("threadpool.search.keep_alive", "10m") + .build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1)); + // Make sure keep alive value changed + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + + // Put old type back + threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + // Make sure keep alive value reused + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + + // Change keep alive + Executor oldExecutor = threadPool.executor(Names.SEARCH); + threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); + // Make sure keep alive value changed + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); + // Make sure executor didn't change + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); + + // Set the same keep alive + threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); + // Make sure keep alive value didn't change + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); + // Make sure executor didn't change + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached")); + assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); + + threadPool.shutdown(); + } + + @Test + public void testFixedExecutorType() { + ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "fixed").build(), null); + assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort")); + assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + + // Replace with different type + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.type", "scaling") + .put("threadpool.search.keep_alive", "10m") + .put("threadpool.search.min", "2") + .put("threadpool.search.size", "15") + .build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + // Make sure keep alive value changed + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + + // Put old type back + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.type", "fixed") + .build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed")); + // Make sure keep alive value is not used + assertThat(info(threadPool, Names.SEARCH).keepAlive(), nullValue()); + // Make sure keep pool size value were reused + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); + + // Change size + Executor oldExecutor = threadPool.executor(Names.SEARCH); + threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build()); + // Make sure size values changed + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10)); + // Make sure executor didn't change + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed")); + assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); + + // Change queue capacity + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.queue", "500") + .build()); + assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked")); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(LinkedBlockingQueue.class)); + + // Set different queue and size type + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.queue_type", "array") + .put("threadpool.search.size", "12") + .build()); + // Make sure keep size changed + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed")); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(12)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(12)); + assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("array")); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(ArrayBlockingQueue.class)); + + // Change rejection policy + oldExecutor = threadPool.executor(Names.SEARCH); + assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort")); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(EsAbortPolicy.class)); + threadPool.updateSettings(settingsBuilder().put("threadpool.search.reject_policy", "caller").build()); + // Make sure rejection handler changed + assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("caller")); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(ThreadPoolExecutor.CallerRunsPolicy.class)); + // Make sure executor didn't change + assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); + + threadPool.shutdown(); + } + + + @Test + public void testScalingExecutorType() { + ThreadPool threadPool = new ThreadPool( + settingsBuilder().put("threadpool.search.type", "scaling").put("threadpool.search.size", 10).build(), null); + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L)); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + + // Change settings that doesn't require pool replacement + Executor oldExecutor = threadPool.executor(Names.SEARCH); + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.type", "scaling") + .put("threadpool.search.keep_alive", "10m") + .put("threadpool.search.min", "2") + .put("threadpool.search.size", "15") + .build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + // Make sure keep alive value changed + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); + + threadPool.shutdown(); + } + + @Test + public void testBlockingExecutorType() { + ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "blocking").put("threadpool.search.size", "10").build(), null); + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); + assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(1000L)); + assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(1L)); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + + // Replace with different type + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.type", "scaling") + .put("threadpool.search.keep_alive", "10m") + .put("threadpool.search.min", "2") + .put("threadpool.search.size", "15") + .build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling")); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + // Make sure keep alive value changed + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + + // Put old type back + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.type", "blocking") + .build()); + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking")); + // Make sure keep alive value is not used + assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L)); + // Make sure keep pool size value were reused + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15)); + assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); + + // Change size + Executor oldExecutor = threadPool.executor(Names.SEARCH); + threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").put("threadpool.search.min", "5").build()); + // Make sure size values changed + assertThat(info(threadPool, Names.SEARCH).min(), equalTo(5)); + assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(5)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10)); + // Make sure executor didn't change + assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking")); + assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); + + // Change queue capacity + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.queue_size", "500") + .build()); + assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(500L)); + + // Change wait time capacity + threadPool.updateSettings(settingsBuilder() + .put("threadpool.search.wait_time", "2m") + .build()); + assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(2L)); + + threadPool.shutdown(); + } + + @Test(timeOut = 10000) + public void testShutdownDownNowDoesntBlock() throws Exception { + ThreadPool threadPool = new ThreadPool(ImmutableSettings.Builder.EMPTY_SETTINGS, null); + + final CountDownLatch latch = new CountDownLatch(1); + Executor oldExecutor = threadPool.executor(Names.SEARCH); + threadPool.executor(Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20000); + } catch (InterruptedException ex) { + latch.countDown(); + Thread.currentThread().interrupt(); + } + } + }); + threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()); + assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); + assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); + assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); + assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false)); + threadPool.shutdownNow(); + latch.await(); + } + +}