From a1a30fb7fda4873e4246103ec6498ba487f9e706 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 14 Dec 2017 16:30:56 +0000 Subject: [PATCH] [ARTEMIS-550] add section on migrating virtual topics and test to validate --- artemis-website/pom.xml | 9 ++ artemis-website/src/main/resources/index.html | 1 + docs/migration-guide/en/SUMMARY.md | 1 + docs/migration-guide/en/VirtualTopics.md | 40 ++++++++ .../openwire/FQQNOpenWireTest.java | 94 +++++++++++++++++++ 5 files changed, 145 insertions(+) create mode 100644 docs/migration-guide/en/VirtualTopics.md diff --git a/artemis-website/pom.xml b/artemis-website/pom.xml index 1f818a070a..a858928976 100644 --- a/artemis-website/pom.xml +++ b/artemis-website/pom.xml @@ -80,6 +80,7 @@ ${project.artifactId}-${project.version} ${basedir}/target/classes/user-manual ${basedir}/target/classes/hacking-guide + ${basedir}/target/classes/migration-guide 0.0.29 v6.11.0 @@ -191,6 +192,14 @@ + + executing ${gitbook.cmd} + + + + + + diff --git a/artemis-website/src/main/resources/index.html b/artemis-website/src/main/resources/index.html index 25d623152c..c44d9ad2b5 100644 --- a/artemis-website/src/main/resources/index.html +++ b/artemis-website/src/main/resources/index.html @@ -55,6 +55,7 @@
  • API
  • User Manual
  • Hacking Guide
  • +
  • Migration Guide
  • Examples
  • Apache ActiveMQ Artemis Website
  • diff --git a/docs/migration-guide/en/SUMMARY.md b/docs/migration-guide/en/SUMMARY.md index 21c9eedd6a..3c30dd3898 100644 --- a/docs/migration-guide/en/SUMMARY.md +++ b/docs/migration-guide/en/SUMMARY.md @@ -3,6 +3,7 @@ * [Configuration](configuration.md) * [Connectors](connectors.md) * [Destinations](destinations.md) +* [Virtual Topics](VirtualTopics.md) * [Authentication](authentication.md) * [Authorization](authorization.md) * [SSL](ssl.md) diff --git a/docs/migration-guide/en/VirtualTopics.md b/docs/migration-guide/en/VirtualTopics.md new file mode 100644 index 0000000000..e0775ad198 --- /dev/null +++ b/docs/migration-guide/en/VirtualTopics.md @@ -0,0 +1,40 @@ +Virtual Topics +============== + +Virtual Topics (a specialisation of virtual destinations) in ActiveMQ 5.x typically address two different but related +problems. Lets take each in turn: + +Shared access to a JMS durable topic subscription +------------------------------------------------- +With JMS1.1, a durable subscription is identified by the pair of clientId and subscriptionName. The clientId +component must be unique to a connection on the broker. This means that the subscription is exclusive. It is +not possible to load balance the stream of messages across consumers and quick failover is difficult because the +existing connection state on the broker needs to be first disposed. +With virtual topics, each subscription's stream of messages is redirected to a queue. + +JMS2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis. +Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the +subscription queue using it's Fully Qualified Queue name (FQQN). + +For example, a default 5.x consumer for topic `VirtualTopic.Orders` subscription `A`: +``` + ... + Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders"); + session.createConsumer(subscriptionQueue); + +``` +would be replaced with an Artemis FQQN comprised of the address and queue. +``` + ... + Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A"); + session.createConsumer(subscriptionQueue); +``` + +Durable topic subscribers in a network of brokers +------------------------------------------------- +The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a +network, duplicate durable subs get created on each node in the network but they do not migrate. The end result can +result in duplicate message storage and ultimately duplicate delivery, which is not good. +When durable subscribers map to virtual topic subscriber queues, the queues can migrate and the problem can be avoided. + +In Artemis, because a durable sub is modeled as a queue, this problem does not arise. \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java index 91f4e24105..ff819ec871 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java @@ -39,7 +39,9 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.CompositeAddress; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -266,4 +268,96 @@ public class FQQNOpenWireTest extends OpenWireTestBase { } } } + + @Test + public void testVirtualTopicFQQN() throws Exception { + Connection exConn = null; + + SimpleString topic = new SimpleString("VirtualTopic.Orders"); + SimpleString subscriptionQ = new SimpleString("Consumer.A"); + + this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST)); + this.server.createQueue(topic, RoutingType.MULTICAST, subscriptionQ, null, true, false, -1, false, true); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + MessageProducer producer = session.createProducer(destination); + + Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString()); + MessageConsumer messageConsumerA = session.createConsumer(destinationFQN); + MessageConsumer messageConsumerB = session.createConsumer(destinationFQN); + + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + // only one consumer should get the message + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA == null || messageReceivedB == null)); + String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + messageConsumerB.close(); + + } finally { + if (exConn != null) { + exConn.close(); + } + } + } + + @Test + @Ignore("need to figure auto bindings creation") + public void testVirtualTopicFQQNAutoCreate() throws Exception { + Connection exConn = null; + + SimpleString topic = new SimpleString("VirtualTopic.Orders"); + SimpleString subscriptionQ = new SimpleString("Consumer.A"); + + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.MULTICAST); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + MessageProducer producer = session.createProducer(destination); + + Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString()); + + MessageConsumer messageConsumerA = session.createConsumer(destinationFQN); + MessageConsumer messageConsumerB = session.createConsumer(destinationFQN); + + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + // only one consumer should get the message + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA == null || messageReceivedB == null)); + String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + messageConsumerB.close(); + + } finally { + if (exConn != null) { + exConn.close(); + } + } + } }