diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index fbd0107759..171d7be59e 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -335,15 +335,11 @@ public final class StompConnection implements RemotingConnection { if (destroyed) { return; } - } - destroyed = true; + destroyed = true; + } internalClose(); - - synchronized (sendLock) { - callClosingListeners(); - } } public Acceptor getAcceptorUsed() { @@ -351,9 +347,17 @@ public final class StompConnection implements RemotingConnection { } private void internalClose() { + if (frameHandler != null) { + frameHandler.disconnect(); + } + transportConnection.close(); manager.cleanup(this); + + synchronized (sendLock) { + callClosingListeners(); + } } @Override @@ -372,15 +376,9 @@ public final class StompConnection implements RemotingConnection { ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); - if (frameHandler != null) { - frameHandler.disconnect(); - } - // Then call the listeners callFailureListeners(me); - callClosingListeners(); - internalClose(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 99ad1fba3a..6a3fae6600 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -28,6 +28,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,6 +38,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; @@ -2181,6 +2183,65 @@ public class StompV11Test extends StompTestBase { Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted()); } + @Test + public void testHeartBeat4() throws Exception { + connection.close(); + ClientStompFrame frame = conn.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "500,500"); + frame.addHeader("accept-version", "1.1,1.2"); + + ClientStompFrame reply = conn.sendFrame(frame); + + System.out.println("Reply: " + reply.toString()); + + assertEquals("CONNECTED", reply.getCommand()); + + // Obtain a reference to the server StompConnection object + RemotingConnection remotingConnection = null; + StompConnection stompConnection = null; + Iterator iterator = server.getActiveMQServer().getRemotingService().getConnections().iterator(); + while (iterator.hasNext()) { + remotingConnection = iterator.next(); + if (remotingConnection instanceof StompConnection) { + stompConnection = (StompConnection)remotingConnection; + } + } + + StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11) stompConnection.getStompVersionHandler(); + + System.out.println("========== start pinger!"); + + conn.startPinger(100); + + ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); + subFrame.addHeader("destination", getTopicPrefix() + getTopicName()); + subFrame.addHeader("id", "0"); + + ClientStompFrame f = conn.sendFrame(subFrame); + f = conn.sendFrame(subFrame); + + // Send subscription with a duplicate ID, triggering a server error and closing of the session. + f = conn.sendFrame(subFrame); + + f = conn.receiveFrame(1000); + System.out.println("Received " + f.toString()); + Assert.assertTrue(f.getCommand().equals("ERROR")); + + conn.stopPinger(); + + // give it some time to detect and close connections + Thread.sleep(2000); + + Wait.waitFor(() -> { + return server.getActiveMQServer().getRemotingService().getConnections().size() == 0; + }); + + Assert.assertFalse("HeartBeater is still running!!", stompFrameHandler.getHeartBeater().isStarted()); + } + protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { conn.connect(defUser, defPass);