[ARTEMIS-550] add section on migrating virtual topics and test to validate
This commit is contained in:
parent
2d81f2d4bb
commit
a1a30fb7fd
|
@ -80,6 +80,7 @@
|
|||
<webapp-dir>${project.artifactId}-${project.version}</webapp-dir>
|
||||
<webapp-outdir-user-manual>${basedir}/target/classes/user-manual</webapp-outdir-user-manual>
|
||||
<webapp-outdir-hacking-guide>${basedir}/target/classes/hacking-guide</webapp-outdir-hacking-guide>
|
||||
<webapp-outdir-migration-guide>${basedir}/target/classes/migration-guide</webapp-outdir-migration-guide>
|
||||
|
||||
<frontend-maven-plugin-version>0.0.29</frontend-maven-plugin-version>
|
||||
<nodeVersion>v6.11.0</nodeVersion>
|
||||
|
@ -191,6 +192,14 @@
|
|||
<arg value="${basedir}/../docs/hacking-guide/en" />
|
||||
<arg value="${webapp-outdir-hacking-guide}" />
|
||||
</exec>
|
||||
<mkdir dir="${webapp-outdir-migration-guide}" />
|
||||
<echo>executing ${gitbook.cmd}</echo>
|
||||
<exec executable="${gitbook.cmd}" failonerror="true">
|
||||
<env key="PATH" path="${basedir}/node" />
|
||||
<arg value="build" />
|
||||
<arg value="${basedir}/../docs/migration-guide/en" />
|
||||
<arg value="${webapp-outdir-migration-guide}" />
|
||||
</exec>
|
||||
</target>
|
||||
</configuration>
|
||||
<goals>
|
||||
|
|
|
@ -55,6 +55,7 @@
|
|||
<li><a target="_blank" href="api/index.html">API</a></li>
|
||||
<li><a target="_blank" href="user-manual/index.html">User Manual</a></li>
|
||||
<li><a target="_blank" href="hacking-guide/index.html">Hacking Guide</a></li>
|
||||
<li><a target="_blank" href="migration-guide/index.html">Migration Guide</a></li>
|
||||
<li><a href="examples/index.html">Examples</a></li>
|
||||
<li><a href="http://activemq.apache.org/artemis/">Apache ActiveMQ Artemis Website</a></li>
|
||||
</ul>
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
* [Configuration](configuration.md)
|
||||
* [Connectors](connectors.md)
|
||||
* [Destinations](destinations.md)
|
||||
* [Virtual Topics](VirtualTopics.md)
|
||||
* [Authentication](authentication.md)
|
||||
* [Authorization](authorization.md)
|
||||
* [SSL](ssl.md)
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
Virtual Topics
|
||||
==============
|
||||
|
||||
Virtual Topics (a specialisation of virtual destinations) in ActiveMQ 5.x typically address two different but related
|
||||
problems. Lets take each in turn:
|
||||
|
||||
Shared access to a JMS durable topic subscription
|
||||
-------------------------------------------------
|
||||
With JMS1.1, a durable subscription is identified by the pair of clientId and subscriptionName. The clientId
|
||||
component must be unique to a connection on the broker. This means that the subscription is exclusive. It is
|
||||
not possible to load balance the stream of messages across consumers and quick failover is difficult because the
|
||||
existing connection state on the broker needs to be first disposed.
|
||||
With virtual topics, each subscription's stream of messages is redirected to a queue.
|
||||
|
||||
JMS2.0 adds the possibility of shared subscriptions with new API's that are fully supported in Artemis.
|
||||
Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the
|
||||
subscription queue using it's Fully Qualified Queue name (FQQN).
|
||||
|
||||
For example, a default 5.x consumer for topic `VirtualTopic.Orders` subscription `A`:
|
||||
```
|
||||
...
|
||||
Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders");
|
||||
session.createConsumer(subscriptionQueue);
|
||||
|
||||
```
|
||||
would be replaced with an Artemis FQQN comprised of the address and queue.
|
||||
```
|
||||
...
|
||||
Queue subscriptionQueue = session.createQueue("VirtualTopic.Orders::Consumer.A");
|
||||
session.createConsumer(subscriptionQueue);
|
||||
```
|
||||
|
||||
Durable topic subscribers in a network of brokers
|
||||
-------------------------------------------------
|
||||
The store and forward network bridges in 5.x create a durable subscriber per destination. As demand migrates across a
|
||||
network, duplicate durable subs get created on each node in the network but they do not migrate. The end result can
|
||||
result in duplicate message storage and ultimately duplicate delivery, which is not good.
|
||||
When durable subscribers map to virtual topic subscriber queues, the queues can migrate and the problem can be avoided.
|
||||
|
||||
In Artemis, because a durable sub is modeled as a queue, this problem does not arise.
|
|
@ -39,7 +39,9 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
|||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -266,4 +268,96 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVirtualTopicFQQN() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
||||
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
||||
|
||||
this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
|
||||
this.server.createQueue(topic, RoutingType.MULTICAST, subscriptionQ, null, true, false, -1, false, true);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
exFact.setWatchTopicAdvisories(false);
|
||||
exConn = exFact.createConnection();
|
||||
exConn.start();
|
||||
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic(topic.toString());
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
|
||||
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
|
||||
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
|
||||
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
producer.send(message);
|
||||
|
||||
// only one consumer should get the message
|
||||
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
|
||||
|
||||
assertTrue((messageReceivedA == null || messageReceivedB == null));
|
||||
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
|
||||
assertEquals("This is a text message", text);
|
||||
|
||||
messageConsumerA.close();
|
||||
messageConsumerB.close();
|
||||
|
||||
} finally {
|
||||
if (exConn != null) {
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("need to figure auto bindings creation")
|
||||
public void testVirtualTopicFQQNAutoCreate() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
||||
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
||||
|
||||
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.MULTICAST);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
exFact.setWatchTopicAdvisories(false);
|
||||
exConn = exFact.createConnection();
|
||||
exConn.start();
|
||||
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic(topic.toString());
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
|
||||
|
||||
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
|
||||
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
|
||||
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
producer.send(message);
|
||||
|
||||
// only one consumer should get the message
|
||||
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
|
||||
|
||||
assertTrue((messageReceivedA == null || messageReceivedB == null));
|
||||
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
|
||||
assertEquals("This is a text message", text);
|
||||
|
||||
messageConsumerA.close();
|
||||
messageConsumerB.close();
|
||||
|
||||
} finally {
|
||||
if (exConn != null) {
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue