ARTEMIS-1078 Improving ActiveMQThreadPoolExecutor
This is now considering only threads waiting for the queue to get new tasks as idle.
The thread pool maintained a counter of active threads, but that counter was increased
too late in the beforeExecute method. Submitting a task created a new thread.
If now a second task was submitter before the new thread had started to execute it's task,
the second task was queued without creating a 2nd thread. So the second task was only
executed after the first task had been completed - even if the thread pool's
maximum number of thread had not been reached.
This fix now maintains the delta between the number those threads that are currently waiting
in the queue's poll or take methods as idle threads, and the number of queued tasks.
It creates new threads unless there are enough idle threads to pick up all queued tasks.
This closes #1144
(cherry picked from commit 5a31e70353
)
This commit is contained in:
parent
ea01aeb65e
commit
f94f8f4718
|
@ -20,7 +20,6 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
|
* ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
|
||||||
|
@ -47,31 +46,110 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
|
|
||||||
private ActiveMQThreadPoolExecutor executor = null;
|
private ActiveMQThreadPoolExecutor executor = null;
|
||||||
|
|
||||||
|
// lock object to synchronize on
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
// keep track of the difference between the number of idle threads and
|
||||||
|
// the number of queued tasks. If the delta is > 0, we have more
|
||||||
|
// idle threads than queued tasks and can add more tasks into the queue.
|
||||||
|
// The delta is incremented if a thread becomes idle or if a task is taken from the queue.
|
||||||
|
// The delta is decremented if a thread leaves idle state or if a task is added to the queue.
|
||||||
|
private int threadTaskDelta = 0;
|
||||||
|
|
||||||
public void setExecutor(ActiveMQThreadPoolExecutor executor) {
|
public void setExecutor(ActiveMQThreadPoolExecutor executor) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean offer(Runnable runnable) {
|
public boolean offer(Runnable runnable) {
|
||||||
int poolSize = executor.getPoolSize();
|
boolean retval = false;
|
||||||
|
|
||||||
// If the are less threads than the configured maximum, then the tasks is
|
// Need to lock for 2 reasons:
|
||||||
// only queued if there are some idle threads that can run that tasks.
|
// 1. to safely handle poll timeouts
|
||||||
// We have to add the queue size, since some tasks might just have been queued
|
// 2. to protect the delta from parallel updates
|
||||||
// but not yet taken by an idle thread.
|
synchronized (lock) {
|
||||||
if (poolSize < executor.getMaximumPoolSize() && (size() + executor.getActive()) >= poolSize)
|
if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) {
|
||||||
return false;
|
// A new task will be added to the queue if the maximum number of threads has been reached
|
||||||
|
// or if the delta is > 0, which means that there are enough idle threads.
|
||||||
|
|
||||||
return super.offer(runnable);
|
retval = super.offer(runnable);
|
||||||
|
|
||||||
|
// Only decrement the delta if the task has actually been added to the queue
|
||||||
|
if (retval)
|
||||||
|
threadTaskDelta--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable take() throws InterruptedException {
|
||||||
|
// Increment the delta as a thread becomes idle
|
||||||
|
// by waiting for a task to take from the queue
|
||||||
|
synchronized (lock) {
|
||||||
|
threadTaskDelta++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable runnable = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
runnable = super.take();
|
||||||
|
return runnable;
|
||||||
|
} finally {
|
||||||
|
// Now the thread is no longer idle waiting for a task
|
||||||
|
// If it had taken a task, the delta remains the same
|
||||||
|
// (decremented by the thread and incremented by the taken task)
|
||||||
|
// Only if no task had been taken, we have to decrement the delta.
|
||||||
|
if (runnable == null) {
|
||||||
|
synchronized (lock) {
|
||||||
|
threadTaskDelta--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException {
|
||||||
|
// Increment the delta as a thread becomes idle
|
||||||
|
// by waiting for a task to poll from the queue
|
||||||
|
synchronized (lock) {
|
||||||
|
threadTaskDelta++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable runnable = null;
|
||||||
|
boolean timedOut = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
runnable = super.poll(arg0, arg2);
|
||||||
|
timedOut = (runnable == null);
|
||||||
|
} finally {
|
||||||
|
// Now the thread is no longer idle waiting for a task
|
||||||
|
// If it had taken a task, the delta remains the same
|
||||||
|
// (decremented by the thread and incremented by the taken task)
|
||||||
|
if (runnable == null) {
|
||||||
|
synchronized (lock) {
|
||||||
|
// If the poll called timed out, we check again within a synchronized block
|
||||||
|
// to make sure all offer calls have been completed.
|
||||||
|
// This is to handle a newly queued task if the timeout occurred while an offer call
|
||||||
|
// added that task to the queue instead of creating a new thread.
|
||||||
|
if (timedOut)
|
||||||
|
runnable = super.poll();
|
||||||
|
|
||||||
|
// Only if no task had been taken (either no timeout, or no task from after-timeout poll),
|
||||||
|
// we have to decrement the delta.
|
||||||
|
if (runnable == null)
|
||||||
|
threadTaskDelta--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return runnable;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int maxPoolSize;
|
private int maxPoolSize;
|
||||||
|
|
||||||
// count the active threads with before-/afterExecute, since the .getActiveCount is not very
|
|
||||||
// efficient.
|
|
||||||
private final AtomicInteger active = new AtomicInteger(0);
|
|
||||||
|
|
||||||
public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) {
|
public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) {
|
||||||
this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
|
this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
|
||||||
}
|
}
|
||||||
|
@ -88,10 +166,6 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
myQueue.setExecutor(this);
|
myQueue.setExecutor(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getActive() {
|
|
||||||
return active.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMaximumPoolSize() {
|
public int getMaximumPoolSize() {
|
||||||
return maxPoolSize;
|
return maxPoolSize;
|
||||||
|
@ -101,16 +175,4 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
public void setMaximumPoolSize(int maxSize) {
|
public void setMaximumPoolSize(int maxSize) {
|
||||||
maxPoolSize = maxSize;
|
maxPoolSize = maxSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void beforeExecute(Thread thread, Runnable runnable) {
|
|
||||||
super.beforeExecute(thread, runnable);
|
|
||||||
active.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void afterExecute(Runnable runnable, Throwable throwable) {
|
|
||||||
active.decrementAndGet();
|
|
||||||
super.afterExecute(runnable, throwable);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue