diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index c0f52a29f78..10202bfde58 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -36,14 +36,7 @@ import org.elasticsearch.common.util.concurrent.MoreExecutors; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.elasticsearch.common.unit.TimeValue.*; @@ -177,11 +170,21 @@ public class ThreadPool extends AbstractComponent { threadFactory); } else if ("fixed".equals(type)) { int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); - logger.debug("creating thread_pool [{}], type [{}], size [{}]", name, type, size); + int queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", -1)); + RejectedExecutionHandler rejectedExecutionHandler; + String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort")); + if ("abort".equals(rejectSetting)) { + rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); + } else if ("caller".equals(rejectSetting)) { + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + } else { + throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); + } + logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, queueSize, rejectSetting); return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, - new LinkedTransferQueue(), - threadFactory); + queueSize <= 0 ? new LinkedTransferQueue() : new LinkedBlockingQueue(queueSize), + threadFactory, rejectedExecutionHandler); } else if ("scaling".equals(type)) { TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));