This closes #344
This commit is contained in:
commit
065bd16782
|
@ -32,7 +32,7 @@ public class ActiveMQStompException extends Exception {
|
|||
private final List<Header> headers = new ArrayList<>(10);
|
||||
private String body;
|
||||
private VersionedStompFrameHandler handler;
|
||||
private boolean disconnect;
|
||||
private Boolean disconnect;
|
||||
|
||||
public ActiveMQStompException(StompConnection connection, String msg) {
|
||||
super(msg);
|
||||
|
@ -86,7 +86,9 @@ public class ActiveMQStompException extends Exception {
|
|||
else {
|
||||
frame.setByteBody(new byte[0]);
|
||||
}
|
||||
if (disconnect != null) {
|
||||
frame.setNeedsDisconnect(disconnect);
|
||||
}
|
||||
return frame;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,15 +18,25 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
||||
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
|
||||
|
||||
public class StompFrameV12 extends StompFrameV11 {
|
||||
|
||||
public StompFrameV12(String command, Map<String, String> headers, byte[] content) {
|
||||
super(command, headers, content);
|
||||
initV12();
|
||||
}
|
||||
|
||||
public StompFrameV12(String command) {
|
||||
super(command);
|
||||
initV12();
|
||||
}
|
||||
|
||||
private void initV12() {
|
||||
// STOMP 1.2 requires disconnect after sending ERROR
|
||||
if (Stomp.Responses.ERROR.equals(command)) {
|
||||
setNeedsDisconnect(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -264,7 +264,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
|
|||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return connected;
|
||||
return connected && socketChannel.isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -952,9 +952,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
|
||||
Assert.assertEquals("ERROR", error.getCommand());
|
||||
|
||||
unsubscribe(connV12, "sub1");
|
||||
|
||||
connV12.disconnect();
|
||||
Thread.sleep(1000);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
|
||||
//message should be still there
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -982,9 +981,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
|
||||
System.out.println("Receiver error: " + error);
|
||||
|
||||
unsubscribe(connV12, "sub1");
|
||||
|
||||
connV12.disconnect();
|
||||
Thread.sleep(1000);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
|
||||
//message should still there
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -1040,9 +1038,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
|
||||
Assert.assertEquals("ERROR", frame.getCommand());
|
||||
|
||||
unsubscribe(connV12, "sub1");
|
||||
|
||||
connV12.disconnect();
|
||||
waitDisconnect(connV12);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
|
||||
//message still there.
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -1068,9 +1065,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
|
||||
System.out.println("Receiver error: " + error);
|
||||
|
||||
unsubscribe(connV12, "sub1");
|
||||
|
||||
connV12.disconnect();
|
||||
waitDisconnect(connV12);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
|
||||
//message should still there
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -1104,9 +1100,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
|
||||
Assert.assertEquals("answer-me", error.getHeader("receipt-id"));
|
||||
|
||||
unsubscribe(connV12, "sub1");
|
||||
|
||||
connV12.disconnect();
|
||||
waitDisconnect(connV12);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
|
||||
//message should still there
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -1140,9 +1135,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
|
||||
Assert.assertEquals("answer-me", error.getHeader("receipt-id"));
|
||||
|
||||
unsubscribe(connV12, "sub1");
|
||||
|
||||
connV12.disconnect();
|
||||
waitDisconnect(connV12);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
|
||||
//message should still there
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
@ -1150,6 +1144,14 @@ public class StompV12Test extends StompV11TestBase {
|
|||
Assert.assertNotNull(message);
|
||||
}
|
||||
|
||||
protected void waitDisconnect(StompClientConnectionV12 connection) throws Exception {
|
||||
|
||||
long timeout = System.currentTimeMillis() + 10000;
|
||||
while (timeout > System.currentTimeMillis() && connection.isConnected()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAckModeClient() throws Exception {
|
||||
connV12.connect(defUser, defPass);
|
||||
|
@ -1521,7 +1523,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
ClientStompFrame frame = connV12.receiveFrame();
|
||||
Assert.assertTrue(frame.getCommand().equals("ERROR"));
|
||||
|
||||
connV12.disconnect();
|
||||
Thread.sleep(1000);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2513,7 +2516,8 @@ public class StompV12Test extends StompV11TestBase {
|
|||
Assert.assertEquals("1234", frame.getHeader("receipt-id"));
|
||||
System.out.println("message: " + frame.getHeader("message"));
|
||||
|
||||
connV12.disconnect();
|
||||
Thread.sleep(1000);
|
||||
Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue