From d773e8f66b5db2b0fcaa646384c24690d38aaa69 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 10 Apr 2018 10:41:15 -0500 Subject: [PATCH] ARTEMIS-1794 STOMP clients using same addr w/diff routing types --- .../core/protocol/stomp/StompConnection.java | 38 ++++- .../stomp/VersionedStompFrameHandler.java | 12 +- .../stomp/v10/StompFrameHandlerV10.java | 50 +++--- .../tests/integration/stomp/StompTest.java | 155 ++++++++++++++++++ 4 files changed, 221 insertions(+), 34 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 32e64b3f9b..fbd0107759 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -262,7 +263,7 @@ public final class StompConnection implements RemotingConnection { // TODO this should take a type - send or receive so it knows whether to check the address or the queue public void checkDestination(String destination) throws ActiveMQStompException { - if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) { + if (!manager.destinationExists(destination)) { throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler); } } @@ -272,28 +273,47 @@ public final class StompConnection implements RemotingConnection { try { SimpleString simpleQueue = SimpleString.toSimpleString(queue); - if (manager.getServer().getAddressInfo(simpleQueue) == null) { - AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue); - - RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType; + AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue); + AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue); + RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType; + boolean checkAnycast = false; + /** + * If the address doesn't exist then it is created if possible. + * If the address does exist but doesn't support the routing-type then the address is updated if possible. + */ + if (addressInfo == null) { if (addressSettings.isAutoCreateAddresses()) { session.createAddress(simpleQueue, effectiveAddressRoutingType, true); } - // only auto create the queue if the address is ANYCAST - if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) { - session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true); + checkAnycast = true; + } else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) { + if (addressSettings.isAutoCreateAddresses()) { + EnumSet routingTypes = EnumSet.noneOf(RoutingType.class); + for (RoutingType existingRoutingType : addressInfo.getRoutingTypes()) { + routingTypes.add(existingRoutingType); + } + routingTypes.add(effectiveAddressRoutingType); + manager.getServer().updateAddressInfo(simpleQueue, routingTypes); } + + checkAnycast = true; + } + + // only auto create the queue if the address is ANYCAST + if (checkAnycast && effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) { + session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true); } } catch (ActiveMQQueueExistsException e) { // ignore } catch (Exception e) { + ActiveMQStompProtocolLogger.LOGGER.debug("Exception while auto-creating destination", e); throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler); } } public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException { - AddressInfo addressInfo = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination))); + AddressInfo addressInfo = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)); // may be null here if, for example, the management address is being checked if (addressInfo != null) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 3cb5ab87d3..941cee6ab8 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -279,8 +279,16 @@ public abstract class VersionedStompFrameHandler { return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); } - public String getDestination(StompFrame request) { - return request.getHeader(Headers.Subscribe.DESTINATION); + public String getDestination(StompFrame request) throws ActiveMQStompException { + return getDestination(request, Headers.Subscribe.DESTINATION); + } + + public String getDestination(StompFrame request, String header) throws ActiveMQStompException { + String destination = request.getHeader(header); + if (destination == null) { + return null; + } + return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString(); } public StompFrame postprocess(StompFrame request) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java index a6785b71e2..2011e0504a 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java @@ -94,32 +94,36 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements @Override public StompFrame onUnsubscribe(StompFrame request) { StompFrame response = null; - String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION); - String id = request.getHeader(Stomp.Headers.Unsubscribe.ID); - String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME); - if (durableSubscriptionName == null) { - durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME); - } - if (durableSubscriptionName == null) { - durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME); - } - - String subscriptionID = null; - if (id != null) { - subscriptionID = id; - } else { - if (destination == null) { - ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this); - response = error.getFrame(); - return response; - } - subscriptionID = "subscription/" + destination; - } - try { + String destination = getDestination(request, Stomp.Headers.Unsubscribe.DESTINATION); + String id = request.getHeader(Stomp.Headers.Unsubscribe.ID); + String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME); + } + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME); + } + + String subscriptionID = null; + if (id != null) { + subscriptionID = id; + } else { + if (destination == null) { + ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this); + response = error.getFrame(); + return response; + } + subscriptionID = "subscription/" + destination; + } + connection.unsubscribe(subscriptionID, durableSubscriptionName); + } catch (ActiveMQStompException e) { - return e.getFrame(); + response = e.getFrame(); + } catch (Exception e) { + ActiveMQStompException error = BUNDLE.errorHandleSend(e).setHandler(this); + response = error.getFrame(); } return response; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index cd18d0c9f4..de6a11dc96 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1573,6 +1573,159 @@ public class StompTest extends StompTestBase { conn.disconnect(); } + /** + * This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the + * operations in opposite order. In this test the anycast subscription is created first. + * @throws Exception + */ + @Test + public void testPrefixedAutoCreatedAnycastAndMulticastWithSameName() throws Exception { + int port = 61614; + + URI uri = createStompClientUri(scheme, hostname, port); + + final String ADDRESS = UUID.randomUUID().toString(); + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start(); + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + // since this queue doesn't exist the broker should create a new ANYCAST address & queue + String uuid = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + frame = conn.sendFrame(frame); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); + assertNotNull("No address was created with the name " + ADDRESS, addressInfo); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)); + assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)); + assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS))); + + // sending a MULTICAST message should alter the address to support MULTICAST + frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 1", true); + assertFalse(frame.getCommand().equals("ERROR")); + addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)); + + // however, no message should be routed to the ANYCAST queue + frame = conn.receiveFrame(1000); + Assert.assertNull(frame); + + // sending a message to the ANYCAST queue, should be received + frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 2", true); + assertFalse(frame.getCommand().equals("ERROR")); + frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals("Hello World 2", frame.getBody()); + Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + frame = conn.receiveFrame(1000); + Assert.assertNull(frame); + + unsubscribe(conn, null, "/queue/" + ADDRESS, true, false); + + // now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription + uuid = UUID.randomUUID().toString(); + frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + frame = conn.sendFrame(frame); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + // send a message which will be routed to the MULTICAST queue + frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 3", true); + assertFalse(frame.getCommand().equals("ERROR")); + + // receive that message on the topic subscription + frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals("Hello World 3", frame.getBody()); + Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + frame = conn.receiveFrame(1000); + Assert.assertNull(frame); + + unsubscribe(conn, null, "/topic/" + ADDRESS, true, false); + + conn.disconnect(); + } + + /** + * This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the + * operations in opposite order. In this test the multicast subscription is created first. + * @throws Exception + */ + @Test + public void testPrefixedAutoCreatedMulticastAndAnycastWithSameName() throws Exception { + int port = 61614; + + URI uri = createStompClientUri(scheme, hostname, port); + + final String ADDRESS = UUID.randomUUID().toString(); + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start(); + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + // since this queue doesn't exist the broker should create a new MULTICAST address + String uuid = UUID.randomUUID().toString(); + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + frame = conn.sendFrame(frame); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); + assertNotNull("No address was created with the name " + ADDRESS, addressInfo); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)); + assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)); + + // sending an ANYCAST message should alter the address to support ANYCAST and create an ANYCAST queue + frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 1", true); + assertFalse(frame.getCommand().equals("ERROR")); + addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)); + assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)); + assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS))); + + // however, no message should be routed to the MULTICAST queue + frame = conn.receiveFrame(1000); + Assert.assertNull(frame); + + // sending a message to the MULTICAST queue, should be received + frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 2", true); + assertFalse(frame.getCommand().equals("ERROR")); + frame = conn.receiveFrame(2000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals("Hello World 2", frame.getBody()); + Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + frame = conn.receiveFrame(1000); + Assert.assertNull(frame); + + frame = unsubscribe(conn, null, "/topic/" + ADDRESS, true, false); + assertFalse(frame.getCommand().equals("ERROR")); + + // now subscribe to the address in an ANYCAST way + uuid = UUID.randomUUID().toString(); + frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS) + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + frame = conn.sendFrame(frame); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + + // receive that message on the ANYCAST queue + frame = conn.receiveFrame(1000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals("Hello World 1", frame.getBody()); + Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE)); + frame = conn.receiveFrame(2000); + Assert.assertNull(frame); + + unsubscribe(conn, null, "/queue/" + ADDRESS, true, false); + + conn.disconnect(); + } + @Test public void testDotPrefixedSendAndRecieveAnycast() throws Exception { testPrefixedSendAndRecieve("jms.queue.", RoutingType.ANYCAST); @@ -1626,11 +1779,13 @@ public class StompTest extends StompTestBase { @Test public void testMulticastOperationsOnAnycastAddress() throws Exception { + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)); testRoutingSemantics(RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName()); } @Test public void testAnycastOperationsOnMulticastAddress() throws Exception { + server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)); testRoutingSemantics(RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName()); }