Update settings: Allow to dynamically update thread pool settings

Closes #2509
This commit is contained in:
Igor Motov 2012-12-27 09:39:27 -05:00
parent 6ef0e4ddda
commit b7ff23ff93
4 changed files with 874 additions and 56 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchIllegalStateException;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
@ -26,6 +28,10 @@ import java.util.concurrent.*;
*/ */
public class EsThreadPoolExecutor extends ThreadPoolExecutor { public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private volatile ShutdownListener listener;
private final Object monitor = new Object();
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
} }
@ -33,4 +39,37 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { public EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
} }
public void shutdown(ShutdownListener listener) {
synchronized (monitor) {
if (this.listener != null) {
throw new ElasticSearchIllegalStateException("Shutdown was already called on this thread pool");
}
if (isTerminated()) {
listener.onTerminated();
} else {
this.listener = listener;
}
shutdown();
}
}
@Override
protected synchronized void terminated() {
super.terminated();
synchronized (monitor) {
if (listener != null) {
try {
listener.onTerminated();
} finally {
listener = null;
}
}
}
}
public static interface ShutdownListener {
public void onTerminated();
}
} }

View File

@ -19,10 +19,12 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -38,13 +40,16 @@ import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*; import java.util.concurrent.*;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -70,40 +75,59 @@ public class ThreadPool extends AbstractComponent {
public static final String SNAPSHOT = "snapshot"; public static final String SNAPSHOT = "snapshot";
} }
private final ImmutableMap<String, ExecutorHolder> executors; static {
MetaData.addDynamicSettings(
"threadpool.*"
);
}
private volatile ImmutableMap<String, ExecutorHolder> executors;
private final ImmutableMap<String, Settings> defaultExecutorTypeSettings;
private final Queue<ExecutorHolder> retiredExecutors = new ConcurrentLinkedQueue<ExecutorHolder>();
private final ScheduledThreadPoolExecutor scheduler; private final ScheduledThreadPoolExecutor scheduler;
private final EstimatedTimeThread estimatedTimeThread; private final EstimatedTimeThread estimatedTimeThread;
public ThreadPool() { public ThreadPool() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS); this(ImmutableSettings.Builder.EMPTY_SETTINGS, null);
} }
@Inject @Inject
public ThreadPool(Settings settings) { public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsService) {
super(settings); super(settings);
Map<String, Settings> groupSettings = settings.getGroups("threadpool"); Map<String, Settings> groupSettings = settings.getGroups("threadpool");
defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder()
.put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build())
.put(Names.INDEX, settingsBuilder().put("type", "cached").build())
.put(Names.BULK, settingsBuilder().put("type", "cached").build())
.put(Names.GET, settingsBuilder().put("type", "cached").build())
.put(Names.SEARCH, settingsBuilder().put("type", "cached").build())
.put(Names.PERCOLATE, settingsBuilder().put("type", "cached").build())
.put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build())
.put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 10).build())
.put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 20).build())
.put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 10).build())
.put(Names.CACHE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 4).build())
.put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build())
.build();
Map<String, ExecutorHolder> executors = Maps.newHashMap(); Map<String, ExecutorHolder> executors = Maps.newHashMap();
executors.put(Names.GENERIC, build(Names.GENERIC, "cached", groupSettings.get(Names.GENERIC), settingsBuilder().put("keep_alive", "30s").build())); for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS)); executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue()));
executors.put(Names.BULK, build(Names.BULK, "cached", groupSettings.get(Names.BULK), ImmutableSettings.Builder.EMPTY_SETTINGS)); }
executors.put(Names.GET, build(Names.GET, "cached", groupSettings.get(Names.GET), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "5m").put("size", 5).build()));
executors.put(Names.FLUSH, build(Names.FLUSH, "scaling", groupSettings.get(Names.FLUSH), settingsBuilder().put("keep_alive", "5m").put("size", 10).build()));
executors.put(Names.MERGE, build(Names.MERGE, "scaling", groupSettings.get(Names.MERGE), settingsBuilder().put("keep_alive", "5m").put("size", 20).build()));
executors.put(Names.REFRESH, build(Names.REFRESH, "scaling", groupSettings.get(Names.REFRESH), settingsBuilder().put("keep_alive", "5m").put("size", 10).build()));
executors.put(Names.CACHE, build(Names.CACHE, "scaling", groupSettings.get(Names.CACHE), settingsBuilder().put("keep_alive", "5m").put("size", 4).build()));
executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), settingsBuilder().put("keep_alive", "5m").put("size", 5).build()));
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same"))); executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same")));
this.executors = ImmutableMap.copyOf(executors); this.executors = ImmutableMap.copyOf(executors);
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "scheduler")); this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "scheduler"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
if (nodeSettingsService != null) {
nodeSettingsService.addListener(new ApplySettings());
}
TimeValue estimatedTimeInterval = componentSettings.getAsTime("estimated_time_interval", TimeValue.timeValueMillis(200)); TimeValue estimatedTimeInterval = componentSettings.getAsTime("estimated_time_interval", TimeValue.timeValueMillis(200));
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
@ -205,6 +229,9 @@ public class ThreadPool extends AbstractComponent {
((ThreadPoolExecutor) executor.executor).shutdownNow(); ((ThreadPoolExecutor) executor.executor).shutdownNow();
} }
} }
while (!retiredExecutors.isEmpty()) {
((ThreadPoolExecutor) retiredExecutors.remove().executor).shutdownNow();
}
} }
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
@ -214,79 +241,283 @@ public class ThreadPool extends AbstractComponent {
result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit); result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit);
} }
} }
while (!retiredExecutors.isEmpty()) {
result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor).awaitTermination(timeout, unit);
}
return result; return result;
} }
private ExecutorHolder build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { private ExecutorHolder build(String name, @Nullable Settings settings, Settings defaultSettings) {
return rebuild(name, null, settings, defaultSettings);
}
private ExecutorHolder rebuild(String name, ExecutorHolder previousExecutorHolder, @Nullable Settings settings, Settings defaultSettings) {
if (Names.SAME.equals(name)) {
// Don't allow to change the "same" thread executor
return previousExecutorHolder;
}
if (settings == null) { if (settings == null) {
settings = ImmutableSettings.Builder.EMPTY_SETTINGS; settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
} }
String type = settings.get("type", defaultType); Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null;
String type = settings.get("type", previousInfo != null ? previousInfo.type() : defaultSettings.get("type"));
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
if ("same".equals(type)) { if ("same".equals(type)) {
logger.debug("creating thread_pool [{}], type [{}]", name, type); if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}]", name, type);
} else {
logger.debug("creating thread_pool [{}], type [{}]", name, type);
}
return new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(name, type)); return new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(name, type));
} else if ("cached".equals(type)) { } else if ("cached".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); if (previousExecutorHolder != null) {
if ("cached".equals(previousInfo.type())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.keepAlive());
if (!previousInfo.keepAlive().equals(updatedKeepAlive)) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, -1, -1, updatedKeepAlive, null));
}
return previousExecutorHolder;
}
if (previousInfo.keepAlive() != null) {
defaultKeepAlive = previousInfo.keepAlive();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
} else {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
}
Executor executor = new EsThreadPoolExecutor(0, Integer.MAX_VALUE, Executor executor = new EsThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS, keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new SynchronousQueue<Runnable>(),
threadFactory); threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) { } else if ("fixed".equals(type)) {
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null))))); SizeValue defaultCapacity = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
RejectedExecutionHandler rejectedExecutionHandler; String defaultRejectSetting = defaultSettings.get("reject_policy", "abort");
String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort")); String defaultQueueType = defaultSettings.get("queue_type", "linked");
if ("abort".equals(rejectSetting)) {
rejectedExecutionHandler = new EsAbortPolicy(); if (previousExecutorHolder != null) {
} else if ("caller".equals(rejectSetting)) { if ("fixed".equals(previousInfo.type())) {
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.capacity())));
} else { String updatedQueueType = settings.get("queue_type", previousInfo.queueType());
throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); if (Objects.equal(previousInfo.capacity(), updatedCapacity) && previousInfo.queueType().equals(updatedQueueType)) {
} int updatedSize = settings.getAsInt("size", previousInfo.max());
String queueType = settings.get("queue_type", "linked"); String updatedRejectSetting = settings.get("reject_policy", previousInfo.rejectSetting());
BlockingQueue<Runnable> workQueue; if (previousInfo.max() != updatedSize) {
if (capacity == null) { logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType);
workQueue = ConcurrentCollections.newBlockingQueue(); ((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize);
} else if ((int) capacity.singles() > 0) { ((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
if ("linked".equals(queueType)) { return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType));
workQueue = new LinkedBlockingQueue<Runnable>((int) capacity.singles()); }
} else if ("array".equals(queueType)) { if (!previousInfo.rejectSetting().equals(updatedRejectSetting)) {
workQueue = new ArrayBlockingQueue<Runnable>((int) capacity.singles()); logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, updatedSize, updatedCapacity, updatedRejectSetting, updatedQueueType);
} else { ((EsThreadPoolExecutor) previousExecutorHolder.executor).setRejectedExecutionHandler(newRejectedExecutionHandler(name, updatedRejectSetting));
throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array"); return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedCapacity, null, updatedRejectSetting, updatedQueueType));
}
return previousExecutorHolder;
}
}
if (previousInfo.max() >= 0) {
defaultSize = previousInfo.max();
}
defaultCapacity = previousInfo.capacity();
if (previousInfo.rejectSetting != null) {
defaultRejectSetting = previousInfo.rejectSetting;
}
if (previousInfo.queueType() != null) {
defaultQueueType = previousInfo.queueType();
} }
} else {
workQueue = new SynchronousQueue<Runnable>();
} }
int size = settings.getAsInt("size", defaultSize);
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity)));
String rejectSetting = settings.get("reject_policy", defaultRejectSetting);
RejectedExecutionHandler rejectedExecutionHandler = newRejectedExecutionHandler(name, rejectSetting);
String queueType = settings.get("queue_type", defaultQueueType);
BlockingQueue<Runnable> workQueue = newQueue(capacity, queueType);
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, capacity, rejectSetting, queueType); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}], queue_type [{}]", name, type, size, capacity, rejectSetting, queueType);
Executor executor = new EsThreadPoolExecutor(size, size, Executor executor = new EsThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS,
workQueue, workQueue,
threadFactory, rejectedExecutionHandler); threadFactory, rejectedExecutionHandler);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity)); return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity, null, rejectSetting, queueType));
} else if ("scaling".equals(type)) { } else if ("scaling".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); int defaultMin = defaultSettings.getAsInt("min", 1);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5))); int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); if (previousExecutorHolder != null) {
if ("scaling".equals(previousInfo.getType())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
if (!previousInfo.keepAlive().equals(updatedKeepAlive) || previousInfo.min() != updatedMin || previousInfo.max() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
}
if (previousInfo.getMin() != updatedMin) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin);
}
if (previousInfo.getMax() != updatedSize) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
}
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null));
}
return previousExecutorHolder;
}
if (previousInfo.getKeepAlive() != null) {
defaultKeepAlive = previousInfo.getKeepAlive();
}
if (previousInfo.getMin() >= 0) {
defaultMin = previousInfo.getMin();
}
if (previousInfo.getMax() >= 0) {
defaultSize = previousInfo.getMax();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
int min = settings.getAsInt("min", defaultMin);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
}
Executor executor = EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); Executor executor = EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null));
} else if ("blocking".equals(type)) { } else if ("blocking".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); int defaultMin = defaultSettings.getAsInt("min", 1);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5))); int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000)))); SizeValue defaultCapacity = defaultSettings.getAsSize("queue_size", new SizeValue(1000));
TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60))); TimeValue defaultWaitTime = defaultSettings.getAsTime("wait_time", timeValueSeconds(60));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); if (previousExecutorHolder != null) {
if ("blocking".equals(previousInfo.getType())) {
SizeValue updatedCapacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity)));
TimeValue updatedWaitTime = settings.getAsTime("wait_time", defaultWaitTime);
if (previousInfo.capacity().equals(updatedCapacity) && previousInfo.waitTime().equals(updatedWaitTime)) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || !previousInfo.waitTime().equals(settings.getAsTime("wait_time", defaultWaitTime)) ||
previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
}
if (previousInfo.getMin() != updatedMin) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedMin);
}
if (previousInfo.getMax() != updatedSize) {
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
}
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, updatedCapacity, updatedWaitTime));
}
return previousExecutorHolder;
}
}
if (previousInfo.getKeepAlive() != null) {
defaultKeepAlive = previousInfo.getKeepAlive();
}
if (previousInfo.getMin() >= 0) {
defaultMin = previousInfo.getMin();
}
if (previousInfo.getMax() >= 0) {
defaultSize = previousInfo.getMax();
}
if (previousInfo.getCapacity() != null) {
defaultCapacity = previousInfo.getCapacity();
}
if (previousInfo.waitTime() != null) {
defaultWaitTime = previousInfo.getKeepAlive();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
int min = settings.getAsInt("min", defaultMin);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultCapacity)));
TimeValue waitTime = settings.getAsTime("wait_time", defaultWaitTime);
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime);
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime);
}
Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity)); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity, waitTime));
} }
throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]"); throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]");
} }
public void updateSettings(Settings settings) {
Map<String, Settings> groupSettings = settings.getGroups("threadpool");
if (groupSettings.isEmpty()) {
return;
}
for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
Settings updatedSettings = groupSettings.get(executor.getKey());
if (updatedSettings == null) {
continue;
}
ExecutorHolder oldExecutorHolder = executors.get(executor.getKey());
ExecutorHolder newExecutorHolder = rebuild(executor.getKey(), oldExecutorHolder, updatedSettings, executor.getValue());
if (!oldExecutorHolder.equals(newExecutorHolder)) {
executors = newMapBuilder(executors).put(executor.getKey(), newExecutorHolder).immutableMap();
if (!oldExecutorHolder.executor.equals(newExecutorHolder.executor) && oldExecutorHolder.executor instanceof EsThreadPoolExecutor) {
retiredExecutors.add(oldExecutorHolder);
((EsThreadPoolExecutor) oldExecutorHolder.executor).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
}
}
}
}
private BlockingQueue<Runnable> newQueue(SizeValue capacity, String queueType) {
if (capacity == null) {
return ConcurrentCollections.newBlockingQueue();
} else if ((int) capacity.singles() > 0) {
if ("linked".equals(queueType)) {
return new LinkedBlockingQueue<Runnable>((int) capacity.singles());
} else if ("array".equals(queueType)) {
return new ArrayBlockingQueue<Runnable>((int) capacity.singles());
} else {
throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array");
}
} else {
return new SynchronousQueue<Runnable>();
}
}
private RejectedExecutionHandler newRejectedExecutionHandler(String name, String rejectSetting) {
if ("abort".equals(rejectSetting)) {
return new EsAbortPolicy();
} else if ("caller".equals(rejectSetting)) {
return new ThreadPoolExecutor.CallerRunsPolicy();
} else {
throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool");
}
}
class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener {
private ExecutorHolder holder;
public ExecutorShutdownListener(ExecutorHolder holder) {
this.holder = holder;
}
@Override
public void onTerminated() {
retiredExecutors.remove(holder);
}
}
class LoggingRunnable implements Runnable { class LoggingRunnable implements Runnable {
private final Runnable runnable; private final Runnable runnable;
@ -407,6 +638,9 @@ public class ThreadPool extends AbstractComponent {
private int max; private int max;
private TimeValue keepAlive; private TimeValue keepAlive;
private SizeValue capacity; private SizeValue capacity;
private TimeValue waitTime;
private String rejectSetting;
private String queueType;
Info() { Info() {
@ -421,12 +655,23 @@ public class ThreadPool extends AbstractComponent {
} }
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) { public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) {
this(name, type, min, max, keepAlive, capacity, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime) {
this(name, type, min, max, keepAlive, capacity, waitTime, null, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity, @Nullable TimeValue waitTime, String rejectSetting, String queueType) {
this.name = name; this.name = name;
this.type = type; this.type = type;
this.min = min; this.min = min;
this.max = max; this.max = max;
this.keepAlive = keepAlive; this.keepAlive = keepAlive;
this.capacity = capacity; this.capacity = capacity;
this.waitTime = waitTime;
this.rejectSetting = rejectSetting;
this.queueType = queueType;
} }
public String name() { public String name() {
@ -481,6 +726,37 @@ public class ThreadPool extends AbstractComponent {
return this.capacity; return this.capacity;
} }
@Nullable
public TimeValue waitTime() {
return this.waitTime;
}
@Nullable
public TimeValue getWaitTime() {
return this.waitTime;
}
@Nullable
public String rejectSetting() {
return this.rejectSetting;
}
@Nullable
public String getRejectSetting() {
return this.rejectSetting;
}
@Nullable
public String queueType() {
return this.queueType;
}
@Nullable
public String getQueueType() {
return this.queueType;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
name = in.readString(); name = in.readString();
@ -493,6 +769,11 @@ public class ThreadPool extends AbstractComponent {
if (in.readBoolean()) { if (in.readBoolean()) {
capacity = SizeValue.readSizeValue(in); capacity = SizeValue.readSizeValue(in);
} }
if (in.readBoolean()) {
waitTime = TimeValue.readTimeValue(in);
}
rejectSetting = in.readOptionalString();
queueType = in.readOptionalString();
} }
@Override @Override
@ -513,6 +794,14 @@ public class ThreadPool extends AbstractComponent {
out.writeBoolean(true); out.writeBoolean(true);
capacity.writeTo(out); capacity.writeTo(out);
} }
if (waitTime == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
waitTime.writeTo(out);
}
out.writeOptionalString(rejectSetting);
out.writeOptionalString(queueType);
} }
@Override @Override
@ -531,6 +820,15 @@ public class ThreadPool extends AbstractComponent {
if (capacity != null) { if (capacity != null) {
builder.field(Fields.CAPACITY, capacity.toString()); builder.field(Fields.CAPACITY, capacity.toString());
} }
if (waitTime != null) {
builder.field(Fields.WAIT_TIME, waitTime.toString());
}
if (rejectSetting != null) {
builder.field(Fields.REJECT_POLICY, rejectSetting);
}
if (queueType != null) {
builder.field(Fields.QUEUE_TYPE, queueType);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -541,7 +839,17 @@ public class ThreadPool extends AbstractComponent {
static final XContentBuilderString MAX = new XContentBuilderString("max"); static final XContentBuilderString MAX = new XContentBuilderString("max");
static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive"); static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive");
static final XContentBuilderString CAPACITY = new XContentBuilderString("capacity"); static final XContentBuilderString CAPACITY = new XContentBuilderString("capacity");
static final XContentBuilderString WAIT_TIME = new XContentBuilderString("wait_time");
static final XContentBuilderString REJECT_POLICY = new XContentBuilderString("reject_policy");
static final XContentBuilderString QUEUE_TYPE = new XContentBuilderString("queue_type");
} }
} }
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
updateSettings(settings);
}
}
} }

