diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index ec010f75df..899ffdeedc 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -107,6 +107,10 @@ public final class StompConnection implements RemotingConnection { return false; } + public VersionedStompFrameHandler getStompVersionHandler() { + return frameHandler; + } + public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { StompFrame frame = null; try { @@ -315,6 +319,11 @@ public final class StompConnection implements RemotingConnection { } ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + + if (frameHandler != null) { + frameHandler.disconnect(); + } + // Then call the listeners callFailureListeners(me); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 003865c8aa..ea71a2cb41 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -42,6 +42,9 @@ public abstract class VersionedStompFrameHandler { protected final ScheduledExecutorService scheduledExecutorService; protected final ExecutorFactory executorFactory; + protected void disconnect() { + } + public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version, ScheduledExecutorService scheduledExecutorService, diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index 867cdd8d97..c6831cd6fa 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -57,6 +57,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements decoder.init(); } + public ActiveMQScheduledComponent getHeartBeater() { + return heartBeater; + } + @Override public StompFrame onConnect(StompFrame frame) { StompFrame response = null; @@ -131,15 +135,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements //client receive ping long minAcceptInterval = Long.valueOf(params[1]); - heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval); + if (heartBeater == null) { + heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval); + } } @Override public StompFrame onDisconnect(StompFrame frame) { + disconnect(); + return null; + } + + @Override + protected void disconnect() { if (this.heartBeater != null) { heartBeater.shutdown(); } - return null; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java index 56338e47ae..4d15bb8e1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java @@ -62,7 +62,8 @@ public abstract class AbstractStompClientConnection implements StompClientConnec protected BlockingQueue frameQueue = new LinkedBlockingQueue<>(); protected boolean connected = false; - private int serverPingCounter; + protected int serverPingCounter; + protected ReaderThread readerThread; public AbstractStompClientConnection(String version, String host, int port) throws IOException { this.version = version; @@ -85,7 +86,13 @@ public abstract class AbstractStompClientConnection implements StompClientConnec readBuffer = ByteBuffer.allocateDirect(10240); receiveList = new ArrayList<>(10240); - new ReaderThread().start(); + readerThread = new ReaderThread(); + readerThread.start(); + //new ReaderThread().start(); + } + + public void killReaderThread() { + readerThread.stop(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 2bd15a1af4..00ce729832 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -32,12 +32,16 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; +import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -2468,6 +2472,48 @@ public class StompV11Test extends StompV11TestBase { unsubscribe(conn, subId, receipt, false); } + + @Test + public void testHeartBeat3() throws Exception { + + connection.close(); + ClientStompFrame frame = connV11.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "500,500"); + frame.addHeader("accept-version", "1.0,1.1"); + + ClientStompFrame reply = connV11.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("500,500", reply.getHeader("heart-beat")); + + + System.out.println("========== start pinger!"); + + connV11.startPinger(100); + + + Assert.assertEquals(1, server.getActiveMQServer().getRemotingService().getConnections().size()); + StompConnection stompConnection = (StompConnection)server.getActiveMQServer().getRemotingService().getConnections().iterator().next(); + StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11) stompConnection.getStompVersionHandler(); + + Thread.sleep(1000); + + //now check the frame size + int size = connV11.getServerPingNumber(); + + connV11.stopPinger(); + ((AbstractStompClientConnection)connV11).killReaderThread(); + Wait.waitFor(() -> { + return server.getActiveMQServer().getRemotingService().getConnections().size() == 0; + }); + + Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted()); + } + protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { connV11.connect(defUser, defPass); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java index c1bdccc1f5..368c638aaf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java @@ -58,7 +58,7 @@ public abstract class StompV11TestBase extends ActiveMQTestBase { private ConnectionFactory connectionFactory; - private Connection connection; + protected Connection connection; protected Session session;