ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow

This commit is contained in:
Francesco Nigro 2017-11-08 12:03:49 +01:00 committed by Clebert Suconic
parent 3c5b57f1e9
commit 0fadc68ca5
1 changed files with 47 additions and 25 deletions

View File

@ -21,11 +21,13 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
public abstract class ProcessorBase<T> { public abstract class ProcessorBase<T> {
private static final int STATE_NOT_RUNNING = 0; private static final int STATE_NOT_RUNNING = 0;
private static final int STATE_RUNNING = 1; private static final int STATE_RUNNING = 1;
private static final int STATE_FORCED_SHUTDOWN = 2;
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>(); protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
@ -33,12 +35,11 @@ public abstract class ProcessorBase<T> {
private final ExecutorTask task = new ExecutorTask(); private final ExecutorTask task = new ExecutorTask();
private final Object startedGuard = new Object();
private volatile boolean started = true;
// used by stateUpdater // used by stateUpdater
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile int state = 0; private volatile int state = STATE_NOT_RUNNING;
private volatile boolean requestedShutdown = false;
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state"); private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
@ -47,26 +48,22 @@ public abstract class ProcessorBase<T> {
@Override @Override
public void run() { public void run() {
do { do {
//if there is no thread active then we run //if there is no thread active and is not already dead then we run
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) { if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
try {
T task = tasks.poll(); T task = tasks.poll();
//while the queue is not empty we process in order //while the queue is not empty we process in order
while (task != null) {
// All we care on started, is that a current task is not running as we call shutdown. //just drain the tasks if has been requested a shutdown to help the shutdown process
// for that reason this first run doesn't need to be under any lock if (!requestedShutdown) {
while (task != null && started) {
// Synchronized here is just to guarantee that a current task is finished before
// the started update can be taken as false
synchronized (startedGuard) {
if (started) {
doTask(task); doTask(task);
} }
}
task = tasks.poll(); task = tasks.poll();
} }
} finally {
//set state back to not running. //set state back to not running.
stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING); stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
}
} else { } else {
return; return;
} }
@ -81,10 +78,28 @@ public abstract class ProcessorBase<T> {
/** It will wait the current execution (if there is one) to finish /** It will wait the current execution (if there is one) to finish
* but will not complete any further executions */ * but will not complete any further executions */
public void shutdownNow() { public void shutdownNow() {
synchronized (startedGuard) { //alert anyone that has been requested (at least) an immediate shutdown
started = false; requestedShutdown = true;
//it could take a very long time depending on the current executing task
do {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) {
//another thread has completed a forced shutdown
return;
} }
if (startState == STATE_RUNNING) {
//wait 100 ms to avoid burning CPU while waiting and
//give other threads a chance to make progress
LockSupport.parkNanos(100_000_000L);
}
}
while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
//this could happen just one time: the forced shutdown state is the last one and
//can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued
tasks.clear(); tasks.clear();
//we can report the killed tasks somehow: ExecutorService do the same on shutdownNow
} }
protected abstract void doTask(T task); protected abstract void doTask(T task);
@ -98,11 +113,18 @@ public abstract class ProcessorBase<T> {
} }
protected void task(T command) { protected void task(T command) {
// There is no need to verify the lock here. if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
// you can only turn of running once //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
if (started) {
tasks.add(command); tasks.add(command);
startPoller(); //cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this);
if (state == STATE_FORCED_SHUTDOWN) {
//help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
tasks.clear();
} else if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(task);
}
} }
} }