simplify and improve scaling/blocking thread pools

This commit is contained in:
Shay Banon 2011-12-20 12:03:28 +02:00
parent 1f250ede50
commit dd6c076454
10 changed files with 146 additions and 1068 deletions

View File

@ -1,126 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.*;
/**
*
*/
public class DynamicExecutors {
/**
* Creates a thread pool that creates new threads as needed, but will reuse
* previously constructed threads when they are available. Calls to
* <tt>execute</tt> will reuse previously constructed threads if
* available. If no existing thread is available, a new thread will be
* created and added to the pool. No more than <tt>max</tt> threads will
* be created. Threads that have not been used for a <tt>keepAlive</tt>
* timeout are terminated and removed from the cache. Thus, a pool that
* remains idle for long enough will not consume any resources other than
* the <tt>min</tt> specified.
*
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @return the newly created thread pool
*/
public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) {
return newScalingThreadPool(min, max, keepAliveTime, Executors.defaultThreadFactory());
}
/**
* Creates a thread pool, same as in
* {@link #newScalingThreadPool(int, int, long)}, using the provided
* ThreadFactory to create new threads when needed.
*
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @param threadFactory the factory to use when creating new threads.
* @return the newly created thread pool
*/
public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime, ThreadFactory threadFactory) {
DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>();
ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.ForceQueuePolicy());
queue.setThreadPoolExecutor(executor);
return executor;
}
/**
* Creates a thread pool similar to that constructed by
* {@link #newScalingThreadPool(int, int, long)}, but blocks the call to
* <tt>execute</tt> if the queue has reached it's capacity, and all
* <tt>max</tt> threads are busy handling requests.
* <p/>
* If the wait time of this queue has elapsed, a
* {@link RejectedExecutionException} will be thrown.
*
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @param capacity the fixed capacity of the underlying queue (resembles
* backlog).
* @param waitTime the wait time (in milliseconds) for space to become
* available in the queue.
* @return the newly created thread pool
*/
public static ExecutorService newBlockingThreadPool(int min, int max, long keepAliveTime, int capacity, long waitTime) {
return newBlockingThreadPool(min, max, keepAliveTime, capacity, waitTime, Executors.defaultThreadFactory());
}
/**
* Creates a thread pool, same as in
* {@link #newBlockingThreadPool(int, int, long, int, long)}, using the
* provided ThreadFactory to create new threads when needed.
*
* @param min the number of threads to keep in the pool, even if they are
* idle.
* @param max the maximum number of threads to allow in the pool.
* @param keepAliveTime when the number of threads is greater than the min,
* this is the maximum time that excess idle threads will wait
* for new tasks before terminating (in milliseconds).
* @param capacity the fixed capacity of the underlying queue (resembles
* backlog).
* @param waitTime the wait time (in milliseconds) for space to become
* available in the queue.
* @param threadFactory the factory to use when creating new threads.
* @return the newly created thread pool
*/
public static ExecutorService newBlockingThreadPool(int min, int max,
long keepAliveTime, int capacity, long waitTime,
ThreadFactory threadFactory) {
DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>(capacity);
ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.TimedBlockingPolicy(waitTime));
queue.setThreadPoolExecutor(executor);
return executor;
}
}

View File

