This commit is contained in:
Timothy Bish 2018-05-22 18:40:29 -04:00
commit 78016cbe69
2 changed files with 71 additions and 12 deletions

View File

@ -335,15 +335,11 @@ public final class StompConnection implements RemotingConnection {
if (destroyed) { if (destroyed) {
return; return;
} }
}
destroyed = true; destroyed = true;
}
internalClose(); internalClose();
synchronized (sendLock) {
callClosingListeners();
}
} }
public Acceptor getAcceptorUsed() { public Acceptor getAcceptorUsed() {
@ -351,9 +347,17 @@ public final class StompConnection implements RemotingConnection {
} }
private void internalClose() { private void internalClose() {
if (frameHandler != null) {
frameHandler.disconnect();
}
transportConnection.close(); transportConnection.close();
manager.cleanup(this); manager.cleanup(this);
synchronized (sendLock) {
callClosingListeners();
}
} }
@Override @Override
@ -372,15 +376,9 @@ public final class StompConnection implements RemotingConnection {
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
if (frameHandler != null) {
frameHandler.disconnect();
}
// Then call the listeners // Then call the listeners
callFailureListeners(me); callFailureListeners(me);
callClosingListeners();
internalClose(); internalClose();
} }

View File

@ -28,6 +28,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,6 +38,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@ -2181,6 +2183,65 @@ public class StompV11Test extends StompTestBase {
Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted()); Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted());
} }
@Test
public void testHeartBeat4() throws Exception {
connection.close();
ClientStompFrame frame = conn.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "500,500");
frame.addHeader("accept-version", "1.1,1.2");
ClientStompFrame reply = conn.sendFrame(frame);
System.out.println("Reply: " + reply.toString());
assertEquals("CONNECTED", reply.getCommand());
// Obtain a reference to the server StompConnection object
RemotingConnection remotingConnection = null;
StompConnection stompConnection = null;
Iterator<RemotingConnection> iterator = server.getActiveMQServer().getRemotingService().getConnections().iterator();
while (iterator.hasNext()) {
remotingConnection = iterator.next();
if (remotingConnection instanceof StompConnection) {
stompConnection = (StompConnection)remotingConnection;
}
}
StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11) stompConnection.getStompVersionHandler();
System.out.println("========== start pinger!");
conn.startPinger(100);
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
subFrame.addHeader("id", "0");
ClientStompFrame f = conn.sendFrame(subFrame);
f = conn.sendFrame(subFrame);
// Send subscription with a duplicate ID, triggering a server error and closing of the session.
f = conn.sendFrame(subFrame);
f = conn.receiveFrame(1000);
System.out.println("Received " + f.toString());
Assert.assertTrue(f.getCommand().equals("ERROR"));
conn.stopPinger();
// give it some time to detect and close connections
Thread.sleep(2000);
Wait.waitFor(() -> {
return server.getActiveMQServer().getRemotingService().getConnections().size() == 0;
});
Assert.assertFalse("HeartBeater is still running!!", stompFrameHandler.getHeartBeater().isStarted());
}
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
conn.connect(defUser, defPass); conn.connect(defUser, defPass);