From b812bfdbedb0b5fcf8492c652ca5badb86b8ce9e Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 3 Oct 2018 13:24:42 +0100 Subject: [PATCH] ARTEMIS-2103 - use the full openwire consumer queue for the mapped virtual topic queue binding, fix and test --- .../openwire/OpenWireProtocolManager.java | 4 +- docs/migration-guide/en/VirtualTopics.md | 4 +- docs/user-manual/en/openwire.md | 4 +- .../openwire/virtual-topic-mapping/readme.md | 2 +- .../example/VirtualTopicMappingExample.java | 2 +- .../VirtualTopicToFQQNOpenWireTest.java | 138 ++++++++++++++++++ 6 files changed, 146 insertions(+), 8 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 95a400e5f9..505564d0ed 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -657,8 +657,8 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl fqqn.append(paths[i]); } fqqn.append(CompositeAddress.SEPARATOR); - // consumer queue - for (int i = 0; i < filterPathTerminus; i++) { + // consumer queue - the full vt queue + for (int i = 0; i < paths.length; i++) { if (i > 0) { fqqn.append(ActiveMQDestination.PATH_SEPERATOR); } diff --git a/docs/migration-guide/en/VirtualTopics.md b/docs/migration-guide/en/VirtualTopics.md index 6ad183deff..b98ac809ee 100644 --- a/docs/migration-guide/en/VirtualTopics.md +++ b/docs/migration-guide/en/VirtualTopics.md @@ -33,7 +33,7 @@ For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` would be replaced with an Artemis FQQN comprised of the address and queue. ``` ... - Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A"); + Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders"); session.createConsumer(subscriptionQueue); ``` @@ -49,7 +49,7 @@ E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the However, there is a caveat because this value needs to be encoded in a uri for the xml configuration. Any unsafe url characters , in this case: ```> ;``` need to be escaped with their hex code point representation; leading to a value of ```Consumer.*.%3E%3B2```. In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed into a FQQN of -```VirtualTopic.Orders::Consumer.A```. +```VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders```. Durable topic subscribers in a network of brokers diff --git a/docs/user-manual/en/openwire.md b/docs/user-manual/en/openwire.md index 7d24fb8934..9cecfe14ba 100644 --- a/docs/user-manual/en/openwire.md +++ b/docs/user-manual/en/openwire.md @@ -85,7 +85,7 @@ The two parameters are configured on an OpenWire `acceptor`, e.g.: For existing OpenWire consumers of virtual topic destinations it is possible to configure a mapping function that will translate the virtual topic consumer -destination into a FQQN address. This address then represents the consumer as a +destination into a FQQN address. This address will then represents the consumer as a multicast binding to an address representing the virtual topic. The configuration string property `virtualTopicConsumerWildcards` has two parts @@ -103,7 +103,7 @@ this transforms to `Consumer.*.%3E%3B2` when the url significant characters ``` This will translate `Consumer.A.VirtualTopic.Orders` into a FQQN of -`VirtualTopic.Orders::Consumer.A` using the int component `2` of the +`VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` using the int component `2` of the configuration to identify the consumer queue as the first two paths of the destination. `virtualTopicConsumerWildcards` is multi valued using a `,` separator. diff --git a/examples/protocols/openwire/virtual-topic-mapping/readme.md b/examples/protocols/openwire/virtual-topic-mapping/readme.md index e96724d798..4660492da9 100644 --- a/examples/protocols/openwire/virtual-topic-mapping/readme.md +++ b/examples/protocols/openwire/virtual-topic-mapping/readme.md @@ -18,7 +18,7 @@ Address. The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor url parameter `virtualTopicConsumerWildcards`, (see below), Artemis maps the consumer consuming from `Consumer.A.VirtualTopic.Orders` to actually consume from -FQQN of `VirtualTopic.Orders::Consumer.A` +FQQN of `VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders` ```xml diff --git a/examples/protocols/openwire/virtual-topic-mapping/src/main/java/org/apache/activemq/artemis/jms/example/VirtualTopicMappingExample.java b/examples/protocols/openwire/virtual-topic-mapping/src/main/java/org/apache/activemq/artemis/jms/example/VirtualTopicMappingExample.java index eff7b1840b..fd849b5d96 100644 --- a/examples/protocols/openwire/virtual-topic-mapping/src/main/java/org/apache/activemq/artemis/jms/example/VirtualTopicMappingExample.java +++ b/examples/protocols/openwire/virtual-topic-mapping/src/main/java/org/apache/activemq/artemis/jms/example/VirtualTopicMappingExample.java @@ -31,7 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; * The example sends a message to a topic (using openwire protocol) and an openwire consumer listens on the backing queue * using the ActiveMQ 5.x virtual topic naming convention. Due to the acceptor parameter virtualTopicConsumerWildcards * Artemis maps the consumer consuming from "Consumer.A.VirtualTopic.Orders" to actually consume from - * FQQN "VirtualTopic.Orders::Consumer.A" + * FQQN "VirtualTopic.Orders::Consumer.A.VirtualTopic.Orders" */ public class VirtualTopicMappingExample { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java index 34d08bcd5c..228c9048ad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java @@ -84,4 +84,142 @@ public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase { } } } + + @Test + public void testTwoTopicSubsSameNameAutoVirtualTopicFQQN() throws Exception { + Connection connection = null; + + SimpleString topic1 = new SimpleString("VirtualTopic.Orders1"); + SimpleString topic2 = new SimpleString("VirtualTopic.Orders2"); + + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + connection = activeMQConnectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination1 = session.createTopic(topic1.toString()); + Destination destination2 = session.createTopic(topic2.toString()); + + MessageConsumer messageConsumer1 = session.createConsumer(session.createQueue("Consumer.A." + topic1.toString())); + MessageConsumer messageConsumer2 = session.createConsumer(session.createQueue("Consumer.A." + topic2.toString())); + + MessageProducer producer = session.createProducer(null); + TextMessage message = session.createTextMessage("This is a text message to 1"); + producer.send(destination1, message); + message = session.createTextMessage("This is a text message to 2"); + producer.send(destination2, message); + + + TextMessage messageReceived1 = (TextMessage) messageConsumer1.receive(2000); + TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(2000); + + assertNotNull(messageReceived1); + assertNotNull(messageReceived2); + + String text = messageReceived1.getText(); + assertEquals("This is a text message to 1", text); + + text = messageReceived2.getText(); + assertEquals("This is a text message to 2", text); + + messageConsumer1.close(); + messageConsumer2.close(); + + } finally { + if (connection != null) { + connection.close(); + } + } + } + + + @Test + public void testAutoVirtualTopicWildcardFQQN() throws Exception { + Connection connection = null; + + SimpleString topicA = new SimpleString("VirtualTopic.Orders.A"); + SimpleString topicB = new SimpleString("VirtualTopic.Orders.B"); + SimpleString topic = new SimpleString("VirtualTopic.Orders.>"); + + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + connection = activeMQConnectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString()); + + MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString())); + // MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B." + topic.toString())); + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000); + + assertTrue((messageReceivedA != null && messageReceivedB != null)); + String text = messageReceivedA.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test + public void testAutoVirtualTopicWildcardStarFQQN() throws Exception { + Connection connection = null; + + SimpleString topicA = new SimpleString("VirtualTopic.Orders.A"); + SimpleString topicB = new SimpleString("VirtualTopic.Orders.B"); + SimpleString topic = new SimpleString("VirtualTopic.Orders.*"); + + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + connection = activeMQConnectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString()); + + MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString())); + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000); + + assertTrue((messageReceivedA != null && messageReceivedB != null)); + String text = messageReceivedA.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + + } finally { + if (connection != null) { + connection.close(); + } + } + } }