@ -1,166 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An {@link ExecutorService} that executes each submitted task using one of
* possibly several pooled threads, normally configured using
* {@link DynamicExecutors} factory methods.
*
*
*/
public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
/**
* number of threads that are actively executing tasks
*/
private final AtomicInteger activeCount = new AtomicInteger();
public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
@Override
public int getActiveCount() {
return activeCount.get();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
activeCount.incrementAndGet();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
activeCount.decrementAndGet();
}
/**
* Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It
* is well suited for handoff designs, in which a tasks is only queued if there
* is an available thread to pick it up.
* <p/>
* This queue is correlated with a thread-pool, and allows insertions to the
* queue only if there is a free thread that can poll this task. Otherwise, the
* task is rejected and the decision is left up to one of the
* {@link RejectedExecutionHandler} policies:
* <ol>
* <li> {@link ForceQueuePolicy} - forces the queue to accept the rejected task. </li>
* <li> {@link TimedBlockingPolicy} - waits for a given time for the task to be
* executed.</li>
* </ol>
*
*
*/
public static class DynamicQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 1L;
/**
* The executor this Queue belongs to
*/
private transient ThreadPoolExecutor executor;
/**
* Creates a <tt>DynamicQueue</tt> with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public DynamicQueue() {
super();
}
/**
* Creates a <tt>DynamicQueue</tt> with the given (fixed) capacity.
*
* @param capacity the capacity of this queue.
*/
public DynamicQueue(int capacity) {
super(capacity);
}
/**
* Sets the executor this queue belongs to.
*/
public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}
/**
* Inserts the specified element at the tail of this queue if there is at
* least one available thread to run the current task. If all pool threads
* are actively busy, it rejects the offer.
*
* @param o the element to add.
* @return <tt>true</tt> if it was possible to add the element to this
* queue, else <tt>false</tt>
* @see ThreadPoolExecutor#execute(Runnable)
*/
@Override
public boolean offer(E o) {
int allWorkingThreads = executor.getActiveCount() + super.size();
return allWorkingThreads < executor.getPoolSize() && super.offer(o);
}
}
/**
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
*/
public static class ForceQueuePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
//should never happen since we never wait
throw new RejectedExecutionException(e);
}
}
}
/**
* A handler for rejected tasks that inserts the specified element into this
* queue, waiting if necessary up to the specified wait time for space to become
* available.
*/
public static class TimedBlockingPolicy implements RejectedExecutionHandler {
private final long waitTime;
/**
* @param waitTime wait time in milliseconds for space to become available.
*/
public TimedBlockingPolicy(long waitTime) {
this.waitTime = waitTime;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
if (!successful)
throw new RejectedExecutionException("Rejected execution after waiting "
+ waitTime + " ms for task [" + r.getClass() + "] to be executed.");
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
}
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.common.util.concurrent;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.*;
@ -29,11 +29,24 @@ import java.util.concurrent.*;
*/
public class EsExecutors {
public static ExecutorService newCachedThreadPool(TimeValue keepAlive, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
public static ThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<Runnable>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory,
new ForceQueuePolicy());
queue.executor = executor;
return executor;
}
public static ThreadPoolExecutor newBlockingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, int capacity,
long waitTime, TimeUnit waitTimeUnit) {
ExecutorBlockingQueue<Runnable> queue = new ExecutorBlockingQueue<Runnable>(capacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory,
new TimedBlockingPolicy(waitTimeUnit.toMillis(waitTime)));
queue.executor = executor;
return executor;
}
public static String threadName(Settings settings, String namePrefix) {
@ -88,4 +101,84 @@ public class EsExecutors {
*/
private EsExecutors() {
}
static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
ThreadPoolExecutor executor;
public ExecutorScalingQueue() {
}
@Override
public boolean offer(E e) {
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (!tryTransfer(e)) {
if (left > 0) {
return false;
} else {
return super.offer(e);
}
} else {
return true;
}
}
}
static class ExecutorBlockingQueue<E> extends ArrayBlockingQueue<E> {
ThreadPoolExecutor executor;
ExecutorBlockingQueue(int capacity) {
super(capacity);
}
@Override
public boolean offer(E o) {
int allWorkingThreads = executor.getActiveCount() + super.size();
return allWorkingThreads < executor.getPoolSize() && super.offer(o);
}
}
/**
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
*/
static class ForceQueuePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
//should never happen since we never wait
throw new RejectedExecutionException(e);
}
}
}
/**
* A handler for rejected tasks that inserts the specified element into this
* queue, waiting if necessary up to the specified wait time for space to become
* available.
*/
static class TimedBlockingPolicy implements RejectedExecutionHandler {
private final long waitTime;
/**
* @param waitTime wait time in milliseconds for space to become available.
*/
public TimedBlockingPolicy(long waitTime) {
this.waitTime = waitTime;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
if (!successful)
throw new RejectedExecutionException("Rejected execution after waiting "
+ waitTime + " ms for task [" + r.getClass() + "] to be executed.");
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
}
}

