ARTEMIS-934 Stomp Heart beat not being stopped in some cases

This commit is contained in:
Clebert Suconic 2017-01-25 10:12:06 -05:00
parent 98f6fa7607
commit f79b21e866
5 changed files with 79 additions and 3 deletions

View File

@ -110,6 +110,10 @@ public final class StompConnection implements RemotingConnection {
return false; return false;
} }
public VersionedStompFrameHandler getStompVersionHandler() {
return frameHandler;
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null; StompFrame frame = null;
try { try {
@ -343,6 +347,11 @@ public final class StompConnection implements RemotingConnection {
} }
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
if (frameHandler != null) {
frameHandler.disconnect();
}
// Then call the listeners // Then call the listeners
callFailureListeners(me); callFailureListeners(me);

View File

@ -43,6 +43,9 @@ public abstract class VersionedStompFrameHandler {
protected final ScheduledExecutorService scheduledExecutorService; protected final ScheduledExecutorService scheduledExecutorService;
protected final ExecutorFactory executorFactory; protected final ExecutorFactory executorFactory;
protected void disconnect() {
}
public static VersionedStompFrameHandler getHandler(StompConnection connection, public static VersionedStompFrameHandler getHandler(StompConnection connection,
StompVersions version, StompVersions version,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,

View File

@ -57,6 +57,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
decoder.init(); decoder.init();
} }
public ActiveMQScheduledComponent getHeartBeater() {
return heartBeater;
}
@Override @Override
public StompFrame onConnect(StompFrame frame) { public StompFrame onConnect(StompFrame frame) {
StompFrame response = null; StompFrame response = null;
@ -131,15 +135,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
//client receive ping //client receive ping
long minAcceptInterval = Long.valueOf(params[1]); long minAcceptInterval = Long.valueOf(params[1]);
heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval); if (heartBeater == null) {
heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
}
} }
@Override @Override
public StompFrame onDisconnect(StompFrame frame) { public StompFrame onDisconnect(StompFrame frame) {
disconnect();
return null;
}
@Override
protected void disconnect() {
if (this.heartBeater != null) { if (this.heartBeater != null) {
heartBeater.shutdown(); heartBeater.shutdown();
} }
return null;
} }
@Override @Override

View File

@ -45,6 +45,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>(); protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
protected boolean connected = false; protected boolean connected = false;
protected int serverPingCounter; protected int serverPingCounter;
protected ReaderThread readerThread;
public AbstractStompClientConnection(String version, String host, int port) throws IOException { public AbstractStompClientConnection(String version, String host, int port) throws IOException {
this.version = version; this.version = version;
@ -67,7 +68,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
readBuffer = ByteBuffer.allocateDirect(10240); readBuffer = ByteBuffer.allocateDirect(10240);
receiveList = new ArrayList<>(10240); receiveList = new ArrayList<>(10240);
new ReaderThread().start(); readerThread = new ReaderThread();
readerThread.start();
}
public void killReaderThread() {
readerThread.stop();
} }
private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException { private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {

View File

@ -31,13 +31,17 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -2113,6 +2117,49 @@ public class StompV11Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
} }
@Test
public void testHeartBeat3() throws Exception {
connection.close();
ClientStompFrame frame = conn.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "500,500");
frame.addHeader("accept-version", "1.0,1.1");
ClientStompFrame reply = conn.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
assertEquals("500,500", reply.getHeader("heart-beat"));
System.out.println("========== start pinger!");
conn.startPinger(100);
Assert.assertEquals(1, server.getActiveMQServer().getRemotingService().getConnections().size());
StompConnection stompConnection = (StompConnection)server.getActiveMQServer().getRemotingService().getConnections().iterator().next();
StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11)stompConnection.getStompVersionHandler();
Thread.sleep(1000);
//now check the frame size
int size = conn.getServerPingNumber();
conn.stopPinger();
((AbstractStompClientConnection)conn).killReaderThread();
Wait.waitFor(() -> {
return server.getActiveMQServer().getRemotingService().getConnections().size() == 0;
});
Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted());
}
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
conn.connect(defUser, defPass); conn.connect(defUser, defPass);