mirror of
synced 2025-02-17 10:25:15 +00:00
Change default thread pool to scaling from cached, closes #381.
This commit is contained in:
@ -68,7 +68,7 @@ public class SingleThreadBulkStress {
StopWatch stopWatch = new StopWatch().start();
int COUNT = 200000;
int BATCH = 1000;
int BATCH = 100;
System.out.println("Indexing [" + COUNT + "] ...");
int i = 1;
@ -0,0 +1,126 @@
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.*;
* @author kimchy (shay.banon)
public class DynamicExecutors {
* Creates a thread pool that creates new threads as needed, but will reuse
* previously constructed threads when they are available. Calls to
* <tt>execute</tt> will reuse previously constructed threads if
* available. If no existing thread is available, a new thread will be
* created and added to the pool. No more than <tt>max</tt> threads will
* be created. Threads that have not been used for a <tt>keepAlive</tt>
* timeout are terminated and removed from the cache. Thus, a pool that
* remains idle for long enough will not consume any resources other than
* the <tt>min</tt> specified.
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @return the newly created thread pool
public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) {
return newScalingThreadPool(min, max, keepAliveTime, Executors.defaultThreadFactory());
* Creates a thread pool, same as in
* {@link #newScalingThreadPool(int, int, long)}, using the provided
* ThreadFactory to create new threads when needed.
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @param threadFactory the factory to use when creating new threads.
* @return the newly created thread pool
public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime, ThreadFactory threadFactory) {
DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>();
ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.ForceQueuePolicy());
return executor;
* Creates a thread pool similar to that constructed by
* {@link #newScalingThreadPool(int, int, long)}, but blocks the call to
* <tt>execute</tt> if the queue has reached it's capacity, and all
* <tt>max</tt> threads are busy handling requests.
* <p/>
* If the wait time of this queue has elapsed, a
* {@link RejectedExecutionException} will be thrown.
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @param capacity the fixed capacity of the underlying queue (resembles
* backlog).
* @param waitTime the wait time (in milliseconds) for space to become
* available in the queue.
* @return the newly created thread pool
public static ExecutorService newBlockingThreadPool(int min, int max, long keepAliveTime, int capacity, long waitTime) {
return newBlockingThreadPool(min, max, keepAliveTime, capacity, waitTime, Executors.defaultThreadFactory());
* Creates a thread pool, same as in
* {@link #newBlockingThreadPool(int, int, long, int, long)}, using the
* provided ThreadFactory to create new threads when needed.
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @param capacity the fixed capacity of the underlying queue (resembles
* backlog).
* @param waitTime the wait time (in milliseconds) for space to become
* available in the queue.
* @param threadFactory the factory to use when creating new threads.
* @return the newly created thread pool
public static ExecutorService newBlockingThreadPool(int min, int max,
long keepAliveTime, int capacity, long waitTime,
ThreadFactory threadFactory) {
DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>(capacity);
ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.TimedBlockingPolicy(waitTime));
return executor;
@ -0,0 +1,163 @@
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
* An {@link ExecutorService} that executes each submitted task using one of
* possibly several pooled threads, normally configured using
* {@link DynamicExecutors} factory methods.
* @author kimchy (shay.banon)
public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
* number of threads that are actively executing tasks
private final AtomicInteger activeCount = new AtomicInteger();
public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
@Override public int getActiveCount() {
return activeCount.get();
@Override protected void beforeExecute(Thread t, Runnable r) {
@Override protected void afterExecute(Runnable r, Throwable t) {
* Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It
* is well suited for handoff designs, in which a tasks is only queued if there
* is an available thread to pick it up.
* <p/>
* This queue is correlated with a thread-pool, and allows insertions to the
* queue only if there is a free thread that can poll this task. Otherwise, the
* task is rejected and the decision is left up to one of the
* {@link RejectedExecutionHandler} policies:
* <ol>
* <li> {@link ForceQueuePolicy} - forces the queue to accept the rejected task. </li>
* <li> {@link TimedBlockingPolicy} - waits for a given time for the task to be
* executed.</li>
* </ol>
* @author kimchy (Shay Banon)
public static class DynamicQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 1L;
* The executor this Queue belongs to
private transient ThreadPoolExecutor executor;
* Creates a <tt>DynamicQueue</tt> with a capacity of
* {@link Integer#MAX_VALUE}.
public DynamicQueue() {
* Creates a <tt>DynamicQueue</tt> with the given (fixed) capacity.
* @param capacity the capacity of this queue.
public DynamicQueue(int capacity) {
* Sets the executor this queue belongs to.
public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
* Inserts the specified element at the tail of this queue if there is at
* least one available thread to run the current task. If all pool threads
* are actively busy, it rejects the offer.
* @param o the element to add.
* @return <tt>true</tt> if it was possible to add the element to this
* queue, else <tt>false</tt>
* @see ThreadPoolExecutor#execute(Runnable)
public boolean offer(E o) {
int allWorkingThreads = executor.getActiveCount() + super.size();
return allWorkingThreads < executor.getPoolSize() && super.offer(o);
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
public static class ForceQueuePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
} catch (InterruptedException e) {
//should never happen since we never wait
throw new RejectedExecutionException(e);
* A handler for rejected tasks that inserts the specified element into this
* queue, waiting if necessary up to the specified wait time for space to become
* available.
public static class TimedBlockingPolicy implements RejectedExecutionHandler {
private final long waitTime;
* @param waitTime wait time in milliseconds for space to become available.
public TimedBlockingPolicy(long waitTime) {
this.waitTime = waitTime;
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
if (!successful)
throw new RejectedExecutionException("Rejected execution after waiting "
+ waitTime + " ms for task [" + r.getClass() + "] to be executed.");
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
@ -25,7 +25,7 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.cached.CachedThreadPoolModule;
import org.elasticsearch.threadpool.scaling.ScalingThreadPoolModule;
* @author kimchy (shay.banon)
@ -39,7 +39,7 @@ public class ThreadPoolModule extends AbstractModule implements SpawnModules {
@Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", CachedThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings));
return ImmutableList.of(Modules.createModule(settings.getAsClass("threadpool.type", ScalingThreadPoolModule.class, "org.elasticsearch.threadpool.", "ThreadPoolModule"), settings));
@Override protected void configure() {
@ -23,13 +23,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.unit.TimeValue.*;
@ -56,20 +55,16 @@ public class BlockingThreadPool extends AbstractThreadPool {
@Inject public BlockingThreadPool(Settings settings) {
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
this.min = componentSettings.getAsInt("min", 1);
int max = componentSettings.getAsInt("max", 100);
if (max < 10) {
logger.warn("blocking threadpool max threads [{}] must not be lower than 10, setting it to 10", max);
max = 10;
this.max = max;
this.min = componentSettings.getAsInt("min", 10);
this.max = componentSettings.getAsInt("max", 100);
// capacity is set to 0 as it might cause starvation in blocking mode
this.capacity = (int) componentSettings.getAsSize("capacity", new SizeValue(0)).singles();
this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60));
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
logger.debug("initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize);
executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
// executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
executorService = DynamicExecutors.newBlockingThreadPool(min, max, keepAlive.millis(), capacity, waitTime.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
@ -22,13 +22,13 @@ package org.elasticsearch.threadpool.scaling;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.unit.TimeValue.*;
@ -56,7 +56,8 @@ public class ScalingThreadPool extends AbstractThreadPool {
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]"));
// executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]"));
executorService = DynamicExecutors.newScalingThreadPool(min, max, keepAlive.millis(), EsExecutors.daemonThreadFactory(settings, "[tp]"));
cached = EsExecutors.newCachedThreadPool(keepAlive, EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
Reference in New Issue
Block a user