ARTEMIS-3928 Fixing yield and shutdownNow

org.apache.activemq.artemis.tests.integration.jms.client.ReceiveNoWaitTest.testReceiveNoWait was failing because before this change
This commit is contained in:
Clebert Suconic 2022-08-19 15:55:08 -04:00
parent c96243698d
commit 9acd036dcc
1 changed files with 19 additions and 19 deletions

View File

@ -40,19 +40,17 @@ public abstract class ProcessorBase<T> extends HandlerBase {
* Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing
* when accessing ProcessorBase.this fields/methods.
*/
private final Runnable mainTask = this::executePendingTasks;
private final Runnable task = this::executePendingTasks;
// used by stateUpdater
@SuppressWarnings("unused")
private volatile int state = STATE_NOT_RUNNING;
private enum request {
keepRunning,
shutdown,
yield
}
private volatile request loopRequest = request.keepRunning;
// Request of forced shutdown
private volatile boolean requestedForcedShutdown = false;
// Request of educated shutdown:
private volatile boolean requestedShutdown = false;
// Request to yield to another thread
private volatile boolean yielded = false;
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
@ -65,7 +63,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
T task;
//while the queue is not empty we process in order:
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
while (loopRequest == request.keepRunning && (task = tasks.poll()) != null) {
while (!yielded && !requestedForcedShutdown && (task = tasks.poll()) != null) {
doTask(task);
}
} finally {
@ -83,11 +81,11 @@ public abstract class ProcessorBase<T> extends HandlerBase {
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
//this check fixes the issue
}
while (!tasks.isEmpty() && loopRequest == request.keepRunning);
while (!tasks.isEmpty() && !requestedShutdown && !yielded);
if (loopRequest == request.yield) {
loopRequest = request.keepRunning;
delegate.execute(mainTask);
if (yielded) {
yielded = false;
delegate.execute(task);
}
}
@ -99,7 +97,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
public void shutdown(long timeout, TimeUnit unit) {
loopRequest = request.shutdown;
requestedShutdown = true;
if (!inHandler()) {
// if it's in handler.. we just return
@ -108,13 +106,15 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
public void yield() {
this.loopRequest = request.yield;
this.yielded = true;
}
/** It will shutdown the executor however it will not wait for finishing tasks*/
public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit) {
//alert anyone that has been requested (at least) an immediate shutdown
loopRequest = request.shutdown;
requestedForcedShutdown = true;
requestedShutdown = true;
yielded = false;
if (!inHandler()) {
// We don't have an option where we could do an immediate timeout
@ -174,7 +174,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
protected void task(T command) {
if (loopRequest == request.shutdown) {
if (requestedShutdown) {
logAddOnShutdown();
return;
}
@ -195,7 +195,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
private void onAddedTaskIfNotRunning(int state) {
if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(mainTask);
delegate.execute(task);
}
}