Change to a lock free ordered executor
This commit is contained in:
parent
90c9469701
commit
631c2fa780
|
@ -16,8 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.utils;
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
|
@ -54,78 +56,70 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
|
||||||
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
|
* More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
|
||||||
* same method, will result in B's task running after A's.
|
* same method, will result in B's task running after A's.
|
||||||
*/
|
*/
|
||||||
private static final class OrderedExecutor implements Executor {
|
private static class OrderedExecutor implements Executor {
|
||||||
|
|
||||||
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
|
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
|
||||||
|
private final Executor delegate;
|
||||||
|
private final ExecutorTask task = new ExecutorTask();
|
||||||
|
|
||||||
// @protected by tasks
|
private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
|
||||||
private boolean running;
|
|
||||||
|
|
||||||
private final Executor parent;
|
private static final int STATE_NOT_RUNNING = 0;
|
||||||
|
private static final int STATE_RUNNING = 1;
|
||||||
|
|
||||||
private final Runnable runner;
|
public OrderedExecutor(Executor delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
/**
|
|
||||||
* Construct a new instance.
|
|
||||||
*
|
|
||||||
* @param parent the parent executor
|
|
||||||
*/
|
|
||||||
public OrderedExecutor(final Executor parent) {
|
|
||||||
this.parent = parent;
|
|
||||||
runner = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (;;) {
|
|
||||||
// Optimization, first try without any locks
|
|
||||||
Runnable task = tasks.poll();
|
|
||||||
if (task == null) {
|
|
||||||
synchronized (tasks) {
|
|
||||||
// if it's null we need to retry now holding the lock on tasks
|
|
||||||
// this is because running=false and tasks.empty must be an atomic operation
|
|
||||||
// so we have to retry before setting the tasks to false
|
|
||||||
// this is a different approach to the anti-pattern on synchronize-retry,
|
|
||||||
// as this is just guaranteeing the running=false and tasks.empty being an atomic operation
|
|
||||||
task = tasks.poll();
|
|
||||||
if (task == null) {
|
|
||||||
running = false;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
task.run();
|
|
||||||
}
|
|
||||||
catch (ActiveMQInterruptedException e) {
|
|
||||||
// This could happen during shutdowns. Nothing to be concerned about here
|
|
||||||
ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
|
|
||||||
}
|
|
||||||
catch (Throwable t) {
|
|
||||||
ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Run a task.
|
|
||||||
*
|
|
||||||
* @param command the task to run.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(final Runnable command) {
|
public void execute(Runnable command) {
|
||||||
synchronized (tasks) {
|
tasks.add(command);
|
||||||
tasks.add(command);
|
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
||||||
if (!running) {
|
//note that this can result in multiple tasks being queued
|
||||||
running = true;
|
//this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
|
||||||
parent.execute(runner);
|
delegate.execute(task);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class ExecutorTask implements Runnable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
do {
|
||||||
|
//if there is no thread active then we run
|
||||||
|
if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
|
||||||
|
Runnable task = tasks.poll();
|
||||||
|
//while the queue is not empty we process in order
|
||||||
|
while (task != null) {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
catch (ActiveMQInterruptedException e) {
|
||||||
|
// This could happen during shutdowns. Nothing to be concerned about here
|
||||||
|
ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
|
||||||
|
}
|
||||||
|
task = tasks.poll();
|
||||||
|
}
|
||||||
|
//set state back to not running.
|
||||||
|
stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
|
||||||
|
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
|
||||||
|
//this check fixes the issue
|
||||||
|
} while (!tasks.isEmpty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "OrderedExecutor(running=" + running + ", tasks=" + tasks + ")";
|
return "OrderedExecutor(tasks=" + tasks + ")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue