Revert "ARTEMIS-2240 ActiveMQThreadPoolExecutor should use LinkedTransferQueue"

This reverts commit ea29483449
This commit is contained in:
Francesco Nigro 2019-03-13 13:47:06 +01:00 committed by Clebert Suconic
parent 5a74b8b34d
commit 79ca203d6d
1 changed files with 104 additions and 38 deletions

View File

@ -16,12 +16,11 @@
*/ */
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/* /*
* ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@ -30,58 +29,125 @@ import java.util.concurrent.TimeUnit;
* and will be removed after idling for a specified keep time. * and will be removed after idling for a specified keep time.
* But in contrast to a standard cached executor, tasks are queued if the * But in contrast to a standard cached executor, tasks are queued if the
* maximum pool size if reached, instead of rejected. * maximum pool size if reached, instead of rejected.
*
* This is achieved by using a specialized blocking queue, which checks the
* state of the associated executor in the offer method to decide whether to
* queue a task or have the executor create another thread.
*
* Since the thread pool's execute method is reentrant, more than one caller
* could try to offer a task into the queue. There is a small chance that
* (a few) more threads are created as it should be limited by max pool size.
* To allow for such a case not to reject a task, the underlying thread pool
* executor is not limited. Only the offer method checks the configured limit.
*/ */
public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
/** @SuppressWarnings("serial")
* The default rejected execution handler private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// Handler executed when a task is submitted and a new thread cannot be created (because maxSize was reached) private ActiveMQThreadPoolExecutor executor = null;
// It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue class below)
private static class QueueExecutionHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler handler; // keep track of the difference between the number of idle threads and
// the number of queued tasks. If the delta is > 0, we have more
// idle threads than queued tasks and can add more tasks into the queue.
// The delta is incremented if a thread becomes idle or if a task is taken from the queue.
// The delta is decremented if a thread leaves idle state or if a task is added to the queue.
private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, "threadTaskDelta");
private volatile int threadTaskDelta = 0;
private QueueExecutionHandler(RejectedExecutionHandler handler) { public void setExecutor(ActiveMQThreadPoolExecutor executor) {
Objects.requireNonNull(handler); this.executor = executor;
this.handler = handler;
} }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown() || !executor.getQueue().add(r)) {
handler.rejectedExecution(r, executor);
}
}
}
// A specialized LinkedBlockingQueue that takes new elements by calling add() but not offer()
// This is to force the ThreadPoolExecutor to always create new threads and never queue
private static class ThreadPoolQueue extends LinkedTransferQueue<Runnable> {
@Override @Override
public boolean offer(Runnable runnable) { public boolean offer(Runnable runnable) {
return tryTransfer(runnable); boolean retval = false;
if (threadTaskDelta > 0 || (executor.getPoolSize() >= executor.getMaximumPoolSize())) {
// A new task will be added to the queue if the maximum number of threads has been reached
// or if the delta is > 0, which means that there are enough idle threads.
retval = super.offer(runnable);
// Only decrement the delta if the task has actually been added to the queue
if (retval)
DELTA_UPDATER.decrementAndGet(this);
}
return retval;
} }
@Override @Override
public boolean add(Runnable runnable) { public Runnable take() throws InterruptedException {
return super.offer(runnable); // Increment the delta as a thread becomes idle
// by waiting for a task to take from the queue
DELTA_UPDATER.incrementAndGet(this);
Runnable runnable = null;
try {
runnable = super.take();
return runnable;
} finally {
// Now the thread is no longer idle waiting for a task
// If it had taken a task, the delta remains the same
// (decremented by the thread and incremented by the taken task)
// Only if no task had been taken, we have to decrement the delta.
if (runnable == null) {
DELTA_UPDATER.decrementAndGet(this);
} }
} }
}
@Override
public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException {
// Increment the delta as a thread becomes idle
// by waiting for a task to poll from the queue
DELTA_UPDATER.incrementAndGet(this);
Runnable runnable = null;
try {
runnable = super.poll(arg0, arg2);
} finally {
// Now the thread is no longer idle waiting for a task
// If it had taken a task, the delta remains the same
// (decremented by the thread and incremented by the taken task)
if (runnable == null) {
DELTA_UPDATER.decrementAndGet(this);
}
}
return runnable;
}
}
private int maxPoolSize;
public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) { public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) {
this(coreSize, maxSize, keep, keepUnits, factory, defaultHandler); this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
} }
public ActiveMQThreadPoolExecutor(int coreSize, // private constructor is needed to inject 'this' into the ThreadPoolQueue instance
private ActiveMQThreadPoolExecutor(int coreSize,
int maxSize, int maxSize,
long keep, long keep,
TimeUnit keepUnits, TimeUnit keepUnits,
ThreadFactory factory, ThreadPoolQueue myQueue,
RejectedExecutionHandler handler) { ThreadFactory factory) {
super(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory, new QueueExecutionHandler(handler)); super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory);
maxPoolSize = maxSize;
myQueue.setExecutor(this);
}
@Override
public int getMaximumPoolSize() {
return maxPoolSize;
}
@Override
public void setMaximumPoolSize(int maxSize) {
maxPoolSize = maxSize;
} }
} }