<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Divert Example</title>
<link rel="stylesheet" type="text/css" href="../common/common.css" />
<link rel="stylesheet" type="text/css" href="../common/prettify.css" />
<script type="text/javascript" src="../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>Divert Example</h1>
<p>ActiveMQ diverts allow messages to be transparently "diverted" from one address to another
with just some simple configuration defined on the server side.</p>
<p>Diverts can be defined to be <b>exclusive</b> or <b>non-exclusive</b>.</p>
<p>With an <b>exclusive</b> divert the message is intercepted and does not get sent to the queues originally
bound to that address - it only gets diverted.</p>
<p>With a <b>non-exclusive</b> divert the message continues to go to the queues bound to the address,
but also a <b>copy</b> of the message gets sent to the address specified in the divert. Consequently non-exclusive
diverts can be used to "snoop" on another address</p>
<p>Diverts can also be configured to have an optional filter. If specified then only matching messages
will be diverted.</p>
<p>Diverts can be configured to apply a Transformer. If specified, all diverted messages will have the
opportunity of being transformed by the Transformer.</p>
<p>Diverts are a very sophisticated concept, which when combined with bridges can be used to create
interesting and complex routings. The set of diverts can be thought of as a type of <i>routing table</i>
for messages.</p>
<h2>Example step-by-step</h2>
<p>In this example we will imagine a fictitious company which has two offices; one in London and another in New York.</p>
<p>The company accepts orders for it's products only at it's London office, and also generates price-updates
for it's products, also only from it's London office. However only the New York office is interested in receiving
price updates for New York products. Any prices for New York products need to be forwarded to the New York office.</p>
<p>There is an unreliable WAN linking the London and New York offices.</p>
<p>The company also requires a copy of any order received to be available to be inspected by management.</p>
<p>In order to achieve this, we will create a queue <code>orderQueue</code> on the London server in to which orders arrive.</p>
<p>We will create a topic, <code>spyTopic</code> on the London server, and there will be two subscribers both in London.</p>
<p>We will create a <i>non-exclusive</i> divert on the London server which will siphon off a copy of each order
received to the topic <code>spyTopic</code>.</p>
<p>Here's the xml config for that divert, from <code>activemq-configuration.xml</code></p>
<pre class="prettyprint">
<code>
<divert name="order-divert">
<address>jms.queue.orders</address>
<forwarding-address>jms.topic.spyTopic</forwarding-address>
<exclusive>false</exclusive>
</divert>
</code>
</pre>
<p>For the prices we will create a topic on the London server, <code>priceUpdates</code> to which all price updates
are sent. We will create another topic on the New York server <code>newYorkPriceUpdates</code> to which all New York
price updates need to be forwarded.</p>
<p>Diverts can only be used to divert messages from one <b>local</b> address to another <b>local</b> address
so we cannot divert directly to an address on another server.</p>
<p>Instead we divert to a local <i>store and forward queue</i> they we define in the configuration. This is just a normal queue
that we use for storing messages before forwarding to another node.</p>
<p>Here's the configuration for it:</p>
<pre class="prettyprint">
<code>
<queues>
<queue name="jms.queue.priceForwarding">
<address>jms.queue.priceForwarding</address>
</queue>
</queues>
</code>
</pre>
<p>Here's the configuration for the divert:</p>
<pre class="prettyprint">
<code>
<divert name="prices-divert">
<address>jms.topic.priceUpdates</address>
<forwarding-address>jms.queue.priceForwarding</forwarding-address>
<filter string="office='New York'"/>
<transformer-class-name>org.apache.activemq.jms.example.AddForwardingTimeTransformer</transformer-class-name>
<exclusive>true</exclusive>
</divert>
</code>
</pre>
<p>Note we specify a filter in the divert, so only New York prices get diverted. We also specify a Transformer class
since we are going to insert a header in the message at divert time, recording the time the diversion happened.</p>
<p>And finally we define a bridge that moves messages from the local queue to the address on the New York server.
Bridges move messages from queues to remote addresses and are ideal to use when the target server may be stopped and
started independently, and/or the network might be unreliable. Bridges guarantee once and only once delivery
of messages from their source queues to their target addresses.</p>
<p>Here is the bridge configuration: </p>
<pre class="prettyprint">
<code>
<bridges>
<bridge name="price-forward-bridge">
<queue-name>jms.queue.priceForwarding</queue-name>
<forwarding-address>jms.topic.newYorkPriceUpdates</forwarding-address>
<reconnect-attempts>-1</reconnect-attempts>
<static-connectors>
<connector-ref>newyork-connector</connector-ref>
</static-connectors>
</bridge>
</bridges>
</code>
</pre>
<p><i>To run the example, simply type <code>mvn verify</code> from this directory</i></p>
<ol>
<li>Create an initial context to perform the JNDI lookup on the London server</li>
<pre class="prettyprint">
<code>
initialContext0 = getContext(0);
</code>
</pre>
<li>Look-up the queue orderQueue on the London server - this is the queue any orders are sent to</li>
<pre class="prettyprint">
<code>
Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
</code>
</pre>
<li>Look-up the topic priceUpdates on the London server- this is the topic that any price updates are sent to</li>
<pre class="prettyprint">
<code>
Topic priceUpdates = (Topic)initialContextLondon.lookup("/topic/priceUpdates");
</code>
</pre>
<li>Look-up the spy topic on the London server- this is what we will use to snoop on any orders</li>
<pre class="prettyprint">
<code>
Topic spyTopic = (Topic)initialContextLondon.lookup("/topic/spyTopic");
</code>
</pre>
<li>Create an initial context to perform the JNDI lookup on the New York server.</li>
<pre class="prettyprint">
<code>
initialContextNewYork = getContext(1);
</code>
</pre>
<li>Look-up the topic newYorkPriceUpdates on the New York server - any price updates sent to priceUpdates on the London server will
be diverted to the queue priceForward on the London server, and a bridge will consume from that queue and forward
them to the address newYorkPriceUpdates on the New York server where they will be distributed to the topic subscribers on
the New York server.
</li>
<pre class="prettyprint">
<code>
Topic newYorkPriceUpdates = (Topic)initialContextNewYork.lookup("/topic/newYorkPriceUpdates");
</code>
</pre>
<li>Perform a lookup on the Connection Factory on the London server</li>
<pre class="prettyprint">
<code>
ConnectionFactory cfLondon = (ConnectionFactory)initialContextLondon.lookup("/ConnectionFactory");
</code>
</pre>
<li>Perform a lookup on the Connection Factory on the New York server.</li>
<pre class="prettyprint">
<code>
ConnectionFactory cfNewYork = (ConnectionFactory)initialContextNewYork.lookup("/ConnectionFactory");
</code>
</pre>
<li>Create a JMS Connection on the London server</li>
<pre class="prettyprint">
<code>
connectionLondon = cfLondon.createConnection();
</code>
</pre>
<li>Create a JMS Connection on the New York server</li>
<pre class="prettyprint">
<code>
connectionNewYork = cfNewYork.createConnection();
</code>
</pre>
<li>Create a JMS Session on the London server.</li>
<pre class="prettyprint">
<code>
Session sessionLondon = connectionLondon.createSession(false, Session.AUTO_ACKNOWLEDGE);
</code>
</pre>
<li>Create a JMS Session on the New York server.</li>
<pre class="prettyprint">
<code>
Session sessionNewYork = connectionNewYork.createSession(false, Session.AUTO_ACKNOWLEDGE);
</code>
</pre>
<li>Create a JMS MessageProducer orderProducer that sends to the queue orderQueue on the London server.</li>
<pre class="prettyprint">
<code>
MessageProducer orderProducer = sessionLondon.createProducer(orderQueue);
/code>
</pre>
<li>Create a JMS MessageProducer priceProducer that sends to the topic priceUpdates on the London server.</li>
<pre class="prettyprint">
<code>
MessageProducer priceProducer = sessionLondon.createProducer(priceUpdates);
/code>
</pre>
<li>Create a JMS subscriber which subscribes to the spyTopic on the London server</li>
<pre class="prettyprint">
<code>
MessageConsumer spySubscriberA = sessionLondon.createConsumer(spyTopic);
</code>
</pre>
<li>Create another JMS subscriber which also subscribes to the spyTopic on the London server</li>
<pre class="prettyprint">
<code>
MessageConsumer spySubscriberB = sessionLondon.createConsumer(spyTopic);
</code>
</pre>
<li>Create a JMS MessageConsumer which consumes orders from the order queue on the London server</li>
<pre class="prettyprint">
<code>
MessageConsumer orderConsumer = sessionLondon.createConsumer(orderQueue);
</code>
</pre>
<li>Create a JMS subscriber which subscribes to the priceUpdates topic on the London server</li>
<pre class="prettyprint">
<code>
MessageConsumer priceUpdatesSubscriberLondon = sessionLondon.createConsumer(priceUpdates);
</code>
</pre>
<li>Create a JMS subscriber which subscribes to the newYorkPriceUpdates topic on the New York server</li>
<pre class="prettyprint">
<code>
MessageConsumer newYorkPriceUpdatesSubscriberA = sessionNewYork.createConsumer(newYorkPriceUpdates);
</code>
</pre>
<li>Create another JMS subscriber which also subscribes to the newYorkPriceUpdates topic on the New York server</li>
<pre class="prettyprint">
<code>
MessageConsumer newYorkPriceUpdatesSubscriberB = sessionNewYork.createConsumer(newYorkPriceUpdates);
</code>
</pre>
<li>Start the connections</li>
<pre class="prettyprint">
<code>
connectionLondon.start();
connectionNewYork.start();
</code>
</pre>
<li>Create an order message</li>
<pre class="prettyprint">
<code>
TextMessage orderMessage = sessionLondon.createTextMessage("This is an order");
</code>
</pre>
<li>Send the order message to the order queue on the London server</li>
<pre class="prettyprint">
<code>
orderProducer.send(orderMessage);
System.out.println("Sent message: " + orderMessage.getText());
</code>
</pre>
<li>The order message is consumed by the orderConsumer on the London server</li>
<pre class="prettyprint">
<code>
TextMessage receivedOrder = (TextMessage)orderConsumer.receive(5000);
System.out.println("Received order: " + receivedOrder.getText());
</code>
</pre>
<li>A copy of the order is also received by the spyTopic subscribers on the London server</li>
<pre class="prettyprint">
<code>
TextMessage spiedOrder1 = (TextMessage)spySubscriberA.receive(5000);
System.out.println("Snooped on order: " + spiedOrder1.getText());
TextMessage spiedOrder2 = (TextMessage)spySubscriberB.receive(5000);
System.out.println("Snooped on order: " + spiedOrder2.getText());
</code>
</pre>
<li>Create and send a price update message, destined for London</li>
<pre class="prettyprint">
<code>
TextMessage priceUpdateMessageLondon = sessionLondon.createTextMessage("This is a price update for London");
priceUpdateMessageLondon.setStringProperty("office", "London");
priceProducer.send(priceUpdateMessageLondon);
</code>
</pre>
<li>The price update *should* be received by the local subscriber since we only divert messages
where office = New York</li>
<pre class="prettyprint">
<code>
TextMessage receivedUpdate = (TextMessage)priceUpdatesSubscriberLondon.receive(2000);
System.out.println("Received price update locally: " + receivedUpdate.getText());
</code>
</pre>
<li>The price update *should not* be received in New York</li>
<pre class="prettyprint">
<code>
TextMessage priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(1000);
if (priceUpdate1 != null)
{
return false;
}
System.out.println("Did not received price update in New York, look it's: " + priceUpdate1);
TextMessage priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(1000);
if (priceUpdate2 != null)
{
return false;
}
System.out.println("Did not received price update in New York, look it's: " + priceUpdate2);
</code>
</pre>
<li>Create a price update message, destined for New York</li>
<pre class="prettyprint">
<code>
TextMessage priceUpdateMessageNewYork = sessionLondon.createTextMessage("This is a price update for New York");
priceUpdateMessageNewYork.setStringProperty("office", "New York");
</code>
</pre>
<li>Send the price update message to the priceUpdates topic on the London server</li>
<pre class="prettyprint">
<code>
priceProducer.send(priceUpdateMessageNewYork);
</code>
</pre>
<li>The price update *should not* be received by the local subscriber to the priceUpdates topic
since it has been *exclusively* diverted to the priceForward queue, because it has a header saying
it is destined for the New York office</li>
<pre class="prettyprint">
<code>
Message message = priceUpdatesSubscriberLondon.receive(1000);
if (message != null)
{
return false;
}
System.out.println("Didn't receive local price update, look, it's: " + message);
</code>
</pre>
<li>The remote subscribers on server 1 *should* receive a copy of the price update since
it has been diverted to a local priceForward queue which has a bridge consuming from it and which
forwards it to the same address on server 1.
We notice how the forwarded messages have had a special header added by our custom transformer that
we told the divert to use</li>
<pre class="prettyprint">
<code>
priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(5000);
System.out.println("Received forwarded price update on server 1: " + priceUpdate1.getText());
System.out.println("Time of forward: " + priceUpdate1.getLongProperty("time_of_forward"));
priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(5000);
System.out.println("Received forwarded price update on server 2: " + priceUpdate2.getText());
System.out.println("Time of forward: " + priceUpdate2.getLongProperty("time_of_forward"));
</code>
</pre>
<li>And finally, <b>always</b> remember to close your resources after use, in a <code>finally</code> block.</li>
<pre class="prettyprint">
<code>
finally
{
if (initialContextLondon != null)
{
initialContextLondon.close();
}
if (initialContextNewYork != null)
{
initialContextNewYork.close();
}
if (connectionLondon != null)
{
connectionLondon.close();
}
if (connectionNewYork != null)
{
connectionNewYork.close();
}
}
</code>
</pre>
</ol>
</body>
</html>