ARTEMIS-1495 Fixing In Handler executor and added benchmark to measure impact of changes

This commit is contained in:
Clebert Suconic 2017-11-08 09:16:59 -05:00
parent 0fadc68ca5
commit 91db08072b
7 changed files with 261 additions and 101 deletions

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.utils.actors;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -40,9 +42,15 @@ public interface ArtemisExecutor extends Executor {
/** It will wait the current execution (if there is one) to finish
* but will not complete any further executions */
default void shutdownNow() {
default List<Runnable> shutdownNow() {
return Collections.emptyList();
}
default void shutdown() {
}
/**
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
* @return

View File

@ -0,0 +1,47 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.artemis.utils.actors;
/**
* This abstract class will encapsulate
* ThreadLocals to determine when a class is a handler.
* This is because some functionality has to be avoided if inHandler().
*
*/
public abstract class HandlerBase {
//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;
// this cannot be static as the Actor will be used within another executor. For that reason
// each instance will have its own ThreadLocal.
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
protected void enter() {
assert inHandler.get() == null : "should be null";
inHandler.set(DUMMY);
}
public boolean inHandler() {
final Object dummy = inHandler.get();
return dummy != null;
}
protected void leave() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
}
}

View File

@ -17,17 +17,24 @@
package org.apache.activemq.artemis.utils.actors;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
public abstract class ProcessorBase<T> {
import org.jboss.logging.Logger;
private static final int STATE_NOT_RUNNING = 0;
private static final int STATE_RUNNING = 1;
private static final int STATE_FORCED_SHUTDOWN = 2;
public abstract class ProcessorBase<T> extends HandlerBase {
private static final Logger logger = Logger.getLogger(ProcessorBase.class);
public static final int STATE_NOT_RUNNING = 0;
public static final int STATE_RUNNING = 1;
public static final int STATE_FORCED_SHUTDOWN = 2;
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
@ -41,6 +48,8 @@ public abstract class ProcessorBase<T> {
private volatile boolean requestedShutdown = false;
private volatile boolean started = true;
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
private final class ExecutorTask implements Runnable {
@ -50,19 +59,23 @@ public abstract class ProcessorBase<T> {
do {
//if there is no thread active and is not already dead then we run
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
enter();
try {
T task = tasks.poll();
//while the queue is not empty we process in order
while (task != null) {
while (task != null && !requestedShutdown) {
//just drain the tasks if has been requested a shutdown to help the shutdown process
if (!requestedShutdown) {
doTask(task);
if (requestedShutdown) {
tasks.add(task);
break;
}
doTask(task);
task = tasks.poll();
}
} finally {
leave();
//set state back to not running.
stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
}
} else {
return;
@ -75,31 +88,57 @@ public abstract class ProcessorBase<T> {
}
}
/** It will wait the current execution (if there is one) to finish
* but will not complete any further executions */
public void shutdownNow() {
/**
* It will shutdown and wait 30 seconds for timeout.
*/
public void shutdown() {
shutdown(30, TimeUnit.SECONDS);
}
public void shutdown(long timeout, TimeUnit unit) {
started = false;
if (!inHandler()) {
// if it's in handler.. we just return
flush(timeout, unit);
}
}
/**
* It will wait the current execution (if there is one) to finish
* but will not complete any further executions
*/
public List<T> shutdownNow() {
//alert anyone that has been requested (at least) an immediate shutdown
requestedShutdown = true;
//it could take a very long time depending on the current executing task
do {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) {
//another thread has completed a forced shutdown
return;
}
if (startState == STATE_RUNNING) {
//wait 100 ms to avoid burning CPU while waiting and
//give other threads a chance to make progress
LockSupport.parkNanos(100_000_000L);
started = false;
if (inHandler()) {
stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
} else {
//it could take a very long time depending on the current executing task
do {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) {
//another thread has completed a forced shutdown
break;
}
if (startState == STATE_RUNNING) {
//wait 100 ms to avoid burning CPU while waiting and
//give other threads a chance to make progress
LockSupport.parkNanos(100_000_000L);
}
}
while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
//this could happen just one time: the forced shutdown state is the last one and
//can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued
}
while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
//this could happen just one time: the forced shutdown state is the last one and
//can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued
ArrayList<T> returnList = new ArrayList<>(tasks);
tasks.clear();
//we can report the killed tasks somehow: ExecutorService do the same on shutdownNow
return returnList;
}
protected abstract void doTask(T task);
@ -112,26 +151,48 @@ public abstract class ProcessorBase<T> {
return stateUpdater.get(this) == STATE_NOT_RUNNING;
}
protected void task(T command) {
if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
tasks.add(command);
//cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this);
if (state == STATE_FORCED_SHUTDOWN) {
//help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
tasks.clear();
} else if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(task);
}
/**
* WARNING: This will only flush when all the activity is suspended.
* don't expect success on this call if another thread keeps feeding the queue
* this is only valid on situations where you are not feeding the queue,
* like in shutdown and failover situations.
*/
public final boolean flush(long timeout, TimeUnit unit) {
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
// quick test, most of the time it will be empty anyways
return true;
}
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
try {
while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
if (tasks.isEmpty()) {
return true;
}
Thread.sleep(10);
}
} catch (InterruptedException e) {
// ignored
}
return stateUpdater.get(this) == STATE_NOT_RUNNING;
}
protected void startPoller() {
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
//note that this can result in multiple tasks being queued
//this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
protected void task(T command) {
if (!started) {
logger.debug("Ordered executor has been shutdown at", new Exception("debug"));
}
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
tasks.add(command);
//cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this);
if (state == STATE_FORCED_SHUTDOWN) {
//help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
tasks.clear();
} else if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(task);
}
}
@ -146,4 +207,8 @@ public abstract class ProcessorBase<T> {
return tasks.size();
}
public final int status() {
return stateUpdater.get(this);
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
@ -70,7 +71,7 @@ public class OrderedExecutorSanityTest {
//from now on new tasks won't be executed
final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
executor.execute(afterDeatchExecution::countDown);
Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(1, TimeUnit.SECONDS));
Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS));
//to avoid memory leaks the executor must take care of the new submitted tasks immediatly
Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining());
} finally {
@ -78,4 +79,70 @@ public class OrderedExecutorSanityTest {
}
}
@Test
public void shutdownWithin() throws InterruptedException {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final OrderedExecutor executor = new OrderedExecutor(executorService);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger numberOfTasks = new AtomicInteger(0);
final CountDownLatch ran = new CountDownLatch(1);
executor.execute(() -> {
try {
latch.await(1, TimeUnit.MINUTES);
numberOfTasks.set(executor.shutdownNow().size());
ran.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
for (int i = 0; i < 100; i++) {
executor.execute(() -> System.out.println("Dont worry, this will never happen"));
}
latch.countDown();
ran.await(1, TimeUnit.SECONDS);
Assert.assertEquals(100, numberOfTasks.get());
Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status());
Assert.assertEquals(0, executor.remaining());
} finally {
executorService.shutdown();
}
}
@Test
public void testMeasure() throws InterruptedException {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final OrderedExecutor executor = new OrderedExecutor(executorService);
int MAX_LOOP = 1_000_000;
// extend the number for longer numbers
int runs = 10;
for (int i = 0; i < runs; i++) {
long start = System.nanoTime();
final CountDownLatch executed = new CountDownLatch(MAX_LOOP);
for (int l = 0; l < MAX_LOOP; l++) {
executor.execute(executed::countDown);
}
Assert.assertTrue(executed.await(1, TimeUnit.MINUTES));
long end = System.nanoTime();
long elapsed = (end - start);
System.out.println("execution " + i + " in " + TimeUnit.NANOSECONDS.toMillis(elapsed) + " milliseconds");
}
} finally {
executorService.shutdown();
}
}
}

View File

@ -159,11 +159,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final boolean direct;
//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;
//a thread that has its thread-local map populated with DUMMY is performing a packet handling
private static final ThreadLocal<Object> inHandler = new ThreadLocal<>();
public ServerSessionPacketHandler(final ActiveMQServer server,
final CoreProtocolManager manager,
@ -231,26 +226,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
}
private static void onStartMessagePacketHandler() {
assert inHandler.get() == null : "recursion on packet handling is not supported";
inHandler.set(DUMMY);
}
private static boolean inHandler() {
final Object dummy = inHandler.get();
//sanity check: can't exist a thread using a marker different from DUMMY
assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong marker";
return dummy != null;
}
private static void onExitMessagePacketHandler() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
}
public void closeExecutors() {
packetActor.shutdownNow();
callExecutor.shutdownNow();
packetActor.shutdown();
callExecutor.shutdown();
}
public void close() {
@ -280,33 +258,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
if (logger.isTraceEnabled()) {
logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
}
onStartMessagePacketHandler();
try {
final byte type = packet.getType();
switch (type) {
case SESS_SEND: {
onSessionSend(packet);
break;
}
case SESS_ACKNOWLEDGE: {
onSessionAcknowledge(packet);
break;
}
case SESS_PRODUCER_REQUEST_CREDITS: {
onSessionRequestProducerCredits(packet);
break;
}
case SESS_FLOWTOKEN: {
onSessionConsumerFlowCredit(packet);
break;
}
default:
// separating a method for everything else as JIT was faster this way
slowPacketHandler(packet);
break;
final byte type = packet.getType();
switch (type) {
case SESS_SEND: {
onSessionSend(packet);
break;
}
} finally {
onExitMessagePacketHandler();
case SESS_ACKNOWLEDGE: {
onSessionAcknowledge(packet);
break;
}
case SESS_PRODUCER_REQUEST_CREDITS: {
onSessionRequestProducerCredits(packet);
break;
}
case SESS_FLOWTOKEN: {
onSessionConsumerFlowCredit(packet);
break;
}
default:
// separating a method for everything else as JIT was faster this way
slowPacketHandler(packet);
break;
}
}

View File

@ -302,7 +302,7 @@ public abstract class ActiveMQTestBase extends Assert {
//clean up pools before failing
if (!exceptions.isEmpty()) {
for (Exception exception : exceptions) {
exception.printStackTrace();
exception.printStackTrace(System.out);
}
fail("Client Session Factories still trying to reconnect, see above to see where created");
}

View File

@ -330,9 +330,9 @@ public class ConsumerTest extends ActiveMQTestBase {
connection.close();
}
assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
assertNull(server.locateQueue(SimpleString.toSimpleString("queue")));
assertEquals(0, server.getTotalMessageCount());
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("queue")) == null);
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString("queue")) == null);
Wait.assertEquals(0, server::getTotalMessageCount);
}
@Test