diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index ba0abbfe88..2596b15280 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; @@ -276,7 +275,8 @@ public class StompSession implements SessionCallback { String destination, String selector, String ack) throws Exception { - SimpleString queue = SimpleString.toSimpleString(destination); + SimpleString queueName = SimpleString.toSimpleString(destination); + boolean pubSub = false; int receiveCredits = consumerCredits; if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) { receiveCredits = -1; @@ -284,27 +284,27 @@ public class StompSession implements SessionCallback { if (destination.startsWith("jms.topic")) { // subscribes to a topic + pubSub = true; if (durableSubscriptionName != null) { if (clientID == null) { throw BUNDLE.missingClientID(); } - queue = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); - QueueQueryResult query = session.executeQueueQuery(queue); - if (!query.isExists()) { - session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector), false, true); + queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); + if (manager.getServer().locateQueue(queueName) == null) { + session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), false, true); } } else { - queue = UUIDGenerator.getInstance().generateSimpleStringUUID(); - session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector), true, false); + queueName = UUIDGenerator.getInstance().generateSimpleStringUUID(); + session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), true, false); } - ((ServerSessionImpl) session).createConsumer(consumerID, queue, null, false, false, receiveCredits); + session.createConsumer(consumerID, queueName, null, false, false, receiveCredits); } else { - ((ServerSessionImpl) session).createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false, false, receiveCredits); + session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, receiveCredits); } - StompSubscription subscription = new StompSubscription(subscriptionID, ack); + StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub); subscriptions.put(consumerID, subscription); session.start(); @@ -320,10 +320,9 @@ public class StompSession implements SessionCallback { StompSubscription sub = entry.getValue(); if (id != null && id.equals(sub.getID())) { iterator.remove(); + SimpleString queueName = sub.getQueueName(); session.closeConsumer(consumerID); - SimpleString queueName = SimpleString.toSimpleString(id); - QueueQueryResult query = session.executeQueueQuery(queueName); - if (query.isExists()) { + if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) { session.deleteQueue(queueName); } result = true; @@ -332,8 +331,7 @@ public class StompSession implements SessionCallback { if (!result && durableSubscriptionName != null && clientID != null) { SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName); - QueueQueryResult query = session.executeQueueQuery(queueName); - if (query.isExists()) { + if (manager.getServer().locateQueue(queueName) != null) { session.deleteQueue(queueName); } result = true; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java index 971af27c8d..a1417adc9f 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.stomp; +import org.apache.activemq.artemis.api.core.SimpleString; + public class StompSubscription { // Constants ----------------------------------------------------- @@ -25,13 +27,20 @@ public class StompSubscription { private final String ack; + private final SimpleString queueName; + + // whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic) + private final boolean pubSub; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public StompSubscription(String subID, String ack) { + public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) { this.subID = subID; this.ack = ack; + this.queueName = queueName; + this.pubSub = pubSub; } // Public -------------------------------------------------------- @@ -44,17 +53,17 @@ public class StompSubscription { return subID; } - @Override - public String toString() { - return "StompSubscription[id=" + subID + ", ack=" + ack + "]"; + public SimpleString getQueueName() { + return queueName; } - // Package protected --------------------------------------------- + public boolean isPubSub() { + return pubSub; + } - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- + @Override + public String toString() { + return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]"; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 1c92f42035..951aa85c30 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1282,6 +1282,7 @@ public class StompTest extends StompTestBase { @Test public void testSubscribeToTopic() throws Exception { + final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length; String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; sendFrame(frame); @@ -1301,6 +1302,19 @@ public class StompTest extends StompTestBase { frame = receiveFrame(10000); Assert.assertTrue(frame.startsWith("RECEIPT")); + assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) { + return true; + } + else { + return false; + } + } + }, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100))); + sendMessage(getName(), topic); frame = receiveFrame(10000); @@ -1326,6 +1340,74 @@ public class StompTest extends StompTestBase { log.info("Received frame: " + frame); Assert.assertNull("No message should have been received since subscription was removed", frame); + assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + sendFrame(frame); + } + + @Test + public void testSubscribeToQueue() throws Exception { + final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length; + + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(100000); + Assert.assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:" + + getQueuePrefix() + + getQueueName() + + "\n" + + "receipt: 12\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + // wait for SUBSCRIBE's receipt + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("RECEIPT")); + + assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) { + return true; + } + else { + return false; + } + } + }, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100))); + + sendMessage(getName(), queue); + + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("MESSAGE")); + Assert.assertTrue(frame.indexOf("destination:") > 0); + Assert.assertTrue(frame.indexOf(getName()) > 0); + + frame = "UNSUBSCRIBE\n" + "destination:" + + getQueuePrefix() + + getQueueName() + + "\n" + + "receipt: 1234\n" + + "\n\n" + + Stomp.NULL; + sendFrame(frame); + // wait for UNSUBSCRIBE's receipt + frame = receiveFrame(10000); + Assert.assertTrue(frame.startsWith("RECEIPT")); + + sendMessage(getName(), queue); + + frame = receiveFrame(1000); + log.info("Received frame: " + frame); + Assert.assertNull("No message should have been received since subscription was removed", frame); + + assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length); + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; sendFrame(frame); }