From 2c4312d4f279b0955c7644bd302d8480d7e4b6f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ville=20Skytt=C3=A4?= Date: Mon, 25 Jan 2016 21:01:10 +0200 Subject: [PATCH 1/2] Disconnect on STOMP 1.2 errors https://stomp.github.io/stomp-specification-1.2.html#ERROR --- .../stomp/ActiveMQStompException.java | 6 ++-- .../protocol/stomp/v12/StompFrameV12.java | 10 ++++++ .../util/AbstractStompClientConnection.java | 2 +- .../integration/stomp/v12/StompV12Test.java | 36 +++++++++---------- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java index a6a7c507d0..9fb0786af3 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java @@ -32,7 +32,7 @@ public class ActiveMQStompException extends Exception { private final List
headers = new ArrayList<>(10); private String body; private VersionedStompFrameHandler handler; - private boolean disconnect; + private Boolean disconnect; public ActiveMQStompException(StompConnection connection, String msg) { super(msg); @@ -86,7 +86,9 @@ public class ActiveMQStompException extends Exception { else { frame.setByteBody(new byte[0]); } - frame.setNeedsDisconnect(disconnect); + if (disconnect != null) { + frame.setNeedsDisconnect(disconnect); + } return frame; } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameV12.java index 471311d9ff..ac51ed4e94 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameV12.java @@ -18,15 +18,25 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12; import java.util.Map; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11; public class StompFrameV12 extends StompFrameV11 { public StompFrameV12(String command, Map headers, byte[] content) { super(command, headers, content); + initV12(); } public StompFrameV12(String command) { super(command); + initV12(); + } + + private void initV12() { + // STOMP 1.2 requires disconnect after sending ERROR + if (Stomp.Responses.ERROR.equals(command)) { + setNeedsDisconnect(true); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java index 42f8a1dba5..5c6cc6bba7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java @@ -264,7 +264,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec @Override public boolean isConnected() { - return connected; + return connected && socketChannel.isConnected(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java index e6cbf64ef2..6711643091 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java @@ -952,9 +952,8 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("ERROR", error.getCommand()); - unsubscribe(connV12, "sub1"); - - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should be still there MessageConsumer consumer = session.createConsumer(queue); @@ -982,9 +981,8 @@ public class StompV12Test extends StompV11TestBase { System.out.println("Receiver error: " + error); - unsubscribe(connV12, "sub1"); - - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1040,9 +1038,8 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("ERROR", frame.getCommand()); - unsubscribe(connV12, "sub1"); - - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message still there. MessageConsumer consumer = session.createConsumer(queue); @@ -1068,9 +1065,8 @@ public class StompV12Test extends StompV11TestBase { System.out.println("Receiver error: " + error); - unsubscribe(connV12, "sub1"); - - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1104,9 +1100,8 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("answer-me", error.getHeader("receipt-id")); - unsubscribe(connV12, "sub1"); - - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1140,9 +1135,8 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("answer-me", error.getHeader("receipt-id")); - unsubscribe(connV12, "sub1"); - - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there MessageConsumer consumer = session.createConsumer(queue); @@ -1521,7 +1515,8 @@ public class StompV12Test extends StompV11TestBase { ClientStompFrame frame = connV12.receiveFrame(); Assert.assertTrue(frame.getCommand().equals("ERROR")); - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); } @Test @@ -2513,7 +2508,8 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("1234", frame.getHeader("receipt-id")); System.out.println("message: " + frame.getHeader("message")); - connV12.disconnect(); + Thread.sleep(1000); + Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); } @Test From af150b5beaf4b59950febb05196a4ed7b161d5da Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 25 Jan 2016 21:27:18 -0500 Subject: [PATCH 2/2] small improvement on stomp test --- .../integration/stomp/v12/StompV12Test.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java index 6711643091..246cc30baf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java @@ -1038,7 +1038,7 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("ERROR", frame.getCommand()); - Thread.sleep(1000); + waitDisconnect(connV12); Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message still there. @@ -1065,7 +1065,7 @@ public class StompV12Test extends StompV11TestBase { System.out.println("Receiver error: " + error); - Thread.sleep(1000); + waitDisconnect(connV12); Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there @@ -1100,7 +1100,7 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("answer-me", error.getHeader("receipt-id")); - Thread.sleep(1000); + waitDisconnect(connV12); Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there @@ -1135,7 +1135,7 @@ public class StompV12Test extends StompV11TestBase { Assert.assertEquals("answer-me", error.getHeader("receipt-id")); - Thread.sleep(1000); + waitDisconnect(connV12); Assert.assertFalse("Should be disconnected in STOMP 1.2 after ERROR", connV12.isConnected()); //message should still there @@ -1144,6 +1144,14 @@ public class StompV12Test extends StompV11TestBase { Assert.assertNotNull(message); } + protected void waitDisconnect(StompClientConnectionV12 connection) throws Exception { + + long timeout = System.currentTimeMillis() + 10000; + while (timeout > System.currentTimeMillis() && connection.isConnected()) { + Thread.sleep(10); + } + } + @Test public void testAckModeClient() throws Exception { connV12.connect(defUser, defPass);