diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 87925b574d..301b5fc1a2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -49,14 +49,18 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.selector.impl.LRUCache; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -70,6 +74,8 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.filter.DestinationFilter; +import org.apache.activemq.filter.DestinationPath; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.state.ProducerState; @@ -128,6 +134,9 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl private final Map prefixes = new HashMap<>(); + private final Map vtConsumerDestinationMatchers = new HashMap<>(); + protected final LRUCache vtDestMapCache = new LRUCache(); + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -607,4 +616,70 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects) { this.suppressInternalManagementObjects = suppressInternalManagementObjects; } + + public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) { + for (String filter : virtualTopicConsumerWildcards.split(",")) { + String[] wildcardLimitPair = filter.split(";"); + vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1])); + } + } + + public void setVirtualTopicConsumerLruCacheMax(int max) { + vtDestMapCache.setMaxCacheSize(max); + } + + public ActiveMQDestination virtualTopicConsumerToFQQN(final ActiveMQDestination destination) { + + if (vtConsumerDestinationMatchers.isEmpty()) { + return destination; + } + + ActiveMQDestination mappedDestination = null; + synchronized (vtDestMapCache) { + mappedDestination = vtDestMapCache.get(destination); + } + + if (mappedDestination != null) { + return mappedDestination; + } + + for (Map.Entry candidate : vtConsumerDestinationMatchers.entrySet()) { + if (candidate.getKey().matches(destination)) { + // convert to matching FQQN + String[] paths = DestinationPath.getDestinationPaths(destination); + StringBuilder fqqn = new StringBuilder(); + int filterPathTerminus = candidate.getValue(); + // address - ie: topic + for (int i = filterPathTerminus; i < paths.length; i++) { + if (i > filterPathTerminus) { + fqqn.append(ActiveMQDestination.PATH_SEPERATOR); + } + fqqn.append(paths[i]); + } + fqqn.append(CompositeAddress.SEPARATOR); + // consumer queue + for (int i = 0; i < filterPathTerminus; i++) { + if (i > 0) { + fqqn.append(ActiveMQDestination.PATH_SEPERATOR); + } + fqqn.append(paths[i]); + } + mappedDestination = new ActiveMQQueue(fqqn.toString()); + break; + } + } + if (mappedDestination == null) { + // cache the identity mapping + mappedDestination = destination; + } + synchronized (vtDestMapCache) { + ActiveMQDestination existing = vtDestMapCache.put(destination, mappedDestination); + if (existing != null) { + // some one beat us to the put, revert + vtDestMapCache.put(destination, existing); + mappedDestination = existing; + } + } + return mappedDestination; + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index b0eb678183..d284d6cc65 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -173,6 +173,7 @@ public class AMQSession implements SessionCallback { isInternalAddress = connection.isSuppressInternalManagementObjects(); } if (openWireDest.isQueue()) { + openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest); SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName())); if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) { diff --git a/docs/migration-guide/en/VirtualTopics.md b/docs/migration-guide/en/VirtualTopics.md index e0775ad198..0cf406d44f 100644 --- a/docs/migration-guide/en/VirtualTopics.md +++ b/docs/migration-guide/en/VirtualTopics.md @@ -11,12 +11,19 @@ component must be unique to a connection on the broker. This means that the subs not possible to load balance the stream of messages across consumers and quick failover is difficult because the existing connection state on the broker needs to be first disposed. With virtual topics, each subscription's stream of messages is redirected to a queue. + +In Artemis there are two alternatives, the new JMS 2.0 api or direct access to a subscription queue via the FQQN. -JMS2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis. +JMS 2.0 shared subscriptions +---------------------------- +JMS 2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis. + +Fully Qualified Queue name (FQQN) +--------------------------------- Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the subscription queue using it's Fully Qualified Queue name (FQQN). -For example, a default 5.x consumer for topic `VirtualTopic.Orders` subscription `A`: +For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription `A`: ``` ... Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders"); @@ -30,6 +37,19 @@ would be replaced with an Artemis FQQN comprised of the address and queue. session.createConsumer(subscriptionQueue); ``` +This does require modification to the destination name used by consumers which is not ideal. +If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcard filter +mechanism on the openwire protocol handler that will automatically convert the consumer destination into the +corresponding FQQN. +The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies a filter to match +the virtual topic consumer destination and an int that specifies the number of path matches that terminate the consumer +queue identity. + +E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the url parameter ```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```. +In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of +```VirtualTopic.Orders::Consumer.A```. + + Durable topic subscribers in a network of brokers ------------------------------------------------- The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index 6114de02f7..d7c22e3816 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -204,6 +204,25 @@ The two parameters are configured on openwire acceptors, via URLs or API. For ex tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false +### Virtual Topic Consumer Destination Translation + +For existing Openwire consumers of virtual topic destinations it is possible to configure a mapping function +that will translate the virtual topic consumer destination into a FQQN address. This address then represents +the consumer as a multicast binding to an address representing the virtual topic. + +The configuration string property ```virtualTopicConsumerWildcards``` has two parts seperated by a ```;```. +The first is the 5.x style destination filter that identifies the destination as belonging to a virtual topic. +The second identifies the number of ```paths``` that identify the consumer queue such that it can be parsed from the +destination. +For example, the default 5.x virtual topic with consumer prefix of ```Consumer.*.```, would require a +```virtualTopicConsumerWildcards``` filter of: + + tcp://127.0.0.1:61616?protocols=OPENWIRE;virtualTopicConsumerWildcards=Consumer.*.>;2 + +This will translate ```Consumer.A.VirtualTopic.Orders``` into a FQQN of ```VirtualTopic.Orders::Consumer.A``` using the +int component ```2``` of the configuration to identify the consumer queue as the first two paths of the destination. +```virtualTopicConsumerWildcards``` is multi valued using a ```,``` separator. + ## MQTT MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java new file mode 100644 index 0000000000..85b568572e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.openwire; + +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; +import org.apache.activemq.artemis.core.server.impl.Activation; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.selector.impl.LRUCache; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class OpenWireProtocolManagerTest { + + OpenWireProtocolManager underTest; + LRUCache lruCacheRef; + + @Test + public void testVtAutoConversion() throws Exception { + underTest = new OpenWireProtocolManager(null, new DummyServer()) { + @Override + public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) { + if (lruCacheRef == null) { + lruCacheRef = vtDestMapCache; + } + return super.virtualTopicConsumerToFQQN(destination); + } + }; + + final int maxCacheSize = 10; + underTest.setVirtualTopicConsumerLruCacheMax(10); + underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3"); + + ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic"); + assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A"), underTest.virtualTopicConsumerToFQQN(A)); + + ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B"); + assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b"), underTest.virtualTopicConsumerToFQQN(B)); + + ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE"); + assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c"), underTest.virtualTopicConsumerToFQQN(C)); + + for (int i = 0; i < maxCacheSize; i++) { + ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity" + i); + assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity)); + } + + assertFalse(lruCacheRef.containsKey(A)); + } + + static final class DummyServer extends ActiveMQServerImpl { + + @Override + public ClusterManager getClusterManager() { + return new ClusterManager(getExecutorFactory(), this, null, null, null, null, null, false); + } + + @Override + public ExecutorFactory getExecutorFactory() { + return new ExecutorFactory() { + @Override + public ArtemisExecutor getExecutor() { + return null; + } + }; + } + + @Override + public Activation getActivation() { + return new Activation() { + @Override + public void close(boolean permanently, boolean restarting) throws Exception { + + } + + @Override + public void run() { + + } + }; + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java new file mode 100644 index 0000000000..34d08bcd5c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Set; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.junit.Test; + +public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase { + + @Override + protected void extraServerConfig(Configuration serverConfig) { + Set acceptors = server.getConfiguration().getAcceptorConfigurations(); + for (TransportConfiguration tc : acceptors) { + if (tc.getName().equals("netty")) { + tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2"); + tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000"); + + } + } + } + + @Test + public void testAutoVirtualTopicFQQN() throws Exception { + Connection connection = null; + + SimpleString topic = new SimpleString("VirtualTopic.Orders"); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + connection = activeMQConnectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + + MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString())); + MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B." + topic.toString())); + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA != null && messageReceivedB != null)); + String text = messageReceivedA.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + messageConsumerB.close(); + + } finally { + if (connection != null) { + connection.close(); + } + } + } +}