diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index c373ff7b2d..786a11467c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.PendingTask; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -259,7 +260,8 @@ public class StompSession implements SessionCallback { Set routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes(); boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); - if (multicast) { + // if the destination is FQQN then the queue will have already been created + if (multicast && !CompositeAddress.isFullyQualified(destination)) { // subscribes to a topic if (durableSubscriptionName != null) { if (clientID == null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java index d57b5dc3a8..f6922e5ecb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/FQQNStompTest.java @@ -204,7 +204,7 @@ public class FQQNStompTest extends StompTestBase { } @Test - public void testSendFQQNAutoCreateOnSend() throws Exception { + public void testAutoCreateOnSendFQQN() throws Exception { final SimpleString myAddress = SimpleString.toSimpleString("myAddress"); final SimpleString q1Name = SimpleString.toSimpleString("q1"); @@ -224,30 +224,68 @@ public class FQQNStompTest extends StompTestBase { } @Test - public void testSendFQQNAutoCreateOnSubscribe() throws Exception { + public void testAutoCreateOnSubscribeFQQNAnycast() throws Exception { + internalTestAutoCreateOnSubscribeFQQN(RoutingType.ANYCAST); + } + + @Test + public void testAutoCreateOnSubscribeFQQNMulticast() throws Exception { + internalTestAutoCreateOnSubscribeFQQN(RoutingType.MULTICAST); + } + + @Test + public void testAutoCreateOnSubscribeFQQNNoRoutingType() throws Exception { + internalTestAutoCreateOnSubscribeFQQN(null); + } + + private void internalTestAutoCreateOnSubscribeFQQN(RoutingType routingType) throws Exception { final SimpleString myAddress = SimpleString.toSimpleString("myAddress"); final SimpleString q1Name = SimpleString.toSimpleString("q1"); final SimpleString q2Name = SimpleString.toSimpleString("q2"); StompClientConnection consumer1Connection = StompClientConnectionFactory.createClientConnection(uri); consumer1Connection.connect(defUser, defPass); - subscribeQueue(consumer1Connection, "sub-01", myAddress + "\\c\\c" + q1Name); + + ClientStompFrame frame = consumer1Connection + .createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q1Name) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-01") + .addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO); + + if (routingType != null) { + frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString()); + } + + consumer1Connection.sendFrame(frame); + + assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100)); StompClientConnection consumer2Connection = StompClientConnectionFactory.createClientConnection(uri); consumer2Connection.connect(defUser, defPass); - subscribeQueue(consumer2Connection, "sub-02", myAddress + "\\c\\c" + q2Name); + + frame = consumer2Connection + .createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, myAddress + "\\c\\c" + q2Name) + .addHeader(Stomp.Headers.Subscribe.ID, "sub-02") + .addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO); + + if (routingType != null) { + frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType.toString()); + } + + consumer2Connection.sendFrame(frame); assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name) != null, 2000, 100)); assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name) != null, 2000, 100)); StompClientConnection senderConnection = StompClientConnectionFactory.createClientConnection(uri); senderConnection.connect(defUser, defPass); - send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST); + send(senderConnection, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, routingType); assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessagesAdded() == 1, 2000, 100)); assertTrue(Wait.waitFor(() -> server.locateQueue(q2Name).getMessagesAdded() == 0, 2000, 100)); - ClientStompFrame frame = consumer1Connection.receiveFrame(2000); + frame = consumer1Connection.receiveFrame(2000); assertNotNull(frame); assertEquals("Hello World!", frame.getBody()); assertTrue(Wait.waitFor(() -> server.locateQueue(q1Name).getMessageCount() == 0, 4000, 100));