diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index fcd197d143..dbc0776c39 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -91,6 +91,11 @@ public abstract class ProcessorBase { long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); try { while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) { + + if (tasks.isEmpty()) { + return true; + } + Thread.sleep(10); } } catch (InterruptedException e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 87ac615ccc..982bc88d9a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -158,6 +159,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final boolean direct; + private static final ThreadLocal inHandler = ThreadLocal.withInitial(AtomicBoolean::new); + public ServerSessionPacketHandler(final ActiveMQServer server, final CoreProtocolManager manager, final ServerSession session, @@ -225,8 +228,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { } public void flushExecutor() { - packetActor.flush(); - callExecutor.flush(); + if (!inHandler.get().get()) { + packetActor.flush(); + callExecutor.flush(); + } } public void close() { @@ -256,28 +261,33 @@ public class ServerSessionPacketHandler implements ChannelHandler { if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::handlePacket," + packet); } - final byte type = packet.getType(); - switch (type) { - case SESS_SEND: { - onSessionSend(packet); - break; + inHandler.get().set(true); + try { + final byte type = packet.getType(); + switch (type) { + case SESS_SEND: { + onSessionSend(packet); + break; + } + case SESS_ACKNOWLEDGE: { + onSessionAcknowledge(packet); + break; + } + case SESS_PRODUCER_REQUEST_CREDITS: { + onSessionRequestProducerCredits(packet); + break; + } + case SESS_FLOWTOKEN: { + onSessionConsumerFlowCredit(packet); + break; + } + default: + // separating a method for everything else as JIT was faster this way + slowPacketHandler(packet); + break; } - case SESS_ACKNOWLEDGE: { - onSessionAcknowledge(packet); - break; - } - case SESS_PRODUCER_REQUEST_CREDITS: { - onSessionRequestProducerCredits(packet); - break; - } - case SESS_FLOWTOKEN: { - onSessionConsumerFlowCredit(packet); - break; - } - default: - // separating a method for everything else as JIT was faster this way - slowPacketHandler(packet); - break; + } finally { + inHandler.get().set(false); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 866130bf8d..8168e6ef92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -65,7 +65,7 @@ public final class CoreSessionCallback implements SessionCallback { @Override public void close(boolean failed) { ServerSessionPacketHandler localHandler = handler; - if (failed && localHandler != null) { + if (localHandler != null) { // We wait any pending tasks before we make this as closed localHandler.flushExecutor(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index cbe3ce5bc6..ed704ef483 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -47,6 +47,15 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes public void testScaleDownWithConnector() throws Exception { } + + // it doesn't make sense through the core + // the pool will be shutdown while a connection is being used + // makes no sense! + @Override + public void testForceFailover() throws Exception { + } + + @Override protected ActiveMQServerControl createManagementControl() throws Exception { return new ActiveMQServerControl() {