diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index a8e613e406..14efb79011 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -203,14 +203,6 @@ public class TransportConstants { public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size"; - public static final String STOMP_ANYCAST_PREFIX = "stompAnycastPrefix"; - - public static final String DEFAULT_STOMP_ANYCAST_PREFIX = ""; - - public static final String STOMP_MULTICAST_PREFIX = "stompMulticastPrefix"; - - public static final String DEFAULT_STOMP_MULTICAST_PREFIX = ""; - public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis"; public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1; @@ -250,8 +242,6 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); - allowableAcceptorKeys.add(TransportConstants.STOMP_ANYCAST_PREFIX); - allowableAcceptorKeys.add(TransportConstants.STOMP_MULTICAST_PREFIX); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 3f148f3180..5dafe6075c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -90,10 +90,6 @@ public final class StompConnection implements RemotingConnection { private final int minLargeMessageSize; - private final String anycastPrefix; - - private final String multicastPrefix; - private StompVersions version; private VersionedStompFrameHandler frameHandler; @@ -168,8 +164,6 @@ public final class StompConnection implements RemotingConnection { this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration()); this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration()); - this.anycastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_ANYCAST_PREFIX, TransportConstants.DEFAULT_STOMP_ANYCAST_PREFIX, acceptorUsed.getConfiguration()); - this.multicastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_MULTICAST_PREFIX, TransportConstants.DEFAULT_STOMP_MULTICAST_PREFIX, acceptorUsed.getConfiguration()); } @Override @@ -255,14 +249,14 @@ public final class StompConnection implements RemotingConnection { // TODO this should take a type - send or receive so it knows whether to check the address or the queue public void checkDestination(String destination) throws ActiveMQStompException { - if (!manager.destinationExists(destination)) { + if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) { throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler); } } public boolean autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException { boolean result = false; - ServerSession session = getSession().getSession(); + ServerSession session = getSession().getCoreSession(); try { if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { @@ -291,9 +285,9 @@ public final class StompConnection implements RemotingConnection { } public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException { - Set actualDeliveryModesOfAddres = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes(); - if (routingType != null && !actualDeliveryModesOfAddres.contains(routingType)) { - throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddres.toString()); + Set actualDeliveryModesOfAddress = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes(); + if (routingType != null && !actualDeliveryModesOfAddress.contains(routingType)) { + throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddress.toString()); } } @@ -757,14 +751,6 @@ public final class StompConnection implements RemotingConnection { return minLargeMessageSize; } - public String getAnycastPrefix() { - return anycastPrefix; - } - - public String getMulticastPrefix() { - return multicastPrefix; - } - public StompProtocolManager getManager() { return manager; } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index aba363434e..54339a4f78 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -261,9 +261,9 @@ public class StompProtocolManager extends AbstractProtocolManager entry = iterator.next(); if (entry.getValue().getConnection() == connection) { - ServerSession serverSession = entry.getValue().getSession(); + ServerSession serverSession = entry.getValue().getCoreSession(); try { serverSession.rollback(true); serverSession.close(false); @@ -355,7 +355,7 @@ public class StompProtocolManager extends AbstractProtocolManager routingTypes = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes(); + Set routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes(); if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) { // subscribes to a topic pubSub = true; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index cdd9e50574..02facd6e26 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -178,6 +178,9 @@ public abstract class VersionedStompFrameHandler { long timestamp = System.currentTimeMillis(); ServerMessageImpl message = connection.createServerMessage(); + if (routingType != null) { + message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType()); + } message.setTimestamp(timestamp); message.setAddress(SimpleString.toSimpleString(destination)); StompUtils.copyStandardHeadersFromFrameToMessage(frame, message); @@ -236,25 +239,25 @@ public abstract class VersionedStompFrameHandler { return response; } - public StompFrame onSubscribe(StompFrame request) { + public StompFrame onSubscribe(StompFrame frame) { StompFrame response = null; - String destination = getDestination(request); - - String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR); - String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE); - String id = request.getHeader(Stomp.Headers.Subscribe.ID); - String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); - if (durableSubscriptionName == null) { - durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); - } - RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION)); - boolean noLocal = false; - - if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { - noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); - } - try { + String destination = getDestination(frame); + + String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR); + String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); + String id = frame.getHeader(Stomp.Headers.Subscribe.ID); + String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); + } + RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION)); + boolean noLocal = false; + + if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { + noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); + } + connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); } catch (ActiveMQStompException e) { response = e.getFrame(); @@ -264,14 +267,7 @@ public abstract class VersionedStompFrameHandler { } public String getDestination(StompFrame request) { - String destination = request.getHeader(Headers.Subscribe.DESTINATION); - if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { - destination = destination.substring(connection.getMulticastPrefix().length()); - } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { - destination = destination.substring(connection.getAnycastPrefix().length()); - } - - return destination; + return request.getHeader(Headers.Subscribe.DESTINATION); } public StompFrame postprocess(StompFrame request) { @@ -344,17 +340,13 @@ public abstract class VersionedStompFrameHandler { connection.destroy(); } - private RoutingType getRoutingType(String typeHeader, String destination) { + private RoutingType getRoutingType(String typeHeader, String destination) throws ActiveMQStompException { // null is valid to return here so we know when the user didn't provide any routing info - RoutingType routingType = null; + RoutingType routingType; if (typeHeader != null) { routingType = RoutingType.valueOf(typeHeader); - } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) { - if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { - routingType = RoutingType.MULTICAST; - } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { - routingType = RoutingType.ANYCAST; - } + } else { + routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination), null).getB(); } return routingType; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index ef8b9a898e..382b3e364d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -646,7 +646,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public void createQueue(final String address, final String name, final String routingType) throws Exception { - createQueue(address, name, routingType, true); + createQueue(address, name, true, routingType); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 9559d74b48..fb3bd22af1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -21,6 +21,7 @@ import javax.transaction.xa.Xid; import java.util.List; import java.util.Set; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -250,4 +251,36 @@ public interface ServerSession extends SecurityAuth { SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; AddressInfo getAddress(SimpleString address); + + /** + * Strip the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor. + * + * @param address the address to inspect + * @return the canonical (i.e. non-prefixed) address name + */ + SimpleString removePrefix(SimpleString address); + + /** + * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type. + * + * @param address the address to inspect + * @param defaultRoutingType the {@code org.apache.activemq.artemis.core.server.RoutingType} to return if no prefix + * match is found. + * @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address + * name and the {@code org.apache.activemq.artemis.core.server.RoutingType} corresponding to the that prefix. + */ + Pair getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType); + + /** + * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type. + * + * @param address the address to inspect + * @param defaultRoutingTypes a the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType} + * objects to return if no prefix match is found. + * @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address + * name and the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType} objects + * corresponding to the that prefix. + */ + Pair> getAddressAndRoutingTypes(SimpleString address, + Set defaultRoutingTypes); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 097118fea6..3d8b9a9917 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -601,7 +601,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } public void resetNodeManager() throws Exception { - nodeManager.stop(); + if (nodeManager != null) { + nodeManager.stop(); + } nodeManager = createNodeManager(configuration.getJournalLocation(), true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 49cf471b55..d622f5a548 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1726,14 +1726,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } - private SimpleString removePrefix(SimpleString address) { + @Override + public SimpleString removePrefix(SimpleString address) { if (prefixEnabled) { return PrefixUtil.getAddress(address, prefixes); } return address; } - private Pair getAddressAndRoutingType(SimpleString address, + @Override + public Pair getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes); @@ -1741,7 +1743,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return new Pair<>(address, defaultRoutingType); } - private Pair> getAddressAndRoutingTypes(SimpleString address, + @Override + public Pair> getAddressAndRoutingTypes(SimpleString address, Set defaultRoutingTypes) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java index 40372086e2..44b5ecf632 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -98,14 +99,14 @@ public class ReplicationWithDivertTest extends ActiveMQTestBase { backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)). setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)). setLargeMessagesDirectory(getLargeMessagesDir(0, true)); - backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE)); - backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE)); + backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setRoutingType(RoutingType.ANYCAST)); + backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setRoutingType(RoutingType.ANYCAST)); DivertConfiguration divertConfiguration = new DivertConfiguration().setName("Test").setAddress(SOURCE_QUEUE).setForwardingAddress(TARGET_QUEUE).setRoutingName("Test"); liveConfig = createDefaultInVMConfig(); - liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true)); - liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true)); + liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true).setRoutingType(RoutingType.ANYCAST)); liveConfig.addDivertConfiguration(divertConfiguration); backupConfig.addDivertConfiguration(divertConfiguration); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java index 07ef73c941..d25f41371c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java @@ -30,6 +30,7 @@ import java.util.Random; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQJMSContext; import org.apache.activemq.artemis.jms.client.ActiveMQSession; @@ -95,7 +96,7 @@ public class JmsProducerTest extends JMSTestBase { @Test public void multipleSendsUsingSetters() throws Exception { - server.createQueue(SimpleString.toSimpleString("q1"), SimpleString.toSimpleString("q1"), null, true, false); + server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false); Queue q1 = context.createQueue("q1"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 60ce16878f..06e3563100 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -40,12 +40,14 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -1306,7 +1308,7 @@ public class StompTest extends StompTestBase { final String ADDRESS = UUID.randomUUID().toString(); final String PREFIXED_ADDRESS = prefix + ADDRESS; String param = routingType.toString(); - String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix"; + String urlParam = param.toLowerCase() + "Prefix"; server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn.connect(defUser, defPass); @@ -1329,9 +1331,9 @@ public class StompTest extends StompTestBase { AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); assertNotNull("No address was created with the name " + ADDRESS, addressInfo); - Set deliveryModest = new HashSet<>(); - deliveryModest.add(RoutingType.valueOf(param)); - assertEquals(deliveryModest, addressInfo.getRoutingTypes()); + Set routingTypes = new HashSet<>(); + routingTypes.add(RoutingType.valueOf(param)); + assertEquals(routingTypes, addressInfo.getRoutingTypes()); conn.disconnect(); } @@ -1360,8 +1362,7 @@ public class StompTest extends StompTestBase { int port = 61614; final String ADDRESS = UUID.randomUUID().toString(); final String PREFIXED_ADDRESS = prefix + ADDRESS; - String param = routingType.toString(); - String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix"; + String urlParam = routingType.toString().toLowerCase() + "Prefix"; server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn.connect(defUser, defPass); @@ -1473,4 +1474,72 @@ public class StompTest extends StompTestBase { conn.disconnect(); } + + @Test + public void testAnycastMessageRoutingExclusivity() throws Exception { + conn.connect(defUser, defPass); + + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServer activeMQServer = server.getActiveMQServer(); + ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST); + + assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + } + + @Test + public void testMulticastMessageRoutingExclusivity() throws Exception { + conn.connect(defUser, defPass); + + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServer activeMQServer = server.getActiveMQServer(); + ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST); + + assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); + assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + } + + @Test + public void testAmbiguousMessageRouting() throws Exception { + conn.connect(defUser, defPass); + + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + final String queueD = "queueD"; + + ActiveMQServer activeMQServer = server.getActiveMQServer(); + ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString()); + + send(conn, addressA, null, "Hello World!", true); + + assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); + } }