This closes #1650
This commit is contained in:
commit
ead60d54d0
|
@ -20,12 +20,13 @@ package org.apache.activemq.artemis.utils.actors;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface ArtemisExecutor extends Executor {
|
||||
|
||||
/**
|
||||
* Artemis is supposed to implement this properly, however in tests or tools
|
||||
* this can be used as a fake, doing a sipmle delegate and using the default methods implemented here.
|
||||
* this can be used as a fake, doing a simple delegate and using the default methods implemented here.
|
||||
* @param executor
|
||||
* @return
|
||||
*/
|
||||
|
@ -38,11 +39,36 @@ public interface ArtemisExecutor extends Executor {
|
|||
};
|
||||
}
|
||||
|
||||
default boolean flush() {
|
||||
return flush(30, TimeUnit.SECONDS);
|
||||
/**
|
||||
* It will wait the current execution (if there is one) to finish
|
||||
* but will not complete any further executions.
|
||||
*
|
||||
* @param onPendingTask it will be called for each pending task found
|
||||
* @return the number of pending tasks that won't be executed
|
||||
*/
|
||||
default int shutdownNow(Consumer<? super Runnable> onPendingTask) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
default boolean flush(long timeout, TimeUnit unit) {
|
||||
/**
|
||||
* It will wait the current execution (if there is one) to finish
|
||||
* but will not complete any further executions
|
||||
*/
|
||||
default int shutdownNow() {
|
||||
return shutdownNow(t -> {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
default void shutdown() {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
|
||||
* @return
|
||||
*/
|
||||
default boolean isFlushed() {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
|
@ -52,18 +78,10 @@ public interface ArtemisExecutor extends Executor {
|
|||
};
|
||||
execute(runnable);
|
||||
try {
|
||||
return latch.await(timeout, unit);
|
||||
return latch.await(100, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
|
||||
* @return
|
||||
*/
|
||||
default boolean isFlushed() {
|
||||
return flush(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils.actors;
|
||||
|
||||
/**
|
||||
* This abstract class will encapsulate
|
||||
* ThreadLocals to determine when a class is a handler.
|
||||
* This is because some functionality has to be avoided if inHandler().
|
||||
*
|
||||
*/
|
||||
public abstract class HandlerBase {
|
||||
|
||||
//marker instance used to recognize if a thread is performing a packet handling
|
||||
private static final Object DUMMY = Boolean.TRUE;
|
||||
|
||||
// this cannot be static as the Actor will be used within another executor. For that reason
|
||||
// each instance will have its own ThreadLocal.
|
||||
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
|
||||
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
|
||||
|
||||
protected void enter() {
|
||||
assert inHandler.get() == null : "should be null";
|
||||
inHandler.set(DUMMY);
|
||||
}
|
||||
|
||||
public boolean inHandler() {
|
||||
final Object dummy = inHandler.get();
|
||||
return dummy != null;
|
||||
}
|
||||
|
||||
protected void leave() {
|
||||
assert inHandler.get() != null : "marker not set";
|
||||
inHandler.set(null);
|
||||
}
|
||||
|
||||
}
|
|
@ -22,48 +22,124 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public abstract class ProcessorBase<T> {
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
private static final int STATE_NOT_RUNNING = 0;
|
||||
private static final int STATE_RUNNING = 1;
|
||||
public abstract class ProcessorBase<T> extends HandlerBase {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ProcessorBase.class);
|
||||
public static final int STATE_NOT_RUNNING = 0;
|
||||
public static final int STATE_RUNNING = 1;
|
||||
public static final int STATE_FORCED_SHUTDOWN = 2;
|
||||
|
||||
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private final Executor delegate;
|
||||
|
||||
private final ExecutorTask task = new ExecutorTask();
|
||||
/**
|
||||
* Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing
|
||||
* when accessing ProcessorBase.this fields/methods.
|
||||
*/
|
||||
private final Runnable task = this::executePendingTasks;
|
||||
|
||||
// used by stateUpdater
|
||||
@SuppressWarnings("unused")
|
||||
private volatile int state = 0;
|
||||
private volatile int state = STATE_NOT_RUNNING;
|
||||
// Request of forced shutdown
|
||||
private volatile boolean requestedForcedShutdown = false;
|
||||
// Request of educated shutdown:
|
||||
private volatile boolean requestedShutdown = false;
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
|
||||
|
||||
private final class ExecutorTask implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
do {
|
||||
//if there is no thread active then we run
|
||||
if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
|
||||
T task = tasks.poll();
|
||||
//while the queue is not empty we process in order
|
||||
while (task != null) {
|
||||
private void executePendingTasks() {
|
||||
do {
|
||||
//if there is no thread active and is not already dead then we run
|
||||
if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
|
||||
enter();
|
||||
try {
|
||||
T task;
|
||||
//while the queue is not empty we process in order:
|
||||
//if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
|
||||
while (!requestedForcedShutdown && (task = tasks.poll()) != null) {
|
||||
doTask(task);
|
||||
task = tasks.poll();
|
||||
}
|
||||
//set state back to not running.
|
||||
stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
|
||||
} else {
|
||||
return;
|
||||
} finally {
|
||||
leave();
|
||||
//set state back to not running if possible: shutdownNow could be called by doTask(task).
|
||||
//If a shutdown has happened there is no need to continue polling tasks
|
||||
if (!stateUpdater.compareAndSet(this, STATE_RUNNING, STATE_NOT_RUNNING)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
|
||||
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
|
||||
//this check fixes the issue
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
while (!tasks.isEmpty());
|
||||
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
|
||||
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
|
||||
//this check fixes the issue
|
||||
}
|
||||
while (!tasks.isEmpty() && !requestedShutdown);
|
||||
}
|
||||
|
||||
/**
|
||||
* It will shutdown and wait 30 seconds for timeout.
|
||||
*/
|
||||
public void shutdown() {
|
||||
shutdown(30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void shutdown(long timeout, TimeUnit unit) {
|
||||
requestedShutdown = true;
|
||||
|
||||
if (!inHandler()) {
|
||||
// if it's in handler.. we just return
|
||||
flush(timeout, unit);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* It will wait the current execution (if there is one) to finish
|
||||
* but will not complete any further executions
|
||||
*/
|
||||
public int shutdownNow(Consumer<? super T> onPendingItem) {
|
||||
//alert anyone that has been requested (at least) an immediate shutdown
|
||||
requestedForcedShutdown = true;
|
||||
requestedShutdown = true;
|
||||
|
||||
if (inHandler()) {
|
||||
stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
|
||||
} else {
|
||||
//it could take a very long time depending on the current executing task
|
||||
do {
|
||||
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
|
||||
final int startState = stateUpdater.get(this);
|
||||
if (startState == STATE_FORCED_SHUTDOWN) {
|
||||
//another thread has completed a forced shutdown: let it to manage the tasks cleanup
|
||||
break;
|
||||
}
|
||||
if (startState == STATE_RUNNING) {
|
||||
//wait 100 ms to avoid burning CPU while waiting and
|
||||
//give other threads a chance to make progress
|
||||
LockSupport.parkNanos(100_000_000L);
|
||||
}
|
||||
}
|
||||
while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
|
||||
//this could happen just one time: the forced shutdown state is the last one and
|
||||
//can be set by just one caller.
|
||||
//As noted on the execute method there is a small chance that some tasks would be enqueued
|
||||
}
|
||||
int pendingItems = 0;
|
||||
//there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them
|
||||
synchronized (tasks) {
|
||||
T item;
|
||||
while ((item = tasks.poll()) != null) {
|
||||
onPendingItem.accept(item);
|
||||
pendingItems++;
|
||||
}
|
||||
}
|
||||
return pendingItems;
|
||||
}
|
||||
|
||||
protected abstract void doTask(T task);
|
||||
|
@ -72,25 +148,25 @@ public abstract class ProcessorBase<T> {
|
|||
this.delegate = parent;
|
||||
}
|
||||
|
||||
public final boolean flush() {
|
||||
return flush(30, TimeUnit.SECONDS);
|
||||
public final boolean isFlushed() {
|
||||
return this.state == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
/**
|
||||
* WARNING: This will only flush when all the activity is suspended.
|
||||
* don't expect success on this call if another thread keeps feeding the queue
|
||||
* this is only valid on situations where you are not feeding the queue,
|
||||
* like in shutdown and failover situations.
|
||||
* */
|
||||
* don't expect success on this call if another thread keeps feeding the queue
|
||||
* this is only valid on situations where you are not feeding the queue,
|
||||
* like in shutdown and failover situations.
|
||||
*/
|
||||
public final boolean flush(long timeout, TimeUnit unit) {
|
||||
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
||||
if (this.state == STATE_NOT_RUNNING) {
|
||||
// quick test, most of the time it will be empty anyways
|
||||
return true;
|
||||
}
|
||||
|
||||
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
|
||||
try {
|
||||
while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
|
||||
while (this.state == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
|
||||
|
||||
if (tasks.isEmpty()) {
|
||||
return true;
|
||||
|
@ -102,24 +178,58 @@ public abstract class ProcessorBase<T> {
|
|||
// ignored
|
||||
}
|
||||
|
||||
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
public final boolean isFlushed() {
|
||||
return stateUpdater.get(this) == STATE_NOT_RUNNING;
|
||||
return this.state == STATE_NOT_RUNNING;
|
||||
}
|
||||
|
||||
protected void task(T command) {
|
||||
if (requestedShutdown) {
|
||||
logAddOnShutdown();
|
||||
}
|
||||
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
|
||||
tasks.add(command);
|
||||
startPoller();
|
||||
}
|
||||
|
||||
protected void startPoller() {
|
||||
if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
|
||||
//note that this can result in multiple tasks being queued
|
||||
//this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
|
||||
delegate.execute(task);
|
||||
//cache locally the state to avoid multiple volatile loads
|
||||
final int state = stateUpdater.get(this);
|
||||
if (state != STATE_RUNNING) {
|
||||
onAddedTaskIfNotRunning(state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on the assumption that state!=STATE_RUNNING.
|
||||
* It is packed separately from {@link #task(Object)} just for performance reasons: it
|
||||
* handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
|
||||
*/
|
||||
private void onAddedTaskIfNotRunning(int state) {
|
||||
if (state == STATE_NOT_RUNNING) {
|
||||
//startPoller could be deleted but is maintained because is inherited
|
||||
delegate.execute(task);
|
||||
} else if (state == STATE_FORCED_SHUTDOWN) {
|
||||
//help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
|
||||
synchronized (tasks) {
|
||||
tasks.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void logAddOnShutdown() {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Ordered executor has been gently shutdown at", new Exception("debug"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the remaining items to be processed.
|
||||
* <p>
|
||||
* This method is safe to be called by different threads and its accuracy is subject to concurrent modifications.<br>
|
||||
* It is meant to be used only for test purposes, because of its {@code O(n)} cost.
|
||||
*/
|
||||
public final int remaining() {
|
||||
return tasks.size();
|
||||
}
|
||||
|
||||
public final int status() {
|
||||
//avoid using the updater because in older version of JDK 8 isn't optimized as a vanilla volatile get
|
||||
return this.state;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.utils.actors;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class OrderedExecutorSanityTest {
|
||||
|
||||
@Test
|
||||
public void shouldExecuteTasksInOrder() throws InterruptedException {
|
||||
final int threads = 3;
|
||||
final int tasks = 100;
|
||||
final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(threads);
|
||||
try {
|
||||
final ArtemisExecutor executor = new OrderedExecutor(executorService);
|
||||
//it can be not thread safe too
|
||||
final List<Integer> results = new ArrayList<>(tasks);
|
||||
final List<Integer> expectedResults = new ArrayList<>(tasks);
|
||||
final CountDownLatch executed = new CountDownLatch(tasks);
|
||||
for (int i = 0; i < tasks; i++) {
|
||||
final int value = i;
|
||||
executor.execute(() -> {
|
||||
results.add(value);
|
||||
executed.countDown();
|
||||
});
|
||||
expectedResults.add(value);
|
||||
}
|
||||
Assert.assertTrue("The tasks must be executed in " + timeoutMillis + " ms", executed.await(timeoutMillis, TimeUnit.MILLISECONDS));
|
||||
Assert.assertArrayEquals("The processing of tasks must be ordered", expectedResults.toArray(), results.toArray());
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldShutdownNowDoNotExecuteFurtherTasks() throws InterruptedException {
|
||||
final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);
|
||||
final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
try {
|
||||
final OrderedExecutor executor = new OrderedExecutor(executorService);
|
||||
final CountDownLatch executed = new CountDownLatch(1);
|
||||
executor.execute(executed::countDown);
|
||||
Assert.assertTrue("The task must be executed in " + timeoutMillis + " ms", executed.await(timeoutMillis, TimeUnit.MILLISECONDS));
|
||||
executor.shutdownNow();
|
||||
Assert.assertEquals("There are no remaining tasks to be executed", 0, executor.remaining());
|
||||
//from now on new tasks won't be executed
|
||||
final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
|
||||
executor.execute(afterDeatchExecution::countDown);
|
||||
Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS));
|
||||
//to avoid memory leaks the executor must take care of the new submitted tasks immediatly
|
||||
Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining());
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void shutdownNowOnDelegateExecutor() throws InterruptedException {
|
||||
final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
try {
|
||||
final OrderedExecutor executor = new OrderedExecutor(executorService);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicInteger numberOfTasks = new AtomicInteger(0);
|
||||
final CountDownLatch ran = new CountDownLatch(1);
|
||||
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
latch.await(1, TimeUnit.MINUTES);
|
||||
numberOfTasks.set(executor.shutdownNow());
|
||||
ran.countDown();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
executor.execute(() -> System.out.println("Dont worry, this will never happen"));
|
||||
}
|
||||
|
||||
latch.countDown();
|
||||
ran.await(1, TimeUnit.SECONDS);
|
||||
Assert.assertEquals(100, numberOfTasks.get());
|
||||
|
||||
Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status());
|
||||
Assert.assertEquals(0, executor.remaining());
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMeasure() throws InterruptedException {
|
||||
final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
try {
|
||||
final OrderedExecutor executor = new OrderedExecutor(executorService);
|
||||
int MAX_LOOP = 1_000_000;
|
||||
|
||||
// extend the number for longer numbers
|
||||
int runs = 10;
|
||||
|
||||
for (int i = 0; i < runs; i++) {
|
||||
long start = System.nanoTime();
|
||||
final CountDownLatch executed = new CountDownLatch(MAX_LOOP);
|
||||
for (int l = 0; l < MAX_LOOP; l++) {
|
||||
executor.execute(executed::countDown);
|
||||
}
|
||||
Assert.assertTrue(executed.await(1, TimeUnit.MINUTES));
|
||||
long end = System.nanoTime();
|
||||
|
||||
long elapsed = (end - start);
|
||||
|
||||
System.out.println("execution " + i + " in " + TimeUnit.NANOSECONDS.toMillis(elapsed) + " milliseconds");
|
||||
}
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -240,7 +240,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
cursor.stop();
|
||||
}
|
||||
|
||||
waitForFuture();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
private void waitForFuture() {
|
||||
|
|
|
@ -352,7 +352,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
running = false;
|
||||
|
||||
flushExecutors();
|
||||
executor.shutdownNow();
|
||||
|
||||
if (currentPage != null) {
|
||||
currentPage.close(false);
|
||||
|
|
|
@ -159,11 +159,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
|
||||
private final boolean direct;
|
||||
|
||||
//marker instance used to recognize if a thread is performing a packet handling
|
||||
private static final Object DUMMY = Boolean.TRUE;
|
||||
|
||||
//a thread that has its thread-local map populated with DUMMY is performing a packet handling
|
||||
private static final ThreadLocal<Object> inHandler = new ThreadLocal<>();
|
||||
|
||||
public ServerSessionPacketHandler(final ActiveMQServer server,
|
||||
final CoreProtocolManager manager,
|
||||
|
@ -220,7 +215,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
|
||||
ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
|
||||
|
||||
flushExecutor();
|
||||
closeExecutors();
|
||||
|
||||
try {
|
||||
session.close(true);
|
||||
|
@ -231,32 +226,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
|
||||
}
|
||||
|
||||
private static void onStartMessagePacketHandler() {
|
||||
assert inHandler.get() == null : "recursion on packet handling is not supported";
|
||||
inHandler.set(DUMMY);
|
||||
}
|
||||
|
||||
private static boolean inHandler() {
|
||||
final Object dummy = inHandler.get();
|
||||
//sanity check: can't exist a thread using a marker different from DUMMY
|
||||
assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong marker";
|
||||
return dummy != null;
|
||||
}
|
||||
|
||||
private static void onExitMessagePacketHandler() {
|
||||
assert inHandler.get() != null : "marker not set";
|
||||
inHandler.set(null);
|
||||
}
|
||||
|
||||
public void flushExecutor() {
|
||||
if (!inHandler()) {
|
||||
packetActor.flush();
|
||||
callExecutor.flush();
|
||||
}
|
||||
public void closeExecutors() {
|
||||
packetActor.shutdown();
|
||||
callExecutor.shutdown();
|
||||
}
|
||||
|
||||
public void close() {
|
||||
flushExecutor();
|
||||
closeExecutors();
|
||||
|
||||
channel.flushConfirmations();
|
||||
|
||||
|
@ -282,33 +258,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
|
||||
}
|
||||
onStartMessagePacketHandler();
|
||||
try {
|
||||
final byte type = packet.getType();
|
||||
switch (type) {
|
||||
case SESS_SEND: {
|
||||
onSessionSend(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_ACKNOWLEDGE: {
|
||||
onSessionAcknowledge(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_PRODUCER_REQUEST_CREDITS: {
|
||||
onSessionRequestProducerCredits(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_FLOWTOKEN: {
|
||||
onSessionConsumerFlowCredit(packet);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// separating a method for everything else as JIT was faster this way
|
||||
slowPacketHandler(packet);
|
||||
break;
|
||||
final byte type = packet.getType();
|
||||
switch (type) {
|
||||
case SESS_SEND: {
|
||||
onSessionSend(packet);
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
onExitMessagePacketHandler();
|
||||
case SESS_ACKNOWLEDGE: {
|
||||
onSessionAcknowledge(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_PRODUCER_REQUEST_CREDITS: {
|
||||
onSessionRequestProducerCredits(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_FLOWTOKEN: {
|
||||
onSessionConsumerFlowCredit(packet);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// separating a method for everything else as JIT was faster this way
|
||||
slowPacketHandler(packet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -895,8 +866,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
remotingConnection.removeFailureListener((FailureListener) closeListener);
|
||||
}
|
||||
}
|
||||
|
||||
flushExecutor();
|
||||
}
|
||||
|
||||
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
|
||||
|
|
|
@ -67,7 +67,7 @@ public final class CoreSessionCallback implements SessionCallback {
|
|||
ServerSessionPacketHandler localHandler = handler;
|
||||
if (localHandler != null) {
|
||||
// We wait any pending tasks before we make this as closed
|
||||
localHandler.flushExecutor();
|
||||
localHandler.closeExecutors();
|
||||
}
|
||||
this.handler = null;
|
||||
}
|
||||
|
|
|
@ -257,7 +257,9 @@ public class ManagementServiceImpl implements ManagementService {
|
|||
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
|
||||
unregisterFromJMX(objectName);
|
||||
unregisterFromRegistry(ResourceNames.QUEUE + name);
|
||||
messageCounterManager.unregisterMessageCounter(name.toString());
|
||||
if (messageCounterManager != null) {
|
||||
messageCounterManager.unregisterMessageCounter(name.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -302,7 +302,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
//clean up pools before failing
|
||||
if (!exceptions.isEmpty()) {
|
||||
for (Exception exception : exceptions) {
|
||||
exception.printStackTrace();
|
||||
exception.printStackTrace(System.out);
|
||||
}
|
||||
fail("Client Session Factories still trying to reconnect, see above to see where created");
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
|
|||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
|
@ -31,9 +32,13 @@ import java.io.Serializable;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLongArray;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
|
@ -325,9 +330,9 @@ public class ConsumerTest extends ActiveMQTestBase {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
|
||||
assertNull(server.locateQueue(SimpleString.toSimpleString("queue")));
|
||||
assertEquals(0, server.getTotalMessageCount());
|
||||
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("queue")) == null);
|
||||
Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString("queue")) == null);
|
||||
Wait.assertEquals(0, server::getTotalMessageCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase {
|
|||
session.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleConsumersOnSharedQueue() throws Throwable {
|
||||
if (!isNetty() || this.durable) {
|
||||
return;
|
||||
}
|
||||
final boolean durable = false;
|
||||
final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
|
||||
final int forks = 100;
|
||||
final int queues = forks;
|
||||
final int runs = 1;
|
||||
final int messages = 1;
|
||||
final ConnectionFactory factorySend = createFactory(1);
|
||||
final AtomicLongArray receivedMessages = new AtomicLongArray(forks);
|
||||
final Thread[] producersRunners = new Thread[forks];
|
||||
final Thread[] consumersRunners = new Thread[forks];
|
||||
//parties are forks (1 producer 1 consumer) + 1 controller in the main test thread
|
||||
final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1);
|
||||
final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1);
|
||||
|
||||
final int messagesSent = forks * messages;
|
||||
final AtomicInteger messagesRecieved = new AtomicInteger(0);
|
||||
|
||||
for (int i = 0; i < forks; i++) {
|
||||
final int forkIndex = i;
|
||||
final String queueName = "q_" + (forkIndex % queues);
|
||||
final Thread producerRunner = new Thread(() -> {
|
||||
try (Connection connection = factorySend.createConnection()) {
|
||||
connection.start();
|
||||
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||
final javax.jms.Queue queue = session.createQueue(queueName);
|
||||
try (MessageProducer producer = session.createProducer(queue)) {
|
||||
producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||
for (int r = 0; r < runs; r++) {
|
||||
onStartRun.await();
|
||||
for (int m = 0; m < messages; m++) {
|
||||
final BytesMessage bytesMessage = session.createBytesMessage();
|
||||
bytesMessage.writeInt(forkIndex);
|
||||
producer.send(bytesMessage);
|
||||
}
|
||||
onFinishRun.await();
|
||||
}
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
producerRunner.setDaemon(true);
|
||||
|
||||
final Thread consumerRunner = new Thread(() -> {
|
||||
try (Connection connection = factorySend.createConnection()) {
|
||||
connection.start();
|
||||
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||
final javax.jms.Queue queue = session.createQueue(queueName);
|
||||
try (MessageConsumer consumer = session.createConsumer(queue)) {
|
||||
for (int r = 0; r < runs; r++) {
|
||||
onStartRun.await();
|
||||
while (messagesRecieved.get() != messagesSent) {
|
||||
final BytesMessage receivedMessage = (BytesMessage) consumer.receive(1000);
|
||||
if (receivedMessage != null) {
|
||||
final int receivedConsumerIndex = receivedMessage.readInt();
|
||||
receivedMessages.getAndIncrement(receivedConsumerIndex);
|
||||
messagesRecieved.incrementAndGet();
|
||||
}
|
||||
}
|
||||
onFinishRun.await();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
consumerRunner.setDaemon(true);
|
||||
consumersRunners[forkIndex] = consumerRunner;
|
||||
producersRunners[forkIndex] = producerRunner;
|
||||
}
|
||||
Stream.of(consumersRunners).forEach(Thread::start);
|
||||
Stream.of(producersRunners).forEach(Thread::start);
|
||||
final long messagesPerRun = (forks * messages);
|
||||
for (int r = 0; r < runs; r++) {
|
||||
onStartRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
|
||||
System.out.println("started run " + r);
|
||||
final long start = System.currentTimeMillis();
|
||||
onFinishRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
|
||||
final long elapsedMillis = System.currentTimeMillis() - start;
|
||||
System.out.println((messagesPerRun * 1000L) / elapsedMillis + " msg/sec");
|
||||
}
|
||||
Stream.of(producersRunners).forEach(runner -> {
|
||||
try {
|
||||
runner.join(TIMEOUT_MILLIS * runs);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
Stream.of(producersRunners).forEach(Thread::interrupt);
|
||||
Stream.of(consumersRunners).forEach(runner -> {
|
||||
try {
|
||||
runner.join(TIMEOUT_MILLIS * runs);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
Stream.of(consumersRunners).forEach(Thread::interrupt);
|
||||
for (int i = 0; i < forks; i++) {
|
||||
Assert.assertEquals("The consumer " + i + " must receive all the messages sent.", messages * runs, receivedMessages.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -303,8 +304,9 @@ public class JmsConsumerTest extends JMSTestBase {
|
|||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
conn.close();
|
||||
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
Queue queue = server.locateQueue(queueName);
|
||||
Wait.assertEquals(0, queue::getDeliveringCount);
|
||||
Wait.assertEquals(0, queue::getMessageCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -329,8 +331,9 @@ public class JmsConsumerTest extends JMSTestBase {
|
|||
|
||||
// Messages should all have been acked since we set pre ack on the cf
|
||||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
Queue queue = server.locateQueue(queueName);
|
||||
Wait.assertEquals(0, queue::getDeliveringCount);
|
||||
Wait.assertEquals(0, queue::getMessageCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue