From b9a063f2e715eeac05860dc4977dac9221e6aa93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Wed, 23 Jan 2019 09:52:24 +0000 Subject: [PATCH 1/2] ARTEMIS-2236 - Revert Original ARTEMIS-1451 This reverts commit f8b758d1 --- .../utils/ActiveMQThreadPoolExecutor.java | 147 ++++++++++++++++-- 1 file changed, 132 insertions(+), 15 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java index ed5f4efac5..c3b1988c25 100755 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.utils; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -29,33 +28,151 @@ import java.util.concurrent.TimeUnit; * and will be removed after idling for a specified keep time. * But in contrast to a standard cached executor, tasks are queued if the * maximum pool size if reached, instead of rejected. + * + * This is achieved by using a specialized blocking queue, which checks the + * state of the associated executor in the offer method to decide whether to + * queue a task or have the executor create another thread. + * + * Since the thread pool's execute method is reentrant, more than one caller + * could try to offer a task into the queue. There is a small chance that + * (a few) more threads are created as it should be limited by max pool size. + * To allow for such a case not to reject a task, the underlying thread pool + * executor is not limited. Only the offer method checks the configured limit. */ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { - // Handler executed when a task is submitted and a new thread cannot be created (because maxSize was reached) - // It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue class below) - private static final RejectedExecutionHandler QUEUE_EXECUTION_HANDLER = (r, e) -> { - if (!e.isShutdown()) { - e.getQueue().add(r); - } - }; - - // A specialized LinkedBlockingQueue that takes new elements by calling add() but not offer() - // This is to force the ThreadPoolExecutor to always create new threads and never queue + @SuppressWarnings("serial") private static class ThreadPoolQueue extends LinkedBlockingQueue { + private ActiveMQThreadPoolExecutor executor = null; + + // lock object to synchronize on + private final Object lock = new Object(); + + // keep track of the difference between the number of idle threads and + // the number of queued tasks. If the delta is > 0, we have more + // idle threads than queued tasks and can add more tasks into the queue. + // The delta is incremented if a thread becomes idle or if a task is taken from the queue. + // The delta is decremented if a thread leaves idle state or if a task is added to the queue. + private int threadTaskDelta = 0; + + public void setExecutor(ActiveMQThreadPoolExecutor executor) { + this.executor = executor; + } + @Override public boolean offer(Runnable runnable) { - return false; + boolean retval = false; + + // Need to lock for 2 reasons: + // 1. to safely handle poll timeouts + // 2. to protect the delta from parallel updates + synchronized (lock) { + if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) { + // A new task will be added to the queue if the maximum number of threads has been reached + // or if the delta is > 0, which means that there are enough idle threads. + + retval = super.offer(runnable); + + // Only decrement the delta if the task has actually been added to the queue + if (retval) + threadTaskDelta--; + } + } + + return retval; } @Override - public boolean add(Runnable runnable) { - return super.offer( runnable ); + public Runnable take() throws InterruptedException { + // Increment the delta as a thread becomes idle + // by waiting for a task to take from the queue + synchronized (lock) { + threadTaskDelta++; + } + + Runnable runnable = null; + + try { + runnable = super.take(); + return runnable; + } finally { + // Now the thread is no longer idle waiting for a task + // If it had taken a task, the delta remains the same + // (decremented by the thread and incremented by the taken task) + // Only if no task had been taken, we have to decrement the delta. + if (runnable == null) { + synchronized (lock) { + threadTaskDelta--; + } + } + } + } + + @Override + public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { + // Increment the delta as a thread becomes idle + // by waiting for a task to poll from the queue + synchronized (lock) { + threadTaskDelta++; + } + + Runnable runnable = null; + boolean timedOut = false; + + try { + runnable = super.poll(arg0, arg2); + timedOut = (runnable == null); + } finally { + // Now the thread is no longer idle waiting for a task + // If it had taken a task, the delta remains the same + // (decremented by the thread and incremented by the taken task) + if (runnable == null) { + synchronized (lock) { + // If the poll called timed out, we check again within a synchronized block + // to make sure all offer calls have been completed. + // This is to handle a newly queued task if the timeout occurred while an offer call + // added that task to the queue instead of creating a new thread. + if (timedOut) + runnable = super.poll(); + + // Only if no task had been taken (either no timeout, or no task from after-timeout poll), + // we have to decrement the delta. + if (runnable == null) + threadTaskDelta--; + } + } + } + + return runnable; } } + private int maxPoolSize; + public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) { - super( coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory, QUEUE_EXECUTION_HANDLER ); + this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory); + } + + // private constructor is needed to inject 'this' into the ThreadPoolQueue instance + private ActiveMQThreadPoolExecutor(int coreSize, + int maxSize, + long keep, + TimeUnit keepUnits, + ThreadPoolQueue myQueue, + ThreadFactory factory) { + super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory); + maxPoolSize = maxSize; + myQueue.setExecutor(this); + } + + @Override + public int getMaximumPoolSize() { + return maxPoolSize; + } + + @Override + public void setMaximumPoolSize(int maxSize) { + maxPoolSize = maxSize; } } From d4c41e45bc762f2d8b7831529678bc4061bf9cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Wed, 23 Jan 2019 09:59:15 +0000 Subject: [PATCH 2/2] ARTEMIS-2236 Address Latency Impact caused by ARTEMIS-1451 Readdress ARTEMIS-1451 concern of sync blocks, remove synchronization by simplifying original code and using atomics. --- .../utils/ActiveMQThreadPoolExecutor.java | 55 +++++-------------- 1 file changed, 15 insertions(+), 40 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java index c3b1988c25..da7de12925 100755 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /* * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines @@ -46,15 +47,13 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { private ActiveMQThreadPoolExecutor executor = null; - // lock object to synchronize on - private final Object lock = new Object(); - // keep track of the difference between the number of idle threads and // the number of queued tasks. If the delta is > 0, we have more // idle threads than queued tasks and can add more tasks into the queue. // The delta is incremented if a thread becomes idle or if a task is taken from the queue. // The delta is decremented if a thread leaves idle state or if a task is added to the queue. - private int threadTaskDelta = 0; + private static final AtomicIntegerFieldUpdater DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, "threadTaskDelta"); + private volatile int threadTaskDelta = 0; public void setExecutor(ActiveMQThreadPoolExecutor executor) { this.executor = executor; @@ -64,20 +63,15 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public boolean offer(Runnable runnable) { boolean retval = false; - // Need to lock for 2 reasons: - // 1. to safely handle poll timeouts - // 2. to protect the delta from parallel updates - synchronized (lock) { - if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) { - // A new task will be added to the queue if the maximum number of threads has been reached - // or if the delta is > 0, which means that there are enough idle threads. + if (threadTaskDelta > 0 || (executor.getPoolSize() >= executor.getMaximumPoolSize())) { + // A new task will be added to the queue if the maximum number of threads has been reached + // or if the delta is > 0, which means that there are enough idle threads. - retval = super.offer(runnable); + retval = super.offer(runnable); - // Only decrement the delta if the task has actually been added to the queue - if (retval) - threadTaskDelta--; - } + // Only decrement the delta if the task has actually been added to the queue + if (retval) + DELTA_UPDATER.decrementAndGet(this); } return retval; @@ -87,9 +81,8 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public Runnable take() throws InterruptedException { // Increment the delta as a thread becomes idle // by waiting for a task to take from the queue - synchronized (lock) { - threadTaskDelta++; - } + DELTA_UPDATER.incrementAndGet(this); + Runnable runnable = null; @@ -102,9 +95,7 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { // (decremented by the thread and incremented by the taken task) // Only if no task had been taken, we have to decrement the delta. if (runnable == null) { - synchronized (lock) { - threadTaskDelta--; - } + DELTA_UPDATER.decrementAndGet(this); } } } @@ -113,34 +104,18 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { // Increment the delta as a thread becomes idle // by waiting for a task to poll from the queue - synchronized (lock) { - threadTaskDelta++; - } + DELTA_UPDATER.incrementAndGet(this); Runnable runnable = null; - boolean timedOut = false; try { runnable = super.poll(arg0, arg2); - timedOut = (runnable == null); } finally { // Now the thread is no longer idle waiting for a task // If it had taken a task, the delta remains the same // (decremented by the thread and incremented by the taken task) if (runnable == null) { - synchronized (lock) { - // If the poll called timed out, we check again within a synchronized block - // to make sure all offer calls have been completed. - // This is to handle a newly queued task if the timeout occurred while an offer call - // added that task to the queue instead of creating a new thread. - if (timedOut) - runnable = super.poll(); - - // Only if no task had been taken (either no timeout, or no task from after-timeout poll), - // we have to decrement the delta. - if (runnable == null) - threadTaskDelta--; - } + DELTA_UPDATER.decrementAndGet(this); } }