ARTEMIS-1495 Few perf improvements to:

- reduce volatile loads
 - allow method inlining for hot execution paths
 - reduced pointers chasing due to inner classes uses
This commit is contained in:
Francesco Nigro 2017-11-09 11:26:21 +01:00 committed by Clebert Suconic
parent 91db08072b
commit 33b3eb6f09
3 changed files with 99 additions and 65 deletions

View File

@ -17,11 +17,10 @@
package org.apache.activemq.artemis.utils.actors; package org.apache.activemq.artemis.utils.actors;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public interface ArtemisExecutor extends Executor { public interface ArtemisExecutor extends Executor {
@ -40,10 +39,24 @@ public interface ArtemisExecutor extends Executor {
}; };
} }
/** It will wait the current execution (if there is one) to finish /**
* but will not complete any further executions */ * It will wait the current execution (if there is one) to finish
default List<Runnable> shutdownNow() { * but will not complete any further executions.
return Collections.emptyList(); *
* @param onPendingTask it will be called for each pending task found
* @return the number of pending tasks that won't be executed
*/
default int shutdownNow(Consumer<? super Runnable> onPendingTask) {
return 0;
}
/**
* It will wait the current execution (if there is one) to finish
* but will not complete any further executions
*/
default int shutdownNow() {
return shutdownNow(t -> {
});
} }

View File

@ -17,21 +17,19 @@
package org.apache.activemq.artemis.utils.actors; package org.apache.activemq.artemis.utils.actors;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public abstract class ProcessorBase<T> extends HandlerBase { public abstract class ProcessorBase<T> extends HandlerBase {
private static final Logger logger = Logger.getLogger(ProcessorBase.class); private static final Logger logger = Logger.getLogger(ProcessorBase.class);
public static final int STATE_NOT_RUNNING = 0; public static final int STATE_NOT_RUNNING = 0;
public static final int STATE_RUNNING = 1; public static final int STATE_RUNNING = 1;
public static final int STATE_FORCED_SHUTDOWN = 2; public static final int STATE_FORCED_SHUTDOWN = 2;
@ -39,53 +37,50 @@ public abstract class ProcessorBase<T> extends HandlerBase {
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>(); protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
private final Executor delegate; private final Executor delegate;
/**
private final ExecutorTask task = new ExecutorTask(); * Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing
* when accessing ProcessorBase.this fields/methods.
*/
private final Runnable task = this::executePendingTasks;
// used by stateUpdater // used by stateUpdater
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile int state = STATE_NOT_RUNNING; private volatile int state = STATE_NOT_RUNNING;
// Request of forced shutdown
private volatile boolean requestedForcedShutdown = false;
// Request of educated shutdown:
private volatile boolean requestedShutdown = false; private volatile boolean requestedShutdown = false;
private volatile boolean started = true;
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state"); private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
private final class ExecutorTask implements Runnable { private void executePendingTasks() {
do {
@Override //if there is no thread active and is not already dead then we run
public void run() { if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
do { enter();
//if there is no thread active and is not already dead then we run try {
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) { T task;
enter(); //while the queue is not empty we process in order:
try { //if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
T task = tasks.poll(); while (!requestedForcedShutdown && (task = tasks.poll()) != null) {
//while the queue is not empty we process in order doTask(task);
while (task != null && !requestedShutdown) { }
//just drain the tasks if has been requested a shutdown to help the shutdown process } finally {
if (requestedShutdown) { leave();
tasks.add(task); //set state back to not running if possible: shutdownNow could be called by doTask(task).
break; //If a shutdown has happened there is no need to continue polling tasks
} if (!stateUpdater.compareAndSet(this, STATE_RUNNING, STATE_NOT_RUNNING)) {
doTask(task); return;
task = tasks.poll();
}
} finally {
leave();
//set state back to not running.
stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
} }
} else {
return;
} }
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running, } else {
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute. return;
//this check fixes the issue
} }
while (!tasks.isEmpty()); //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
//this check fixes the issue
} }
while (!tasks.isEmpty() && !requestedShutdown);
} }
/** /**
@ -96,7 +91,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
} }
public void shutdown(long timeout, TimeUnit unit) { public void shutdown(long timeout, TimeUnit unit) {
started = false; requestedShutdown = true;
if (!inHandler()) { if (!inHandler()) {
// if it's in handler.. we just return // if it's in handler.. we just return
@ -108,10 +103,10 @@ public abstract class ProcessorBase<T> extends HandlerBase {
* It will wait the current execution (if there is one) to finish * It will wait the current execution (if there is one) to finish
* but will not complete any further executions * but will not complete any further executions
*/ */
public List<T> shutdownNow() { public int shutdownNow(Consumer<? super T> onPendingItem) {
//alert anyone that has been requested (at least) an immediate shutdown //alert anyone that has been requested (at least) an immediate shutdown
requestedForcedShutdown = true;
requestedShutdown = true; requestedShutdown = true;
started = false;
if (inHandler()) { if (inHandler()) {
stateUpdater.set(this, STATE_FORCED_SHUTDOWN); stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
@ -121,7 +116,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this); final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) { if (startState == STATE_FORCED_SHUTDOWN) {
//another thread has completed a forced shutdown //another thread has completed a forced shutdown: let it to manage the tasks cleanup
break; break;
} }
if (startState == STATE_RUNNING) { if (startState == STATE_RUNNING) {
@ -135,10 +130,16 @@ public abstract class ProcessorBase<T> extends HandlerBase {
//can be set by just one caller. //can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued //As noted on the execute method there is a small chance that some tasks would be enqueued
} }
ArrayList<T> returnList = new ArrayList<>(tasks); int pendingItems = 0;
tasks.clear(); //there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them
synchronized (tasks) {
return returnList; T item;
while ((item = tasks.poll()) != null) {
onPendingItem.accept(item);
pendingItems++;
}
}
return pendingItems;
} }
protected abstract void doTask(T task); protected abstract void doTask(T task);
@ -148,7 +149,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
} }
public final boolean isFlushed() { public final boolean isFlushed() {
return stateUpdater.get(this) == STATE_NOT_RUNNING; return this.state == STATE_NOT_RUNNING;
} }
/** /**
@ -158,14 +159,14 @@ public abstract class ProcessorBase<T> extends HandlerBase {
* like in shutdown and failover situations. * like in shutdown and failover situations.
*/ */
public final boolean flush(long timeout, TimeUnit unit) { public final boolean flush(long timeout, TimeUnit unit) {
if (stateUpdater.get(this) == STATE_NOT_RUNNING) { if (this.state == STATE_NOT_RUNNING) {
// quick test, most of the time it will be empty anyways // quick test, most of the time it will be empty anyways
return true; return true;
} }
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
try { try {
while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) { while (this.state == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
if (tasks.isEmpty()) { if (tasks.isEmpty()) {
return true; return true;
@ -177,23 +178,42 @@ public abstract class ProcessorBase<T> extends HandlerBase {
// ignored // ignored
} }
return stateUpdater.get(this) == STATE_NOT_RUNNING; return this.state == STATE_NOT_RUNNING;
} }
protected void task(T command) { protected void task(T command) {
if (!started) { if (requestedShutdown) {
logger.debug("Ordered executor has been shutdown at", new Exception("debug")); logAddOnShutdown();
} }
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
tasks.add(command); tasks.add(command);
//cache locally the state to avoid multiple volatile loads //cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this); final int state = stateUpdater.get(this);
if (state == STATE_FORCED_SHUTDOWN) { if (state != STATE_RUNNING) {
//help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add onAddedTaskIfNotRunning(state);
tasks.clear(); }
} else if (state == STATE_NOT_RUNNING) { }
/**
* This has to be called on the assumption that state!=STATE_RUNNING.
* It is packed separately from {@link #task(Object)} just for performance reasons: it
* handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
*/
private void onAddedTaskIfNotRunning(int state) {
if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited //startPoller could be deleted but is maintained because is inherited
delegate.execute(task); delegate.execute(task);
} else if (state == STATE_FORCED_SHUTDOWN) {
//help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
synchronized (tasks) {
tasks.clear();
}
}
}
private static void logAddOnShutdown() {
if (logger.isDebugEnabled()) {
logger.debug("Ordered executor has been gently shutdown at", new Exception("debug"));
} }
} }
@ -208,7 +228,8 @@ public abstract class ProcessorBase<T> extends HandlerBase {
} }
public final int status() { public final int status() {
return stateUpdater.get(this); //avoid using the updater because in older version of JDK 8 isn't optimized as a vanilla volatile get
return this.state;
} }
} }

View File

@ -82,7 +82,7 @@ public class OrderedExecutorSanityTest {
@Test @Test
public void shutdownWithin() throws InterruptedException { public void shutdownNowOnDelegateExecutor() throws InterruptedException {
final ExecutorService executorService = Executors.newSingleThreadExecutor(); final ExecutorService executorService = Executors.newSingleThreadExecutor();
try { try {
final OrderedExecutor executor = new OrderedExecutor(executorService); final OrderedExecutor executor = new OrderedExecutor(executorService);
@ -93,7 +93,7 @@ public class OrderedExecutorSanityTest {
executor.execute(() -> { executor.execute(() -> {
try { try {
latch.await(1, TimeUnit.MINUTES); latch.await(1, TimeUnit.MINUTES);
numberOfTasks.set(executor.shutdownNow().size()); numberOfTasks.set(executor.shutdownNow());
ran.countDown(); ran.countDown();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();