diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java index a87b18aa6a..c3b1988c25 100755 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -20,7 +20,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /* * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines @@ -47,31 +46,110 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { private ActiveMQThreadPoolExecutor executor = null; + // lock object to synchronize on + private final Object lock = new Object(); + + // keep track of the difference between the number of idle threads and + // the number of queued tasks. If the delta is > 0, we have more + // idle threads than queued tasks and can add more tasks into the queue. + // The delta is incremented if a thread becomes idle or if a task is taken from the queue. + // The delta is decremented if a thread leaves idle state or if a task is added to the queue. + private int threadTaskDelta = 0; + public void setExecutor(ActiveMQThreadPoolExecutor executor) { this.executor = executor; } @Override public boolean offer(Runnable runnable) { - int poolSize = executor.getPoolSize(); + boolean retval = false; - // If the are less threads than the configured maximum, then the tasks is - // only queued if there are some idle threads that can run that tasks. - // We have to add the queue size, since some tasks might just have been queued - // but not yet taken by an idle thread. - if (poolSize < executor.getMaximumPoolSize() && (size() + executor.getActive()) >= poolSize) - return false; + // Need to lock for 2 reasons: + // 1. to safely handle poll timeouts + // 2. to protect the delta from parallel updates + synchronized (lock) { + if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) { + // A new task will be added to the queue if the maximum number of threads has been reached + // or if the delta is > 0, which means that there are enough idle threads. - return super.offer(runnable); + retval = super.offer(runnable); + + // Only decrement the delta if the task has actually been added to the queue + if (retval) + threadTaskDelta--; + } + } + + return retval; + } + + @Override + public Runnable take() throws InterruptedException { + // Increment the delta as a thread becomes idle + // by waiting for a task to take from the queue + synchronized (lock) { + threadTaskDelta++; + } + + Runnable runnable = null; + + try { + runnable = super.take(); + return runnable; + } finally { + // Now the thread is no longer idle waiting for a task + // If it had taken a task, the delta remains the same + // (decremented by the thread and incremented by the taken task) + // Only if no task had been taken, we have to decrement the delta. + if (runnable == null) { + synchronized (lock) { + threadTaskDelta--; + } + } + } + } + + @Override + public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { + // Increment the delta as a thread becomes idle + // by waiting for a task to poll from the queue + synchronized (lock) { + threadTaskDelta++; + } + + Runnable runnable = null; + boolean timedOut = false; + + try { + runnable = super.poll(arg0, arg2); + timedOut = (runnable == null); + } finally { + // Now the thread is no longer idle waiting for a task + // If it had taken a task, the delta remains the same + // (decremented by the thread and incremented by the taken task) + if (runnable == null) { + synchronized (lock) { + // If the poll called timed out, we check again within a synchronized block + // to make sure all offer calls have been completed. + // This is to handle a newly queued task if the timeout occurred while an offer call + // added that task to the queue instead of creating a new thread. + if (timedOut) + runnable = super.poll(); + + // Only if no task had been taken (either no timeout, or no task from after-timeout poll), + // we have to decrement the delta. + if (runnable == null) + threadTaskDelta--; + } + } + } + + return runnable; } } private int maxPoolSize; - // count the active threads with before-/afterExecute, since the .getActiveCount is not very - // efficient. - private final AtomicInteger active = new AtomicInteger(0); - public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) { this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory); } @@ -88,10 +166,6 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { myQueue.setExecutor(this); } - private int getActive() { - return active.get(); - } - @Override public int getMaximumPoolSize() { return maxPoolSize; @@ -101,16 +175,4 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public void setMaximumPoolSize(int maxSize) { maxPoolSize = maxSize; } - - @Override - protected void beforeExecute(Thread thread, Runnable runnable) { - super.beforeExecute(thread, runnable); - active.incrementAndGet(); - } - - @Override - protected void afterExecute(Runnable runnable, Throwable throwable) { - active.decrementAndGet(); - super.afterExecute(runnable, throwable); - } }