diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index bfef98cab6..21e97486fb 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -80,6 +80,8 @@ import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.LongSequenceGenerator; +import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION; + public class OpenWireProtocolManager implements ProtocolManager, ClusterTopologyListener { private static final List websocketRegistryNames = Collections.EMPTY_LIST; @@ -133,7 +135,29 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl private final Map prefixes = new HashMap<>(); - private final Map vtConsumerDestinationMatchers = new HashMap<>(); + protected class VirtualTopicConfig { + public int filterPathTerminus; + public boolean selectorAware; + + public VirtualTopicConfig(String[] configuration) { + filterPathTerminus = Integer.valueOf(configuration[1]); + // optional config + for (int i = 2; i < configuration.length; i++) { + String[] optionPair = configuration[i].split("="); + consumeOption(optionPair); + } + } + + private void consumeOption(String[] optionPair) { + if (optionPair.length == 2) { + if (SELECTOR_AWARE_OPTION.equals(optionPair[0])) { + selectorAware = Boolean.valueOf(optionPair[1]); + } + } + } + } + + private final Map vtConsumerDestinationMatchers = new HashMap<>(); protected final LRUCache vtDestMapCache = new LRUCache(); public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { @@ -622,8 +646,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) { for (String filter : virtualTopicConsumerWildcards.split(",")) { - String[] wildcardLimitPair = filter.split(";"); - vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1])); + String[] configuration = filter.split(";"); + vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(configuration[0])), new VirtualTopicConfig(configuration)); } } @@ -646,15 +670,15 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl return mappedDestination; } - for (Map.Entry candidate : vtConsumerDestinationMatchers.entrySet()) { + for (Map.Entry candidate : vtConsumerDestinationMatchers.entrySet()) { if (candidate.getKey().matches(destination)) { // convert to matching FQQN String[] paths = DestinationPath.getDestinationPaths(destination); StringBuilder fqqn = new StringBuilder(); - int filterPathTerminus = candidate.getValue(); + VirtualTopicConfig virtualTopicConfig = candidate.getValue(); // address - ie: topic - for (int i = filterPathTerminus; i < paths.length; i++) { - if (i > filterPathTerminus) { + for (int i = virtualTopicConfig.filterPathTerminus; i < paths.length; i++) { + if (i > virtualTopicConfig.filterPathTerminus) { fqqn.append(ActiveMQDestination.PATH_SEPERATOR); } fqqn.append(paths[i]); @@ -667,7 +691,7 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl } fqqn.append(paths[i]); } - mappedDestination = new ActiveMQQueue(fqqn.toString()); + mappedDestination = new ActiveMQQueue(fqqn.toString() + ( virtualTopicConfig.selectorAware ? "?" + SELECTOR_AWARE_OPTION + "=true" : "" )); break; } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 7aaeabbefe..e963990ce5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; @@ -181,7 +182,7 @@ public class AMQSession implements SessionCallback { openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest); SimpleString queueName = new SimpleString(convertWildcard(openWireDest)); - if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) { + if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary(), OpenWireUtil.extractFilterStringOrNull(info, openWireDest))) { throw new InvalidDestinationException("Destination doesn't exist: " + queueName); } } @@ -223,6 +224,10 @@ public class AMQSession implements SessionCallback { } private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception { + return checkAutoCreateQueue(queueName, isTemporary, null); + } + + private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary, String filter) throws Exception { boolean hasQueue = true; if (!connection.containsKnownDestination(queueName)) { @@ -245,7 +250,7 @@ public class AMQSession implements SessionCallback { routingTypeToUse = as.getDefaultAddressRoutingType(); } } - coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true)); + coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter)); connection.addKnownDestination(queueName); } else { hasQueue = false; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 3d22647d4a..197e130c67 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; @@ -37,6 +38,19 @@ public class OpenWireUtil { public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration(); + public static final String SELECTOR_AWARE_OPTION = "selectorAware"; + + public static String extractFilterStringOrNull(final ConsumerInfo info, final ActiveMQDestination openWireDest) { + if (info.getSelector() != null) { + if (openWireDest.getOptions() != null) { + if (Boolean.valueOf(openWireDest.getOptions().get(SELECTOR_AWARE_OPTION))) { + return info.getSelector(); + } + } + } + return null; + } + /** * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was diff --git a/docs/migration-guide/en/VirtualTopics.md b/docs/migration-guide/en/VirtualTopics.md index b98ac809ee..0e972768f0 100644 --- a/docs/migration-guide/en/VirtualTopics.md +++ b/docs/migration-guide/en/VirtualTopics.md @@ -2,7 +2,7 @@ Virtual Topics ============== Virtual Topics (a specialisation of virtual destinations) in ActiveMQ 5.x typically address two different but related -problems. Lets take each in turn: +problems. Let's take each in turn: Shared access to a JMS durable topic subscription ------------------------------------------------- @@ -20,7 +20,7 @@ JMS 2.0 adds the possibility of shared subscriptions with new API's that are ful Fully Qualified Queue name (FQQN) --------------------------------- -Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the +Secondly, Artemis uses a queue per topic subscriber model internally, and it is possibly to directly address the subscription queue using its Fully Qualified Queue name (FQQN). For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription `A`: @@ -42,7 +42,7 @@ If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcar mechanism on the OpenWire protocol handler that will automatically convert the consumer destination into the corresponding FQQN. The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies a filter to match -the virtual topic consumer destination and an int that specifies the number of path matches that terminate the consumer +the virtual topic consumer destination, and an int that specifies the number of path matches that terminate the consumer queue identity. E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the parameter ```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```. @@ -55,8 +55,8 @@ In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will Durable topic subscribers in a network of brokers ------------------------------------------------- The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a -network, duplicate durable subs get created on each node in the network but they do not migrate. The end result can +network, duplicate durable subs get created on each node in the network, but they do not migrate. The end result can result in duplicate message storage and ultimately duplicate delivery, which is not good. -When durable subscribers map to virtual topic subscriber queues, the queues can migrate and the problem can be avoided. +When durable subscribers map to virtual topic subscriber queues, the queues can migrate, and the problem can be avoided. In Artemis, because a durable sub is modeled as a queue, this problem does not arise. \ No newline at end of file diff --git a/docs/user-manual/en/openwire.md b/docs/user-manual/en/openwire.md index 9cecfe14ba..b522284111 100644 --- a/docs/user-manual/en/openwire.md +++ b/docs/user-manual/en/openwire.md @@ -35,7 +35,7 @@ are: - `useKeepAlive` - Whether or not to send a KeepAliveInfo on an idle connection to prevent it + Indicates whether to send a KeepAliveInfo on an idle connection to prevent it from timing out. Enabled by default. Disabling the keep alive will still make connections time out if no data was received on the connection for the specified amount of time. @@ -64,13 +64,13 @@ broker side. - `supportAdvisory` - Whether or not the broker supports advisory messages. If the value is true, + Indicates whether the broker supports advisory messages. If the value is true, advisory addresses/queues will be created. If the value is false, no advisory addresses/queues are created. Default value is `true`. - `suppressInternalManagementObjects` - Whether or not the advisory addresses/queues, if any, will be registered to + Indicates whether advisory addresses/queues, if any, will be registered to management service (e.g. JMX registry). If set to true, no advisory addresses/queues will be registered. If set to false, those are registered and will be displayed on the management console. Default value is `true`. @@ -88,12 +88,14 @@ configure a mapping function that will translate the virtual topic consumer destination into a FQQN address. This address will then represents the consumer as a multicast binding to an address representing the virtual topic. -The configuration string property `virtualTopicConsumerWildcards` has two parts -separated by a `;`. The first is the 5.x style destination filter that +The configuration string list property `virtualTopicConsumerWildcards` has parts +separated by a `;`. The first is the classic style destination filter that identifies the destination as belonging to a virtual topic. The second identifies the number of `paths` that identify the consumer queue such that it -can be parsed from the destination. For example, the default 5.x virtual topic -with consumer prefix of `Consumer.*.`, would require a +can be parsed from the destination. Any subsequent parts are additional configuration +parameters for that mapping. + +For example, the default virtual topic with consumer prefix of `Consumer.*.`, would require a `virtualTopicConsumerWildcards` filter of `Consumer.*.>;2`. As a url parameter this transforms to `Consumer.*.%3E%3B2` when the url significant characters `>;` are escaped with their hex code points. In an `acceptor` url it would be: @@ -105,8 +107,13 @@ this transforms to `Consumer.*.%3E%3B2` when the url significant characters This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of `VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` using the int component `2` of the configuration to identify the consumer queue as the first two paths of the -destination. `virtualTopicConsumerWildcards` is multi valued using a `,` +destination. `virtualTopicConsumerWildcards` is multi valued using a `,` separator. +### selectorAware +The mappings support an optional parameter, `selectorAware` which when true, transfers any selector information from the +OpenWire consumer into a queue filter of any auto-created subscription queue. Note: the selector/filter is persisted with +the queue binding in the normal way, such that it works independent of connected consumers. + Please see Virtual Topic Mapping example contained in the OpenWire [examples](examples.md). \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java index 2774cd6d07..e060cb36c2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.openwire; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -27,8 +28,6 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.command.ActiveMQDestination; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; public class OpenWireProtocolManagerTest extends ActiveMQTestBase { @@ -36,7 +35,7 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase { LRUCache lruCacheRef; @Test - public void testVtAutoConversion() throws Exception { + public void testVtAutoConversion() { underTest = new OpenWireProtocolManager(null, new DummyServer()) { @Override public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) { @@ -49,17 +48,24 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase { final int maxCacheSize = 10; underTest.setVirtualTopicConsumerLruCacheMax(10); - underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3"); + underTest.setVirtualTopicConsumerWildcards("A.>;1;selectorAware=true,B.*.>;2,C.*.*.*.EE;3;selectorAware=false"); ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic"); assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A.SomeTopic"), underTest.virtualTopicConsumerToFQQN(A)); + ActiveMQDestination checkOption = underTest.virtualTopicConsumerToFQQN(A); + assertNotNull(checkOption.getOptions()); + assertTrue(Boolean.parseBoolean(checkOption.getOptions().get(OpenWireUtil.SELECTOR_AWARE_OPTION))); + ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B"); assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b.SomeTopic.B"), underTest.virtualTopicConsumerToFQQN(B)); ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE"); assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c.SomeTopic.EE"), underTest.virtualTopicConsumerToFQQN(C)); + checkOption = underTest.virtualTopicConsumerToFQQN(C); + assertNull(checkOption.getOptions()); + for (int i = 0; i < maxCacheSize; i++) { ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity" + i); assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java index 228c9048ad..26d5940092 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java @@ -25,23 +25,27 @@ import javax.jms.TextMessage; import java.util.Set; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; +import org.apache.activemq.artemis.utils.Wait; import org.junit.Test; public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase { @Override protected void extraServerConfig(Configuration serverConfig) { - Set acceptors = server.getConfiguration().getAcceptorConfigurations(); + Set acceptors = serverConfig.getAcceptorConfigurations(); for (TransportConfiguration tc : acceptors) { if (tc.getName().equals("netty")) { - tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2"); + tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2,C.*.>;2;selectorAware=true"); tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000"); - } } + serverConfig.setJMXManagementEnabled(true); } @Test @@ -51,6 +55,7 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase { SimpleString topic = new SimpleString("VirtualTopic.Orders"); this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoDeleteQueues(false); try { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString); @@ -222,4 +227,59 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase { } } } + + + @Test + public void testSelectorAwareVT() throws Exception { + Connection connection = null; + + SimpleString topic = new SimpleString("SVT.Orders.A"); + + this.server.getAddressSettingsRepository().getMatch("SVT.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("SVT.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + connection = activeMQConnectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + + MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("C.A." + topic.toString()), "stuff = 'A'"); + MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("C.B." + topic.toString()), "stuff = 'B'"); + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("This is a text message"); + for (String stuffValue : new String[] {"A", "B", "C"}) { + message.setStringProperty("stuff", stuffValue); + producer.send(message); + } + + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA != null && messageReceivedB != null)); + String text = messageReceivedA.getText(); + assertEquals("This is a text message", text); + + assertEquals("A", messageReceivedA.getStringProperty("stuff")); + assertEquals("B", messageReceivedB.getStringProperty("stuff")); + + // verify C message got dropped + + final QueueControl queueControlA = ManagementControlHelper.createQueueControl(topic, SimpleString.toSimpleString("C.A." + topic.toString()), RoutingType.MULTICAST, mbeanServer); + Wait.assertEquals(0, () -> queueControlA.countMessages()); + + final QueueControl queueControlB = ManagementControlHelper.createQueueControl(topic, SimpleString.toSimpleString("C.B." + topic.toString()), RoutingType.MULTICAST, mbeanServer); + Wait.assertEquals(0, () -> queueControlB.countMessages()); + + } finally { + if (connection != null) { + connection.close(); + } + } + } + }