View File

@ -1,635 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import jsr166y.LinkedTransferQueue;
import jsr166y.TransferQueue;
import java.util.*;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* A thread pool based on {@link jsr166y.TransferQueue}.
* <p/>
* <p>Limited compared to ExecutorServer in what it does, but focused on speed.
*
*
*/
public class TransferThreadPoolExecutor extends AbstractExecutorService {
private final TransferQueue<Runnable> workQueue = new LinkedTransferQueue<Runnable>();
private final AtomicInteger queueSize = new AtomicInteger();
/**
* Lock held on updates to poolSize, corePoolSize,
* maximumPoolSize, runState, and workers set.
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Factory for new threads. All threads are created using this
* factory (via method addThread). All callers must be prepared
* for addThread to fail by returning null, which may reflect a
* system or user's policy limiting the number of threads. Even
* though it is not treated as an error, failure to create threads
* may result in new tasks being rejected or existing ones
* remaining stuck in the queue. On the other hand, no special
* precautions exist to handle OutOfMemoryErrors that might be
* thrown while trying to create threads, since there is generally
* no recourse from within this class.
*/
private final ThreadFactory threadFactory;
/**
* runState provides the main lifecyle control, taking on values:
* <p/>
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TERMINATED: Same as STOP, plus all threads have terminated
* <p/>
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* <p/>
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty
*/
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
private final boolean blocking;
private final int blockingCapacity;
private final long blockingTime;
/**
* Core pool size, updated only while holding mainLock, but
* volatile to allow concurrent readability even during updates.
*/
private final int corePoolSize;
/**
* Maximum pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
*/
private final int maximumPoolSize;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private final long keepAliveTime;
/**
* Current pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
*/
private final AtomicInteger poolSize = new AtomicInteger();
public static TransferThreadPoolExecutor newScalingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
return new TransferThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, false, 0, TimeUnit.NANOSECONDS, 0, threadFactory);
}
public static TransferThreadPoolExecutor newBlockingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
long blockingTime, TimeUnit blockingUnit, int blockingCapacity,
ThreadFactory threadFactory) {
return new TransferThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, true, blockingTime, blockingUnit, blockingCapacity, threadFactory);
}
private TransferThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
boolean blocking, long blockingTime, TimeUnit blockingUnit, int blockingCapacity,
ThreadFactory threadFactory) {
this.blocking = blocking;
this.blockingTime = blockingUnit.toNanos(blockingTime);
this.blockingCapacity = blockingCapacity;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
for (int i = 0; i < corePoolSize; i++) {
Thread t = addWorker();
if (t != null) {
poolSize.incrementAndGet();
t.start();
}
}
}
@Override
public void execute(Runnable command) {
if (blocking) {
executeBlocking(command);
} else {
executeNonBlocking(command);
}
}
private void executeNonBlocking(Runnable command) {
// note, there might be starvation of some commands that were added to the queue,
// while others are being transferred directly
queueSize.getAndIncrement();
boolean succeeded = workQueue.tryTransfer(command);
if (succeeded) {
return;
}
int currentPoolSize = poolSize.get();
if (currentPoolSize < maximumPoolSize) {
// if we manage to add a worker, add it, and tryTransfer again
if (poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1)) {
Thread t = addWorker();
if (t == null) {
poolSize.decrementAndGet();
workQueue.add(command);
} else {
t.start();
succeeded = workQueue.tryTransfer(command);
if (!succeeded) {
workQueue.add(command);
}
}
} else {
succeeded = workQueue.tryTransfer(command);
if (!succeeded) {
workQueue.add(command);
}
}
} else {
workQueue.add(command);
}
}
private void executeBlocking(Runnable command) {
int currentCapacity = queueSize.getAndIncrement();
boolean succeeded = workQueue.tryTransfer(command);
if (succeeded) {
return;
}
int currentPoolSize = poolSize.get();
if (currentPoolSize < maximumPoolSize) {
// if we manage to add a worker, add it, and tryTransfer again
if (poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1)) {
Thread t = addWorker();
if (t == null) {
poolSize.decrementAndGet();
workQueue.add(command);
} else {
t.start();
succeeded = workQueue.tryTransfer(command);
if (!succeeded) {
transferOrAddBlocking(command, currentCapacity);
}
}
} else {
succeeded = workQueue.tryTransfer(command);
if (!succeeded) {
transferOrAddBlocking(command, currentCapacity);
}
}
} else {
transferOrAddBlocking(command, currentCapacity);
}
}
private void transferOrAddBlocking(Runnable command, int currentCapacity) {
if (currentCapacity < blockingCapacity) {
workQueue.add(command);
} else {
boolean succeeded;
try {
succeeded = workQueue.tryTransfer(command, blockingTime, TimeUnit.NANOSECONDS);
if (!succeeded) {
throw new RejectedExecutionException("Rejected execution after waiting "
+ TimeUnit.NANOSECONDS.toSeconds(blockingTime) + "s for task [" + command.getClass() + "] to be executed.");
}
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
}
@Override
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;
try {
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}
@Override
public List<Runnable> shutdownNow() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int state = runState;
if (state < STOP)
runState = STOP;
try {
for (Worker w : workers) {
w.interruptNow();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
List<Runnable> tasks = drainQueue();
tryTerminate(); // Terminate now if pool and queue empty
return tasks;
} finally {
mainLock.unlock();
}
}
@Override
public boolean isShutdown() {
return runState != RUNNING;
}
@Override
public boolean isTerminated() {
return runState == TERMINATED;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (; ; ) {
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
return poolSize.get();
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers) {
if (w.isActive())
++n;
}
return n;
} finally {
mainLock.unlock();
}
}
public int getCorePoolSize() {
return corePoolSize;
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public int getQueueSize() {
return queueSize.get();
}
private final class Worker implements Runnable {
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker() {
}
boolean isActive() {
return runLock.isLocked();
}
/**
* Interrupts thread if not running a task.
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* Interrupts thread even if running a task.
*/
void interruptNow() {
thread.interrupt();
}
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
if (runState < STOP && Thread.interrupted() && runState >= STOP)
thread.interrupt();
task.run();
} finally {
runLock.unlock();
}
}
/**
* Main run loop
*/
public void run() {
try {
Runnable task;
while ((task = getTask()) != null) {
runTask(task);
}
} finally {
workerDone(this);
}
}
}
Runnable getTask() {
for (; ; ) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize.get() > corePoolSize)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null) {
queueSize.decrementAndGet();
return r;
}
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
/**
* Check whether a worker thread that fails to get a task can
* exit. We allow a worker thread to die if the pool is stopping,
* or the queue is empty, or there is at least one thread to
* handle possibly non-empty queue, even if core timeouts are
* allowed.
*/
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP || (queueSize.get() == 0 && (runState >= SHUTDOWN || poolSize.get() > corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
/**
* Wakes up all threads that might be waiting for tasks so they
* can check for termination. Note: this method is also called by
* ScheduledThreadPoolExecutor.
*/
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}
/**
* Performs bookkeeping for an exiting worker thread.
*
* @param w the worker
*/
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.remove(w);
if (poolSize.decrementAndGet() == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty), otherwise unless
* stopped, ensuring that there is at least one live thread to
* handle queued tasks.
* <p/>
* This method is called from the three places in which
* termination can occur: in workerDone on exit of the last thread
* after pool has been shut down, or directly within calls to
* shutdown or shutdownNow, if there are no live threads.
*/
private void tryTerminate() {
if (poolSize.get() == 0) {
int state = runState;
if (state < STOP && queueSize.get() > 0) {
state = RUNNING; // disable termination check below
Thread t = addThread();
poolSize.incrementAndGet();
if (t != null)
t.start();
}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll();
}
}
}
/**
* Creates and returns a new thread running firstTask as its first
* task. Executed under mainLock.
*/
private Thread addWorker() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return addThread();
} finally {
mainLock.unlock();
}
}
/**
* Creates and returns a new thread running firstTask as its first
* task. Call only while holding mainLock.
*/
private Thread addThread() {
Worker w = new Worker();
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
}
return t;
}
/**
* Drains the task queue into a new list. Used by shutdownNow.
* Call only while holding main lock.
*/
private List<Runnable> drainQueue() {
List<Runnable> taskList = new ArrayList<Runnable>();
workQueue.drainTo(taskList);
queueSize.getAndAdd(taskList.size() * -1);
/*
* If the queue is a DelayQueue or any other kind of queue
* for which poll or drainTo may fail to remove some elements,
* we need to manually traverse and remove remaining tasks.
* To guarantee atomicity wrt other threads using this queue,
* we need to create a new iterator for each element removed.
*/
while (!workQueue.isEmpty()) {
Iterator<Runnable> it = workQueue.iterator();
try {
if (it.hasNext()) {
Runnable r = it.next();
if (workQueue.remove(r)) {
taskList.add(r);
queueSize.decrementAndGet();
}
}
} catch (ConcurrentModificationException ignore) {
}
}
return taskList;
}
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
@ -209,7 +208,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public Executor executor() {
if (executor == null) {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
executor = DynamicExecutors.newScalingThreadPool(1, concurrentConnects, 60000, threadFactory);
executor = EsExecutors.newScalingExecutorService(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
}
return executor;
}

View File

@ -26,8 +26,6 @@ import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
@ -37,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
*
@ -60,7 +59,7 @@ public class FsGateway extends BlobStoreGateway {
}
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
this.concurrentStreamPool = EsExecutors.newScalingExecutorService(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]"));
initialize(new FsBlobStore(componentSettings, concurrentStreamPool, gatewayFile), clusterName, null);
}

View File

@ -27,12 +27,11 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
@ -69,7 +68,7 @@ public class RecoverySettings extends AbstractComponent {
this.compress = componentSettings.getAsBoolean("compress", true);
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 5));
this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.concurrentStreamPool = EsExecutors.newScalingExecutorService(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.maxSizePerSec = componentSettings.getAsBytesSize("max_size_per_sec", new ByteSizeValue(0));
if (maxSizePerSec.bytes() <= 0) {

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import java.util.Map;
@ -194,7 +193,7 @@ public class ThreadPool extends AbstractComponent {
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
return DynamicExecutors.newScalingThreadPool(min, size, keepAlive.millis(), threadFactory);
return EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
} else if ("blocking".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));
@ -202,7 +201,7 @@ public class ThreadPool extends AbstractComponent {
SizeValue capacity = settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000)));
TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60)));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime);
return DynamicExecutors.newBlockingThreadPool(min, size, keepAlive.millis(), (int) capacity.singles(), waitTime.millis(), threadFactory);
return EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
}
throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}

View File

@ -1,95 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.util.concurrent;
import org.elasticsearch.common.util.concurrent.ThreadBarrier;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.testng.annotations.Test;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
@Test(enabled = false)
public class BlockingThreadPoolTest {
@Test
public void testBlocking() throws Exception {
final int min = 2;
final int max = 4;
final long waitTime = 1000; //1 second
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newBlockingExecutor(min, max, 60000, TimeUnit.MILLISECONDS, waitTime, TimeUnit.MILLISECONDS, 1, Executors.defaultThreadFactory());
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
for (int i = 0; i < max; ++i) {
pool.execute(new Runnable() {
public void run() {
try {
barrier.await();
barrier.await();
} catch (Throwable e) {
barrier.reset(e);
}
}
});
//wait until thread executes this task
//otherwise, a task might be queued
Thread.sleep(100);
}
barrier.await();
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
//Queue should be empty, lets occupy it's only free space
assertThat("queue isn't empty", pool.getQueueSize(), equalTo(0));
pool.execute(new Runnable() {
public void run() {
//dummy task
}
});
assertThat("queue isn't full", pool.getQueueSize(), equalTo(1));
//request should block since queue is full
try {
pool.execute(new Runnable() {
public void run() {
//dummy task
}
});
assertThat("Should have thrown RejectedExecutionException", false, equalTo(true));
} catch (RejectedExecutionException e) {
//caught expected exception
}
barrier.await();
pool.shutdown();
}
}

