ARTEMIS-2236 Address Latency Impact caused by ARTEMIS-1451

Readdress ARTEMIS-1451 concern of sync blocks, remove synchronization by simplifying original code and using atomics.
This commit is contained in:
Michael André Pearce 2019-01-23 09:59:15 +00:00
parent b9a063f2e7
commit d4c41e45bc
1 changed files with 15 additions and 40 deletions

View File

@ -20,6 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/* /*
* ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@ -46,15 +47,13 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
private ActiveMQThreadPoolExecutor executor = null; private ActiveMQThreadPoolExecutor executor = null;
// lock object to synchronize on
private final Object lock = new Object();
// keep track of the difference between the number of idle threads and // keep track of the difference between the number of idle threads and
// the number of queued tasks. If the delta is > 0, we have more // the number of queued tasks. If the delta is > 0, we have more
// idle threads than queued tasks and can add more tasks into the queue. // idle threads than queued tasks and can add more tasks into the queue.
// The delta is incremented if a thread becomes idle or if a task is taken from the queue. // The delta is incremented if a thread becomes idle or if a task is taken from the queue.
// The delta is decremented if a thread leaves idle state or if a task is added to the queue. // The delta is decremented if a thread leaves idle state or if a task is added to the queue.
private int threadTaskDelta = 0; private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, "threadTaskDelta");
private volatile int threadTaskDelta = 0;
public void setExecutor(ActiveMQThreadPoolExecutor executor) { public void setExecutor(ActiveMQThreadPoolExecutor executor) {
this.executor = executor; this.executor = executor;
@ -64,20 +63,15 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
public boolean offer(Runnable runnable) { public boolean offer(Runnable runnable) {
boolean retval = false; boolean retval = false;
// Need to lock for 2 reasons: if (threadTaskDelta > 0 || (executor.getPoolSize() >= executor.getMaximumPoolSize())) {
// 1. to safely handle poll timeouts // A new task will be added to the queue if the maximum number of threads has been reached
// 2. to protect the delta from parallel updates // or if the delta is > 0, which means that there are enough idle threads.
synchronized (lock) {
if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) {
// A new task will be added to the queue if the maximum number of threads has been reached
// or if the delta is > 0, which means that there are enough idle threads.
retval = super.offer(runnable); retval = super.offer(runnable);
// Only decrement the delta if the task has actually been added to the queue // Only decrement the delta if the task has actually been added to the queue
if (retval) if (retval)
threadTaskDelta--; DELTA_UPDATER.decrementAndGet(this);
}
} }
return retval; return retval;
@ -87,9 +81,8 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
public Runnable take() throws InterruptedException { public Runnable take() throws InterruptedException {
// Increment the delta as a thread becomes idle // Increment the delta as a thread becomes idle
// by waiting for a task to take from the queue // by waiting for a task to take from the queue
synchronized (lock) { DELTA_UPDATER.incrementAndGet(this);
threadTaskDelta++;
}
Runnable runnable = null; Runnable runnable = null;
@ -102,9 +95,7 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
// (decremented by the thread and incremented by the taken task) // (decremented by the thread and incremented by the taken task)
// Only if no task had been taken, we have to decrement the delta. // Only if no task had been taken, we have to decrement the delta.
if (runnable == null) { if (runnable == null) {
synchronized (lock) { DELTA_UPDATER.decrementAndGet(this);
threadTaskDelta--;
}
} }
} }
} }
@ -113,34 +104,18 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException {
// Increment the delta as a thread becomes idle // Increment the delta as a thread becomes idle
// by waiting for a task to poll from the queue // by waiting for a task to poll from the queue
synchronized (lock) { DELTA_UPDATER.incrementAndGet(this);
threadTaskDelta++;
}
Runnable runnable = null; Runnable runnable = null;
boolean timedOut = false;
try { try {
runnable = super.poll(arg0, arg2); runnable = super.poll(arg0, arg2);
timedOut = (runnable == null);
} finally { } finally {
// Now the thread is no longer idle waiting for a task // Now the thread is no longer idle waiting for a task
// If it had taken a task, the delta remains the same // If it had taken a task, the delta remains the same
// (decremented by the thread and incremented by the taken task) // (decremented by the thread and incremented by the taken task)
if (runnable == null) { if (runnable == null) {
synchronized (lock) { DELTA_UPDATER.decrementAndGet(this);
// If the poll called timed out, we check again within a synchronized block
// to make sure all offer calls have been completed.
// This is to handle a newly queued task if the timeout occurred while an offer call
// added that task to the queue instead of creating a new thread.
if (timedOut)
runnable = super.poll();
// Only if no task had been taken (either no timeout, or no task from after-timeout poll),
// we have to decrement the delta.
if (runnable == null)
threadTaskDelta--;
}
} }
} }