diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java index 8e8acb3054..badcc1a2c4 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java @@ -139,8 +139,11 @@ public interface Stomp { String SELECTOR = "selector"; + @Deprecated String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name"; + String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name"; + String NO_LOCAL = "no-local"; public interface AckModeValues { @@ -159,7 +162,10 @@ public interface Stomp { String ID = "id"; + @Deprecated String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name"; + + String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name"; } public interface Connect { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 7ab275015e..07a85b1207 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -655,9 +655,9 @@ public final class StompConnection implements RemotingConnection { } } - public void unsubscribe(String subscriptionID, String durableSubscriberName) throws ActiveMQStompException { + public void unsubscribe(String subscriptionID, String durableSubscriptionName) throws ActiveMQStompException { try { - manager.unsubscribe(this, subscriptionID, durableSubscriberName); + manager.unsubscribe(this, subscriptionID, durableSubscriptionName); } catch (ActiveMQStompException e) { throw e; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 2c8751c274..9c92fd1fe6 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -385,15 +385,14 @@ public class StompProtocolManager extends AbstractProtocolManager> iterator = subscriptions.entrySet().iterator(); + while (iterator.hasNext()) { Map.Entry entry = iterator.next(); long consumerID = entry.getKey(); @@ -315,21 +317,25 @@ public class StompSession implements SessionCallback { if (id != null && id.equals(sub.getID())) { iterator.remove(); session.closeConsumer(consumerID); - SimpleString queueName; - if (durableSubscriptionName != null && durableSubscriptionName.trim().length() != 0) { - queueName = SimpleString.toSimpleString(id + "." + durableSubscriptionName); - } - else { - queueName = SimpleString.toSimpleString(id); - } + SimpleString queueName = SimpleString.toSimpleString(id); QueueQueryResult query = session.executeQueueQuery(queueName); if (query.isExists()) { session.deleteQueue(queueName); } - return true; + result = true; } } - return false; + + if (!result && durableSubscriptionName != null && clientID != null) { + SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); + QueueQueryResult query = session.executeQueueQuery(queueName); + if (query.isExists()) { + session.deleteQueue(queueName); + } + result = true; + } + + return result; } boolean containsSubscription(String subscriptionID) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 5da8574786..185f81f8ef 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -251,6 +251,9 @@ public abstract class VersionedStompFrameHandler { String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE); String id = request.getHeader(Stomp.Headers.Subscribe.ID); String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); + } boolean noLocal = false; if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java index 8c76f6fc6b..25db3b05a8 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java @@ -92,7 +92,10 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements StompFrame response = null; String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION); String id = request.getHeader(Stomp.Headers.Unsubscribe.ID); - String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME); + String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME); + } String subscriptionID = null; if (id != null) { @@ -108,7 +111,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements } try { - connection.unsubscribe(subscriptionID, durableSubscriberName); + connection.unsubscribe(subscriptionID, durableSubscriptionName); } catch (ActiveMQStompException e) { return e.getFrame(); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index f9f60b9c03..d17fd8282a 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -153,19 +153,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements StompFrame response = null; //unsubscribe in 1.1 only needs id header String id = request.getHeader(Stomp.Headers.Unsubscribe.ID); - String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME); + String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME); + } String subscriptionID = null; if (id != null) { subscriptionID = id; } - else { + else if (durableSubscriptionName == null) { response = BUNDLE.needSubscriptionID().setHandler(this).getFrame(); return response; } try { - connection.unsubscribe(subscriptionID, durableSubscriberName); + connection.unsubscribe(subscriptionID, durableSubscriptionName); } catch (ActiveMQStompException e) { response = e.getFrame(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 4a6324a022..67293beddf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1432,6 +1432,60 @@ public class StompTest extends StompTestBase { sendFrame(frame); } + @Test + public void testDurableUnSubscribe() throws Exception { + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(1000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + String subscribeFrame = "SUBSCRIBE\n" + "destination:" + + getTopicPrefix() + + getTopicName() + + "\n" + + "receipt: 12\n" + + "durable-subscriber-name: " + + getName() + + "\n" + + "\n\n" + + Stomp.NULL; + sendFrame(subscribeFrame); + // wait for SUBSCRIBE's receipt + frame = receiveFrame(1000); + Assert.assertTrue(frame.startsWith("RECEIPT")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + sendFrame(frame); + waitForFrameToTakeEffect(); + + assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); + + reconnect(100); + frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(1000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" + + getTopicPrefix() + + getTopicName() + + "\n" + + "durable-subscriber-name: " + + getName() + + "\n" + + "\n\n" + + Stomp.NULL; + sendFrame(unsubscribeFrame); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + sendFrame(frame); + waitForFrameToTakeEffect(); + + assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); + } + @Test public void testSubscribeToTopicWithNoLocal() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 2cba55ebdd..1b873765c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; @@ -48,6 +49,7 @@ import org.junit.Test; public class StompV11Test extends StompV11TestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + public static final String CLIENT_ID = "myclientid"; private StompClientConnection connV11; @@ -1333,7 +1335,7 @@ public class StompV11Test extends StompV11TestBase { @Test public void testTwoSubscribers() throws Exception { - connV11.connect(defUser, defPass, "myclientid"); + connV11.connect(defUser, defPass, CLIENT_ID); this.subscribeTopic(connV11, "sub1", "auto", null); @@ -1535,7 +1537,7 @@ public class StompV11Test extends StompV11TestBase { @Test public void testDurableSubscriberWithReconnection() throws Exception { - connV11.connect(defUser, defPass, "myclientid"); + connV11.connect(defUser, defPass, CLIENT_ID); this.subscribeTopic(connV11, "sub1", "auto", getName()); @@ -1553,7 +1555,7 @@ public class StompV11Test extends StompV11TestBase { connV11.destroy(); connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); - connV11.connect(defUser, defPass, "myclientid"); + connV11.connect(defUser, defPass, CLIENT_ID); this.subscribeTopic(connV11, "sub1", "auto", getName()); @@ -1569,6 +1571,30 @@ public class StompV11Test extends StompV11TestBase { connV11.disconnect(); } + @Test + public void testDurableUnSubscribe() throws Exception { + connV11.connect(defUser, defPass, CLIENT_ID); + + this.subscribeTopic(connV11, null, "auto", getName()); + + connV11.disconnect(); + connV11.destroy(); + connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port); + connV11.connect(defUser, defPass, CLIENT_ID); + + this.unsubscribe(connV11, getName(), false, true); + + long start = System.currentTimeMillis(); + SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName()); + while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) { + Thread.sleep(100); + } + + assertNull(server.getActiveMQServer().locateQueue(queueName)); + + connV11.disconnect(); + } + @Test public void testJMSXGroupIdCanBeSet() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -2364,8 +2390,10 @@ public class StompV11Test extends StompV11TestBase { boolean receipt, boolean noLocal) throws IOException, InterruptedException { ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", subId); subFrame.addHeader("destination", getTopicPrefix() + getTopicName()); + if (subId != null) { + subFrame.addHeader("id", subId); + } if (ack != null) { subFrame.addHeader("ack", ack); } @@ -2386,18 +2414,14 @@ public class StompV11Test extends StompV11TestBase { } } - private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException { - ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE"); - subFrame.addHeader("id", subId); - - conn.sendFrame(subFrame); - } - - private void unsubscribe(StompClientConnection conn, - String subId, - boolean receipt) throws IOException, InterruptedException { - ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE"); - subFrame.addHeader("id", subId); + private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean durable) throws IOException, InterruptedException { + ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE); + if (durable) { + subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId); + } + else { + subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId); + } if (receipt) { subFrame.addHeader("receipt", "4321"); @@ -2412,6 +2436,16 @@ public class StompV11Test extends StompV11TestBase { } } + private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException { + unsubscribe(conn, subId, false, false); + } + + private void unsubscribe(StompClientConnection conn, + String subId, + boolean receipt) throws IOException, InterruptedException { + unsubscribe(conn, subId, receipt, false); + } + protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { connV11.connect(defUser, defPass); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java index 9835c17d3a..fe1e3391ac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java @@ -31,6 +31,9 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; @@ -38,7 +41,6 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12; import org.apache.activemq.artemis.tests.integration.stomp.v11.StompV11TestBase; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,6 +52,7 @@ import org.junit.Test; public class StompV12Test extends StompV11TestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + public static final String CLIENT_ID = "myclientid"; private StompClientConnectionV12 connV12; @@ -1325,7 +1328,7 @@ public class StompV12Test extends StompV11TestBase { @Test public void testTwoSubscribers() throws Exception { - connV12.connect(defUser, defPass, "myclientid"); + connV12.connect(defUser, defPass, CLIENT_ID); this.subscribeTopic(connV12, "sub1", "auto", null); @@ -1529,7 +1532,7 @@ public class StompV12Test extends StompV11TestBase { @Test public void testDurableSubscriberWithReconnection() throws Exception { - connV12.connect(defUser, defPass, "myclientid"); + connV12.connect(defUser, defPass, CLIENT_ID); this.subscribeTopic(connV12, "sub1", "auto", getName()); @@ -1547,7 +1550,7 @@ public class StompV12Test extends StompV11TestBase { connV12.destroy(); connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - connV12.connect(defUser, defPass, "myclientid"); + connV12.connect(defUser, defPass, CLIENT_ID); this.subscribeTopic(connV12, "sub1", "auto", getName()); @@ -1563,6 +1566,30 @@ public class StompV12Test extends StompV11TestBase { connV12.disconnect(); } + @Test + public void testDurableUnSubscribe() throws Exception { + connV12.connect(defUser, defPass, CLIENT_ID); + + this.subscribeTopic(connV12, null, "auto", getName()); + + connV12.disconnect(); + connV12.destroy(); + connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + connV12.connect(defUser, defPass, CLIENT_ID); + + this.unsubscribe(connV12, getName(), false, true); + + long start = System.currentTimeMillis(); + SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName()); + while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) { + Thread.sleep(100); + } + + assertNull(server.getActiveMQServer().locateQueue(queueName)); + + connV12.disconnect(); + } + @Test public void testJMSXGroupIdCanBeSet() throws Exception { MessageConsumer consumer = session.createConsumer(queue); @@ -2403,8 +2430,10 @@ public class StompV12Test extends StompV11TestBase { boolean receipt, boolean noLocal) throws IOException, InterruptedException { ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", subId); subFrame.addHeader("destination", getTopicPrefix() + getTopicName()); + if (subId != null) { + subFrame.addHeader("id", subId); + } if (ack != null) { subFrame.addHeader("ack", ack); } @@ -2425,18 +2454,14 @@ public class StompV12Test extends StompV11TestBase { } } - private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException { - ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE"); - subFrame.addHeader("id", subId); - - conn.sendFrame(subFrame); - } - - private void unsubscribe(StompClientConnection conn, - String subId, - boolean receipt) throws IOException, InterruptedException { - ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE"); - subFrame.addHeader("id", subId); + private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean durable) throws IOException, InterruptedException { + ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE); + if (durable) { + subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId); + } + else { + subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId); + } if (receipt) { subFrame.addHeader("receipt", "4321"); @@ -2446,11 +2471,21 @@ public class StompV12Test extends StompV11TestBase { if (receipt) { System.out.println("response: " + f); - Assert.assertEquals("RECEIPT", f.getCommand()); - Assert.assertEquals("4321", f.getHeader("receipt-id")); + assertEquals("RECEIPT", f.getCommand()); + assertEquals("4321", f.getHeader("receipt-id")); } } + private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException { + unsubscribe(conn, subId, false, false); + } + + private void unsubscribe(StompClientConnection conn, + String subId, + boolean receipt) throws IOException, InterruptedException { + unsubscribe(conn, subId, receipt, false); + } + protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception { connV12.connect(defUser, defPass);