From 33b3eb6f095da4a21648c268c7a960e55f414ca3 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 9 Nov 2017 11:26:21 +0100 Subject: [PATCH] ARTEMIS-1495 Few perf improvements to: - reduce volatile loads - allow method inlining for hot execution paths - reduced pointers chasing due to inner classes uses --- .../artemis/utils/actors/ArtemisExecutor.java | 25 +++- .../artemis/utils/actors/ProcessorBase.java | 135 ++++++++++-------- .../actors/OrderedExecutorSanityTest.java | 4 +- 3 files changed, 99 insertions(+), 65 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java index 8efb3d3530..9903d658ad 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java @@ -17,11 +17,10 @@ package org.apache.activemq.artemis.utils.actors; -import java.util.Collections; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; public interface ArtemisExecutor extends Executor { @@ -40,10 +39,24 @@ public interface ArtemisExecutor extends Executor { }; } - /** It will wait the current execution (if there is one) to finish - * but will not complete any further executions */ - default List shutdownNow() { - return Collections.emptyList(); + /** + * It will wait the current execution (if there is one) to finish + * but will not complete any further executions. + * + * @param onPendingTask it will be called for each pending task found + * @return the number of pending tasks that won't be executed + */ + default int shutdownNow(Consumer onPendingTask) { + return 0; + } + + /** + * It will wait the current execution (if there is one) to finish + * but will not complete any further executions + */ + default int shutdownNow() { + return shutdownNow(t -> { + }); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index 1c77a522cc..ff6d9a1db5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -17,21 +17,19 @@ package org.apache.activemq.artemis.utils.actors; -import java.util.ArrayList; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.LockSupport; +import java.util.function.Consumer; import org.jboss.logging.Logger; public abstract class ProcessorBase extends HandlerBase { private static final Logger logger = Logger.getLogger(ProcessorBase.class); - public static final int STATE_NOT_RUNNING = 0; public static final int STATE_RUNNING = 1; public static final int STATE_FORCED_SHUTDOWN = 2; @@ -39,53 +37,50 @@ public abstract class ProcessorBase extends HandlerBase { protected final Queue tasks = new ConcurrentLinkedQueue<>(); private final Executor delegate; - - private final ExecutorTask task = new ExecutorTask(); + /** + * Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing + * when accessing ProcessorBase.this fields/methods. + */ + private final Runnable task = this::executePendingTasks; // used by stateUpdater @SuppressWarnings("unused") private volatile int state = STATE_NOT_RUNNING; - + // Request of forced shutdown + private volatile boolean requestedForcedShutdown = false; + // Request of educated shutdown: private volatile boolean requestedShutdown = false; - private volatile boolean started = true; - private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state"); - private final class ExecutorTask implements Runnable { - - @Override - public void run() { - do { - //if there is no thread active and is not already dead then we run - if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) { - enter(); - try { - T task = tasks.poll(); - //while the queue is not empty we process in order - while (task != null && !requestedShutdown) { - //just drain the tasks if has been requested a shutdown to help the shutdown process - if (requestedShutdown) { - tasks.add(task); - break; - } - doTask(task); - task = tasks.poll(); - } - } finally { - leave(); - //set state back to not running. - stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING); + private void executePendingTasks() { + do { + //if there is no thread active and is not already dead then we run + if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) { + enter(); + try { + T task; + //while the queue is not empty we process in order: + //if requestedForcedShutdown==true than no new tasks will be drained from the tasks q. + while (!requestedForcedShutdown && (task = tasks.poll()) != null) { + doTask(task); + } + } finally { + leave(); + //set state back to not running if possible: shutdownNow could be called by doTask(task). + //If a shutdown has happened there is no need to continue polling tasks + if (!stateUpdater.compareAndSet(this, STATE_RUNNING, STATE_NOT_RUNNING)) { + return; } - } else { - return; } - //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, - //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. - //this check fixes the issue + } else { + return; } - while (!tasks.isEmpty()); + //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, + //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. + //this check fixes the issue } + while (!tasks.isEmpty() && !requestedShutdown); } /** @@ -96,7 +91,7 @@ public abstract class ProcessorBase extends HandlerBase { } public void shutdown(long timeout, TimeUnit unit) { - started = false; + requestedShutdown = true; if (!inHandler()) { // if it's in handler.. we just return @@ -108,10 +103,10 @@ public abstract class ProcessorBase extends HandlerBase { * It will wait the current execution (if there is one) to finish * but will not complete any further executions */ - public List shutdownNow() { + public int shutdownNow(Consumer onPendingItem) { //alert anyone that has been requested (at least) an immediate shutdown + requestedForcedShutdown = true; requestedShutdown = true; - started = false; if (inHandler()) { stateUpdater.set(this, STATE_FORCED_SHUTDOWN); @@ -121,7 +116,7 @@ public abstract class ProcessorBase extends HandlerBase { //alert the ExecutorTask (if is running) to just drain the current backlog of tasks final int startState = stateUpdater.get(this); if (startState == STATE_FORCED_SHUTDOWN) { - //another thread has completed a forced shutdown + //another thread has completed a forced shutdown: let it to manage the tasks cleanup break; } if (startState == STATE_RUNNING) { @@ -135,10 +130,16 @@ public abstract class ProcessorBase extends HandlerBase { //can be set by just one caller. //As noted on the execute method there is a small chance that some tasks would be enqueued } - ArrayList returnList = new ArrayList<>(tasks); - tasks.clear(); - - return returnList; + int pendingItems = 0; + //there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them + synchronized (tasks) { + T item; + while ((item = tasks.poll()) != null) { + onPendingItem.accept(item); + pendingItems++; + } + } + return pendingItems; } protected abstract void doTask(T task); @@ -148,7 +149,7 @@ public abstract class ProcessorBase extends HandlerBase { } public final boolean isFlushed() { - return stateUpdater.get(this) == STATE_NOT_RUNNING; + return this.state == STATE_NOT_RUNNING; } /** @@ -158,14 +159,14 @@ public abstract class ProcessorBase extends HandlerBase { * like in shutdown and failover situations. */ public final boolean flush(long timeout, TimeUnit unit) { - if (stateUpdater.get(this) == STATE_NOT_RUNNING) { + if (this.state == STATE_NOT_RUNNING) { // quick test, most of the time it will be empty anyways return true; } long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); try { - while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) { + while (this.state == STATE_RUNNING && timeLimit > System.currentTimeMillis()) { if (tasks.isEmpty()) { return true; @@ -177,23 +178,42 @@ public abstract class ProcessorBase extends HandlerBase { // ignored } - return stateUpdater.get(this) == STATE_NOT_RUNNING; + return this.state == STATE_NOT_RUNNING; } protected void task(T command) { - if (!started) { - logger.debug("Ordered executor has been shutdown at", new Exception("debug")); + if (requestedShutdown) { + logAddOnShutdown(); } //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks tasks.add(command); //cache locally the state to avoid multiple volatile loads final int state = stateUpdater.get(this); - if (state == STATE_FORCED_SHUTDOWN) { - //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add - tasks.clear(); - } else if (state == STATE_NOT_RUNNING) { + if (state != STATE_RUNNING) { + onAddedTaskIfNotRunning(state); + } + } + + /** + * This has to be called on the assumption that state!=STATE_RUNNING. + * It is packed separately from {@link #task(Object)} just for performance reasons: it + * handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path. + */ + private void onAddedTaskIfNotRunning(int state) { + if (state == STATE_NOT_RUNNING) { //startPoller could be deleted but is maintained because is inherited delegate.execute(task); + } else if (state == STATE_FORCED_SHUTDOWN) { + //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add + synchronized (tasks) { + tasks.clear(); + } + } + } + + private static void logAddOnShutdown() { + if (logger.isDebugEnabled()) { + logger.debug("Ordered executor has been gently shutdown at", new Exception("debug")); } } @@ -208,7 +228,8 @@ public abstract class ProcessorBase extends HandlerBase { } public final int status() { - return stateUpdater.get(this); + //avoid using the updater because in older version of JDK 8 isn't optimized as a vanilla volatile get + return this.state; } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java index 4e2bbbadb4..345cbb5009 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java @@ -82,7 +82,7 @@ public class OrderedExecutorSanityTest { @Test - public void shutdownWithin() throws InterruptedException { + public void shutdownNowOnDelegateExecutor() throws InterruptedException { final ExecutorService executorService = Executors.newSingleThreadExecutor(); try { final OrderedExecutor executor = new OrderedExecutor(executorService); @@ -93,7 +93,7 @@ public class OrderedExecutorSanityTest { executor.execute(() -> { try { latch.await(1, TimeUnit.MINUTES); - numberOfTasks.set(executor.shutdownNow().size()); + numberOfTasks.set(executor.shutdownNow()); ran.countDown(); } catch (Exception e) { e.printStackTrace();