From f8b758d14bc4dac7d613e1d15a65d31289f0a587 Mon Sep 17 00:00:00 2001 From: barreiro Date: Thu, 5 Oct 2017 21:21:52 +0100 Subject: [PATCH] ARTEMIS-1451 - Remove synchronization on ActiveMQThreadPoolExecutor --- .../utils/ActiveMQThreadPoolExecutor.java | 147 ++---------------- 1 file changed, 15 insertions(+), 132 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java index c3b1988c25..ed5f4efac5 100755 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.utils; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -28,151 +29,33 @@ import java.util.concurrent.TimeUnit; * and will be removed after idling for a specified keep time. * But in contrast to a standard cached executor, tasks are queued if the * maximum pool size if reached, instead of rejected. - * - * This is achieved by using a specialized blocking queue, which checks the - * state of the associated executor in the offer method to decide whether to - * queue a task or have the executor create another thread. - * - * Since the thread pool's execute method is reentrant, more than one caller - * could try to offer a task into the queue. There is a small chance that - * (a few) more threads are created as it should be limited by max pool size. - * To allow for such a case not to reject a task, the underlying thread pool - * executor is not limited. Only the offer method checks the configured limit. */ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { - @SuppressWarnings("serial") - private static class ThreadPoolQueue extends LinkedBlockingQueue { - - private ActiveMQThreadPoolExecutor executor = null; - - // lock object to synchronize on - private final Object lock = new Object(); - - // keep track of the difference between the number of idle threads and - // the number of queued tasks. If the delta is > 0, we have more - // idle threads than queued tasks and can add more tasks into the queue. - // The delta is incremented if a thread becomes idle or if a task is taken from the queue. - // The delta is decremented if a thread leaves idle state or if a task is added to the queue. - private int threadTaskDelta = 0; - - public void setExecutor(ActiveMQThreadPoolExecutor executor) { - this.executor = executor; + // Handler executed when a task is submitted and a new thread cannot be created (because maxSize was reached) + // It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue class below) + private static final RejectedExecutionHandler QUEUE_EXECUTION_HANDLER = (r, e) -> { + if (!e.isShutdown()) { + e.getQueue().add(r); } + }; + + // A specialized LinkedBlockingQueue that takes new elements by calling add() but not offer() + // This is to force the ThreadPoolExecutor to always create new threads and never queue + private static class ThreadPoolQueue extends LinkedBlockingQueue { @Override public boolean offer(Runnable runnable) { - boolean retval = false; - - // Need to lock for 2 reasons: - // 1. to safely handle poll timeouts - // 2. to protect the delta from parallel updates - synchronized (lock) { - if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) { - // A new task will be added to the queue if the maximum number of threads has been reached - // or if the delta is > 0, which means that there are enough idle threads. - - retval = super.offer(runnable); - - // Only decrement the delta if the task has actually been added to the queue - if (retval) - threadTaskDelta--; - } - } - - return retval; + return false; } @Override - public Runnable take() throws InterruptedException { - // Increment the delta as a thread becomes idle - // by waiting for a task to take from the queue - synchronized (lock) { - threadTaskDelta++; - } - - Runnable runnable = null; - - try { - runnable = super.take(); - return runnable; - } finally { - // Now the thread is no longer idle waiting for a task - // If it had taken a task, the delta remains the same - // (decremented by the thread and incremented by the taken task) - // Only if no task had been taken, we have to decrement the delta. - if (runnable == null) { - synchronized (lock) { - threadTaskDelta--; - } - } - } - } - - @Override - public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { - // Increment the delta as a thread becomes idle - // by waiting for a task to poll from the queue - synchronized (lock) { - threadTaskDelta++; - } - - Runnable runnable = null; - boolean timedOut = false; - - try { - runnable = super.poll(arg0, arg2); - timedOut = (runnable == null); - } finally { - // Now the thread is no longer idle waiting for a task - // If it had taken a task, the delta remains the same - // (decremented by the thread and incremented by the taken task) - if (runnable == null) { - synchronized (lock) { - // If the poll called timed out, we check again within a synchronized block - // to make sure all offer calls have been completed. - // This is to handle a newly queued task if the timeout occurred while an offer call - // added that task to the queue instead of creating a new thread. - if (timedOut) - runnable = super.poll(); - - // Only if no task had been taken (either no timeout, or no task from after-timeout poll), - // we have to decrement the delta. - if (runnable == null) - threadTaskDelta--; - } - } - } - - return runnable; + public boolean add(Runnable runnable) { + return super.offer( runnable ); } } - private int maxPoolSize; - public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) { - this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory); - } - - // private constructor is needed to inject 'this' into the ThreadPoolQueue instance - private ActiveMQThreadPoolExecutor(int coreSize, - int maxSize, - long keep, - TimeUnit keepUnits, - ThreadPoolQueue myQueue, - ThreadFactory factory) { - super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory); - maxPoolSize = maxSize; - myQueue.setExecutor(this); - } - - @Override - public int getMaximumPoolSize() { - return maxPoolSize; - } - - @Override - public void setMaximumPoolSize(int maxSize) { - maxPoolSize = maxSize; + super( coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory, QUEUE_EXECUTION_HANDLER ); } }