From f79b21e866539ca196eea67adc700c424f61fbfc Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 25 Jan 2017 10:12:06 -0500 Subject: [PATCH] ARTEMIS-934 Stomp Heart beat not being stopped in some cases --- .../core/protocol/stomp/StompConnection.java | 9 ++++ .../stomp/VersionedStompFrameHandler.java | 3 ++ .../stomp/v11/StompFrameHandlerV11.java | 15 +++++- .../util/AbstractStompClientConnection.java | 8 +++- .../integration/stomp/v11/StompV11Test.java | 47 +++++++++++++++++++ 5 files changed, 79 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 0eb81b9d5c..f72a73e5dd 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -110,6 +110,10 @@ public final class StompConnection implements RemotingConnection { return false; } + public VersionedStompFrameHandler getStompVersionHandler() { + return frameHandler; + } + public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { StompFrame frame = null; try { @@ -343,6 +347,11 @@ public final class StompConnection implements RemotingConnection { } ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + + if (frameHandler != null) { + frameHandler.disconnect(); + } + // Then call the listeners callFailureListeners(me); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 02facd6e26..673c86eedc 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -43,6 +43,9 @@ public abstract class VersionedStompFrameHandler { protected final ScheduledExecutorService scheduledExecutorService; protected final ExecutorFactory executorFactory; + protected void disconnect() { + } + public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version, ScheduledExecutorService scheduledExecutorService, diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index 867cdd8d97..c6831cd6fa 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -57,6 +57,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements decoder.init(); } + public ActiveMQScheduledComponent getHeartBeater() { + return heartBeater; + } + @Override public StompFrame onConnect(StompFrame frame) { StompFrame response = null; @@ -131,15 +135,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements //client receive ping long minAcceptInterval = Long.valueOf(params[1]); - heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval); + if (heartBeater == null) { + heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval); + } } @Override public StompFrame onDisconnect(StompFrame frame) { + disconnect(); + return null; + } + + @Override + protected void disconnect() { if (this.heartBeater != null) { heartBeater.shutdown(); } - return null; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java index d8a487e9d3..fa1ec7385f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java @@ -45,6 +45,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec protected BlockingQueue frameQueue = new LinkedBlockingQueue<>(); protected boolean connected = false; protected int serverPingCounter; + protected ReaderThread readerThread; public AbstractStompClientConnection(String version, String host, int port) throws IOException { this.version = version; @@ -67,7 +68,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec readBuffer = ByteBuffer.allocateDirect(10240); receiveList = new ArrayList<>(10240); - new ReaderThread().start(); + readerThread = new ReaderThread(); + readerThread.start(); + } + + public void killReaderThread() { + readerThread.stop(); } private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 01f1cf8731..eb055f110b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -31,13 +31,17 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; +import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; +import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -2113,6 +2117,49 @@ public class StompV11Test extends StompTestBase { conn.disconnect(); } + + @Test + public void testHeartBeat3() throws Exception { + + connection.close(); + ClientStompFrame frame = conn.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "500,500"); + frame.addHeader("accept-version", "1.0,1.1"); + + ClientStompFrame reply = conn.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("500,500", reply.getHeader("heart-beat")); + + + System.out.println("========== start pinger!"); + + conn.startPinger(100); + + + Assert.assertEquals(1, server.getActiveMQServer().getRemotingService().getConnections().size()); + StompConnection stompConnection = (StompConnection)server.getActiveMQServer().getRemotingService().getConnections().iterator().next(); + StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11)stompConnection.getStompVersionHandler(); + + Thread.sleep(1000); + + //now check the frame size + int size = conn.getServerPingNumber(); + + conn.stopPinger(); + ((AbstractStompClientConnection)conn).killReaderThread(); + Wait.waitFor(() -> { + return server.getActiveMQServer().getRemotingService().getConnections().size() == 0; + }); + + Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted()); + } + + protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { conn.connect(defUser, defPass);