View File

@ -19,22 +19,22 @@
package org.elasticsearch.test.unit.common.util.concurrent;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadBarrier;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.testng.annotations.Test;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
/**
*
*/
@Test(enabled = false)
public class ScalingThreadPoolTest {
@Test
public class EsExecutorsTests {
@Test
public void testScaleUp() throws Exception {
@ -42,8 +42,7 @@ public class ScalingThreadPoolTest {
final int max = 4;
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
// ThreadPoolExecutor pool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(min, max, Long.MAX_VALUE);
TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newScalingExecutor(min, max, Long.MAX_VALUE, TimeUnit.NANOSECONDS, Executors.defaultThreadFactory());
ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, 100, TimeUnit.DAYS, EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
@ -77,8 +76,7 @@ public class ScalingThreadPoolTest {
final int max = 4;
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
// ThreadPoolExecutor pool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(min, max, 0 /*keep alive*/);
TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newScalingExecutor(min, max, 0, TimeUnit.NANOSECONDS, Executors.defaultThreadFactory());
ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, 10, TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
@ -108,34 +106,29 @@ public class ScalingThreadPoolTest {
// assertThat("not all tasks completed", pool.getCompletedTaskCount(), equalTo((long) max));
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
//Assert.assertEquals("wrong pool size. ", min, pool.getPoolSize()); //BUG in ThreadPool - Bug ID: 6458662
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), greaterThan(0));
//assertThat("idle threads didn't stay above min (" + pool.getPoolSize() + ")", pool.getPoolSize(), greaterThan(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
pool.shutdown();
}
@Test
public void testScaleAbove() throws Exception {
public void testBlocking() throws Exception {
final int min = 2;
final int max = 4;
final int ntasks = 16;
final long waitTime = 1000; //1 second
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
// ThreadPoolExecutor pool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(min, max, Long.MAX_VALUE);
TransferThreadPoolExecutor pool = TransferThreadPoolExecutor.newScalingExecutor(min, max, Long.MAX_VALUE, TimeUnit.NANOSECONDS, Executors.defaultThreadFactory());
ThreadPoolExecutor pool = EsExecutors.newBlockingExecutorService(min, max, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), 1, waitTime, TimeUnit.MILLISECONDS);
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
final AtomicInteger tasksExecuted = new AtomicInteger();
for (int i = 0; i < ntasks; ++i) {
final int id = i;
for (int i = 0; i < max; ++i) {
pool.execute(new Runnable() {
public void run() {
tasksExecuted.incrementAndGet();
try {
if (id < max) {
barrier.await();
}
barrier.await();
barrier.await();
} catch (Throwable e) {
barrier.reset(e);
}
@ -147,14 +140,32 @@ public class ScalingThreadPoolTest {
Thread.sleep(100);
}
assertThat("wrong number of pooled tasks", pool.getQueueSize(), equalTo(ntasks - max));
barrier.await();
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
//wait around for one second
Thread.sleep(1000);
assertThat("tasks not complete", tasksExecuted.get(), equalTo(ntasks));
// assertThat("didn't scale above core pool size. (" + pool.getLargestPoolSize() + ")", pool.getLargestPoolSize(), greaterThan(min));
// assertThat("Largest pool size exceeds max. (" + pool.getLargestPoolSize() + ")", pool.getLargestPoolSize(), lessThanOrEqualTo(max));
//Queue should be empty, lets occupy it's only free space
assertThat("queue isn't empty", pool.getQueue().size(), equalTo(0));
pool.execute(new Runnable() {
public void run() {
//dummy task
}
});
assertThat("queue isn't full", pool.getQueue().size(), equalTo(1));
//request should block since queue is full
try {
pool.execute(new Runnable() {
public void run() {
//dummy task
}
});
assertThat("Should have thrown RejectedExecutionException", false, equalTo(true));
} catch (RejectedExecutionException e) {
//caught expected exception
}
barrier.await();
pool.shutdown();
}
}