From fabc0701a38628ffa8f8d9959cc5ec64c6c3cb10 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 8 Aug 2017 22:48:25 -0400 Subject: [PATCH] ARTEMIS-1333 SendACK listener fix --- .../core/ServerSessionPacketHandler.java | 15 ++++++++------- .../core/impl/ActiveMQPacketHandler.java | 5 ++++- .../core/impl/CoreSessionCallback.java | 18 ++++++++++++++++++ .../core/server/impl/ServerSessionImpl.java | 5 +++++ .../spi/core/protocol/SessionCallback.java | 4 ++++ .../jms/client/ReceiveNoWaitTest.java | 2 +- 6 files changed, 40 insertions(+), 9 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 88a6c2cdbb..87ac615ccc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; -import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -95,7 +94,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.apache.activemq.artemis.utils.actors.Actor; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; @@ -150,7 +149,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final Actor packetActor; - private final Executor callExecutor; + private final ArtemisExecutor callExecutor; private final CoreProtocolManager manager; @@ -214,19 +213,20 @@ public class ServerSessionPacketHandler implements ChannelHandler { public void connectionFailed(final ActiveMQException exception, boolean failedOver) { ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName()); + flushExecutor(); + try { session.close(true); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorClosingSession(e); } - flushExecutor(); ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName()); } - private void flushExecutor() { + public void flushExecutor() { packetActor.flush(); - OrderedExecutorFactory.flushExecutor(callExecutor); + callExecutor.flush(); } public void close() { @@ -247,7 +247,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { @Override public void handlePacket(final Packet packet) { - channel.confirm(packet); // This method will call onMessagePacket through an actor packetActor.act(packet); @@ -838,6 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { final boolean flush, final boolean closeChannel) { if (confirmPacket != null) { + channel.confirm(confirmPacket); + if (flush) { channel.flushConfirmations(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index c9cc926b1a..cefd10cdc9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -167,10 +167,13 @@ public class ActiveMQPacketHandler implements ChannelHandler { routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST); } - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap); + CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection); + + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel); channel.setHandler(handler); + sessionCallback.setSessionHandler(handler); // TODO - where is this removed? protocolManager.addSessionHandler(request.getName(), handler); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 542d72625f..866130bf8d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; @@ -44,6 +45,8 @@ public final class CoreSessionCallback implements SessionCallback { private String name; + private ServerSessionPacketHandler handler; + public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, @@ -54,6 +57,21 @@ public final class CoreSessionCallback implements SessionCallback { this.connection = connection; } + public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler handler) { + this.handler = handler; + return this; + } + + @Override + public void close(boolean failed) { + ServerSessionPacketHandler localHandler = handler; + if (failed && localHandler != null) { + // We wait any pending tasks before we make this as closed + localHandler.flushExecutor(); + } + this.handler = null; + } + @Override public boolean isWritable(ReadyListener callback, Object protocolContext) { return connection.isWritable(callback); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index f3617c170b..1661ab2130 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -345,6 +345,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } protected void doClose(final boolean failed) throws Exception { + callback.close(failed); synchronized (this) { if (!closed) { server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeCloseSession(this, failed) : null); @@ -1238,6 +1239,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { public void close(final boolean failed) { if (closed) return; + + if (failed) { + + } context.executeOnCompletion(new IOCallback() { @Override public void onError(int errorCode, String errorMessage) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index edfb5dc46e..ae1612f292 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -89,4 +89,8 @@ public interface SessionCallback { * Some protocols (Openwire) needs a special message with the browser is finished. */ void browserFinished(ServerConsumer consumer); + + default void close(boolean failed) { + + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java index a426948632..6114f4926e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java @@ -52,7 +52,7 @@ public class ReceiveNoWaitTest extends JMSTestBase { public void testReceiveNoWait() throws Exception { assertNotNull(queue); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 1000; i++) { Connection connection = cf.createConnection(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);