View File

@ -0,0 +1,160 @@
package org.elasticsearch.test.integration.threadpool;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
*/
public class SimpleThreadPoolTests extends AbstractNodesTests {
private Client client1;
private Client client2;
private ThreadPool threadPool;
@BeforeClass
public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client1 = client("node1");
client2 = client("node2");
threadPool = ((InternalNode) node("node1")).injector().getInstance(ThreadPool.class);
}
@AfterClass
public void closeNodes() {
client1.close();
client2.close();
closeAllNodes();
}
@Test(timeOut = 20000)
public void testUpdatingThreadPoolSettings() throws Exception {
// Check that settings are changed
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L));
client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet();
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Make sure that threads continue executing when executor is replaced
final CyclicBarrier barrier = new CyclicBarrier(2);
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException ex) {
//
}
}
});
client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false));
barrier.await();
// Make sure that new thread executor is functional
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException ex) {
//
}
}
});
client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
barrier.await();
// Check that node info is correct
NodesInfoResponse nodesInfoResponse = client2.admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) {
NodeInfo nodeInfo = nodesInfoResponse.nodes()[i];
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.name().equals(Names.SEARCH)) {
assertThat(info.type(), equalTo("fixed"));
assertThat(info.rejectSetting(), equalTo("abort"));
assertThat(info.queueType(), equalTo("linked"));
found = true;
break;
}
}
assertThat(found, equalTo(true));
Map<String, Object> poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH);
assertThat(poolMap.get("reject_policy").toString(), equalTo("abort"));
assertThat(poolMap.get("queue_type").toString(), equalTo("linked"));
}
client1.admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put("threadpool.search.type", "blocking")
.put("threadpool.search.wait_time", "10s")
.put("threadpool.search.keep_alive", "15s")
.put("threadpool.search.capacity", "100")
.build()).execute().actionGet();
nodesInfoResponse = client2.admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) {
NodeInfo nodeInfo = nodesInfoResponse.nodes()[i];
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.name().equals(Names.SEARCH)) {
assertThat(info.type(), equalTo("blocking"));
assertThat(info.capacity().singles(), equalTo(100L));
assertThat(info.waitTime().seconds(), equalTo(10L));
assertThat(info.keepAlive().seconds(), equalTo(15L));
found = true;
break;
}
}
assertThat(found, equalTo(true));
Map<String, Object> poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH);
assertThat(poolMap.get("capacity").toString(), equalTo("100"));
assertThat(poolMap.get("wait_time").toString(), equalTo("10s"));
assertThat(poolMap.get("keep_alive").toString(), equalTo("15s"));
}
}
private Map<String, Object> getPoolSettingsThroughJson(ThreadPoolInfo info, String poolName) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
builder.close();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.string());
Map<String, Object> poolsMap = parser.mapAndClose();
return (Map<String, Object>) ((Map<String, Object>) poolsMap.get("thread_pool")).get(poolName);
}
}

View File

@ -0,0 +1,311 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.threadpool;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.testng.annotations.Test;
import java.util.concurrent.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
*/
public class UpdateThreadPoolSettingsTests {
private ThreadPool.Info info(ThreadPool threadPool, String name) {
for (ThreadPool.Info info : threadPool.info()) {
if (info.name().equals(name)) {
return info;
}
}
return null;
}
@Test
public void testCachedExecutorType() {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.Builder.EMPTY_SETTINGS, null);
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("same"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(ListeningExecutorService.class));
// Replace with different type again
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
// Make sure keep alive value reused
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Change keep alive
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Set the same keep alive
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
threadPool.shutdown();
}
@Test
public void testFixedExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "fixed").build(), null);
assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort"));
assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "fixed")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed"));
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).keepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
// Change size
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue", "500")
.build());
assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("linked"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(LinkedBlockingQueue.class));
// Set different queue and size type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue_type", "array")
.put("threadpool.search.size", "12")
.build());
// Make sure keep size changed
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("fixed"));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(12));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(12));
assertThat(info(threadPool, Names.SEARCH).queueType(), equalTo("array"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(ArrayBlockingQueue.class));
// Change rejection policy
oldExecutor = threadPool.executor(Names.SEARCH);
assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("abort"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(EsAbortPolicy.class));
threadPool.updateSettings(settingsBuilder().put("threadpool.search.reject_policy", "caller").build());
// Make sure rejection handler changed
assertThat(info(threadPool, Names.SEARCH).rejectSetting(), equalTo("caller"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getRejectedExecutionHandler(), instanceOf(ThreadPoolExecutor.CallerRunsPolicy.class));
// Make sure executor didn't change
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
threadPool.shutdown();
}
@Test
public void testScalingExecutorType() {
ThreadPool threadPool = new ThreadPool(
settingsBuilder().put("threadpool.search.type", "scaling").put("threadpool.search.size", 10).build(), null);
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(5L));
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Change settings that doesn't require pool replacement
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
threadPool.shutdown();
}
@Test
public void testBlockingExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "blocking").put("threadpool.search.size", "10").build(), null);
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(1000L));
assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(1L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "blocking")
.build());
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking"));
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).keepAlive().minutes(), equalTo(10L));
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
// Change size
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").put("threadpool.search.min", "5").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).min(), equalTo(5));
assertThat(info(threadPool, Names.SEARCH).max(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(5));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).type(), equalTo("blocking"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue_size", "500")
.build());
assertThat(info(threadPool, Names.SEARCH).capacity().singles(), equalTo(500L));
// Change wait time capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.wait_time", "2m")
.build());
assertThat(info(threadPool, Names.SEARCH).waitTime().minutes(), equalTo(2L));
threadPool.shutdown();
}
@Test(timeOut = 10000)
public void testShutdownDownNowDoesntBlock() throws Exception {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.Builder.EMPTY_SETTINGS, null);
final CountDownLatch latch = new CountDownLatch(1);
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20000);
} catch (InterruptedException ex) {
latch.countDown();
Thread.currentThread().interrupt();
}
}
});
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build());
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false));
threadPool.shutdownNow();
latch.await();
}
}