diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index 8d19c220ca..73dbf2f4a3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -21,11 +21,13 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.LockSupport; public abstract class ProcessorBase { private static final int STATE_NOT_RUNNING = 0; private static final int STATE_RUNNING = 1; + private static final int STATE_FORCED_SHUTDOWN = 2; protected final Queue tasks = new ConcurrentLinkedQueue<>(); @@ -33,12 +35,11 @@ public abstract class ProcessorBase { private final ExecutorTask task = new ExecutorTask(); - private final Object startedGuard = new Object(); - private volatile boolean started = true; - // used by stateUpdater @SuppressWarnings("unused") - private volatile int state = 0; + private volatile int state = STATE_NOT_RUNNING; + + private volatile boolean requestedShutdown = false; private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state"); @@ -47,26 +48,22 @@ public abstract class ProcessorBase { @Override public void run() { do { - //if there is no thread active then we run + //if there is no thread active and is not already dead then we run if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) { - T task = tasks.poll(); - //while the queue is not empty we process in order - - // All we care on started, is that a current task is not running as we call shutdown. - // for that reason this first run doesn't need to be under any lock - while (task != null && started) { - - // Synchronized here is just to guarantee that a current task is finished before - // the started update can be taken as false - synchronized (startedGuard) { - if (started) { + try { + T task = tasks.poll(); + //while the queue is not empty we process in order + while (task != null) { + //just drain the tasks if has been requested a shutdown to help the shutdown process + if (!requestedShutdown) { doTask(task); } + task = tasks.poll(); } - task = tasks.poll(); + } finally { + //set state back to not running. + stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING); } - //set state back to not running. - stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING); } else { return; } @@ -81,10 +78,28 @@ public abstract class ProcessorBase { /** It will wait the current execution (if there is one) to finish * but will not complete any further executions */ public void shutdownNow() { - synchronized (startedGuard) { - started = false; + //alert anyone that has been requested (at least) an immediate shutdown + requestedShutdown = true; + //it could take a very long time depending on the current executing task + do { + //alert the ExecutorTask (if is running) to just drain the current backlog of tasks + final int startState = stateUpdater.get(this); + if (startState == STATE_FORCED_SHUTDOWN) { + //another thread has completed a forced shutdown + return; + } + if (startState == STATE_RUNNING) { + //wait 100 ms to avoid burning CPU while waiting and + //give other threads a chance to make progress + LockSupport.parkNanos(100_000_000L); + } } + while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN)); + //this could happen just one time: the forced shutdown state is the last one and + //can be set by just one caller. + //As noted on the execute method there is a small chance that some tasks would be enqueued tasks.clear(); + //we can report the killed tasks somehow: ExecutorService do the same on shutdownNow } protected abstract void doTask(T task); @@ -98,11 +113,18 @@ public abstract class ProcessorBase { } protected void task(T command) { - // There is no need to verify the lock here. - // you can only turn of running once - if (started) { + if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) { + //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks tasks.add(command); - startPoller(); + //cache locally the state to avoid multiple volatile loads + final int state = stateUpdater.get(this); + if (state == STATE_FORCED_SHUTDOWN) { + //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add + tasks.clear(); + } else if (state == STATE_NOT_RUNNING) { + //startPoller could be deleted but is maintained because is inherited + delegate.execute(task); + } } }