<!--
|
|
Licensed to the Apache Software Foundation (ASF) under one
|
|
or more contributor license agreements. See the NOTICE file
|
|
distributed with this work for additional information
|
|
regarding copyright ownership. The ASF licenses this file
|
|
to you under the Apache License, Version 2.0 (the
|
|
"License"); you may not use this file except in compliance
|
|
with the License. You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing,
|
|
software distributed under the License is distributed on an
|
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
KIND, either express or implied. See the License for the
|
|
specific language governing permissions and limitations
|
|
under the License.
|
|
-->
|
|
|
|
<html>
|
|
<head>
|
|
<title>ActiveMQ Divert Example</title>
|
|
<link rel="stylesheet" type="text/css" href="../common/common.css" />
|
|
<link rel="stylesheet" type="text/css" href="../common/prettify.css" />
|
|
<script type="text/javascript" src="../common/prettify.js"></script>
|
|
</head>
|
|
<body onload="prettyPrint()">
|
|
<h1>Divert Example</h1>
|
|
|
|
<p>ActiveMQ diverts allow messages to be transparently "diverted" from one address to another
|
|
with just some simple configuration defined on the server side.</p>
|
|
<p>Diverts can be defined to be <b>exclusive</b> or <b>non-exclusive</b>.</p>
|
|
<p>With an <b>exclusive</b> divert the message is intercepted and does not get sent to the queues originally
|
|
bound to that address - it only gets diverted.</p>
|
|
<p>With a <b>non-exclusive</b> divert the message continues to go to the queues bound to the address,
|
|
but also a <b>copy</b> of the message gets sent to the address specified in the divert. Consequently non-exclusive
|
|
diverts can be used to "snoop" on another address.</p>
|
|
<p>Diverts can also be configured to have an optional filter. If specified then only matching messages
|
|
will be diverted.</p>
|
|
<p>Diverts can be configured to apply a Transformer. If specified, all diverted messages will have the
|
|
opportunity of being transformed by the Transformer.</p>
|
|
<p>Diverts are a powerful concept which, when combined with bridges, can be used to create
|
|
interesting and complex routings. The set of diverts can be thought of as a type of <i>routing table</i>
|
|
for messages.</p>
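<p>As a rough illustration of the exclusive and non-exclusive semantics described above, the following
self-contained Java sketch models a single divert as a plain function. It is not the broker's implementation:
the <code>Message</code> and <code>Filter</code> types, the <code>route</code> method, the <code>office</code>
property and the address names <code>source</code> and <code>target</code> are all hypothetical stand-ins
invented for this sketch.</p>
<pre class="prettyprint">
<code>
public class DivertSketch {

    /** Hypothetical stand-in for a message: a body plus one string property. */
    static final class Message {
        final String body;
        final String office;

        Message(String body, String office) {
            this.body = body;
            this.office = office;
        }
    }

    /** Hypothetical stand-in for a divert filter. */
    interface Filter {
        boolean matches(Message message);
    }

    /**
     * Returns the addresses that receive a message sent to 'address'
     * when one divert with the given settings is defined on that address.
     */
    static String[] route(Message message, String address, String forwardingAddress,
                          Filter filter, boolean exclusive) {
        if (!filter.matches(message)) {
            // The filter rejects the message, so the divert leaves it alone.
            return new String[] {address};
        }
        if (exclusive) {
            // Exclusive: intercepted, so only the forwarding address gets it.
            return new String[] {forwardingAddress};
        }
        // Non-exclusive: normal routing continues and a copy is forwarded.
        return new String[] {address, forwardingAddress};
    }

    public static void main(String[] args) {
        Filter any = message -> true;
        Filter newYorkOnly = message -> "New York".equals(message.office);
        Message london = new Message("price update", "London");
        Message newYork = new Message("price update", "New York");

        // Non-exclusive divert with a filter that accepts everything.
        System.out.println(String.join(", ", route(london, "source", "target", any, false)));
        // Exclusive divert, message matches the filter.
        System.out.println(String.join(", ", route(newYork, "source", "target", newYorkOnly, true)));
        // Exclusive divert, message does not match the filter.
        System.out.println(String.join(", ", route(london, "source", "target", newYorkOnly, true)));
    }
}
</code>
</pre>
<p>Running <code>main</code> prints <code>source, target</code>, then <code>target</code>, then <code>source</code>:
a non-exclusive divert delivers to both addresses, an exclusive divert delivers only to the forwarding address,
and a message rejected by the filter is routed as if the divert did not exist.</p>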
|
|
|
|
<h2>Example step-by-step</h2>
|
|
<p>In this example we will imagine a fictitious company which has two offices; one in London and another in New York.</p>
|
|
<p>The company accepts orders for its products only at its London office, and also generates price updates
for its products, again only from its London office. However, only the New York office is interested in receiving
price updates for New York products. Any prices for New York products need to be forwarded to the New York office.</p>
|
|
<p>There is an unreliable WAN linking the London and New York offices.</p>
|
|
<p>The company also requires a copy of any order received to be available to be inspected by management.</p>
|
|
<p>In order to achieve this, we will create a queue <code>orderQueue</code> on the London server into which orders arrive.</p>
|
|
<p>We will create a topic, <code>spyTopic</code>, on the London server, with two subscribers, both in London.</p>
|
|
<p>We will create a <i>non-exclusive</i> divert on the London server which will siphon off a copy of each order
|
|
received to the topic <code>spyTopic</code>.</p>
|
|
<p>Here's the XML configuration for that divert, from <code>activemq-configuration.xml</code>:</p>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
&lt;divert name="order-divert"&gt;
   &lt;address&gt;jms.queue.orders&lt;/address&gt;
   &lt;forwarding-address&gt;jms.topic.spyTopic&lt;/forwarding-address&gt;
   &lt;exclusive&gt;false&lt;/exclusive&gt;
&lt;/divert&gt;
|
|
</code>
|
|
</pre>
|
|
<p>For the prices we will create a topic on the London server, <code>priceUpdates</code> to which all price updates
|
|
are sent. We will create another topic on the New York server <code>newYorkPriceUpdates</code> to which all New York
|
|
price updates need to be forwarded.</p>
|
|
<p>Diverts can only be used to divert messages from one <b>local</b> address to another <b>local</b> address
|
|
so we cannot divert directly to an address on another server.</p>
|
|
<p>Instead we divert to a local <i>store and forward queue</i> that we define in the configuration. This is just a normal queue
|
|
that we use for storing messages before forwarding to another node.</p>
|
|
<p>Here's the configuration for it:</p>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
&lt;queues&gt;
   &lt;queue name="jms.queue.priceForwarding"&gt;
      &lt;address&gt;jms.queue.priceForwarding&lt;/address&gt;
   &lt;/queue&gt;
&lt;/queues&gt;
|
|
</code>
|
|
</pre>
|
|
<p>Here's the configuration for the divert:</p>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
&lt;divert name="prices-divert"&gt;
   &lt;address&gt;jms.topic.priceUpdates&lt;/address&gt;
   &lt;forwarding-address&gt;jms.queue.priceForwarding&lt;/forwarding-address&gt;
   &lt;filter string="office='New York'"/&gt;
   &lt;transformer-class-name&gt;org.apache.activemq.jms.example.AddForwardingTimeTransformer&lt;/transformer-class-name&gt;
   &lt;exclusive&gt;true&lt;/exclusive&gt;
&lt;/divert&gt;
|
|
</code>
|
|
</pre>
|
|
<p>Note we specify a filter in the divert, so only New York prices get diverted. We also specify a Transformer class
|
|
since we are going to insert a header in the message at divert time, recording the time the diversion happened.</p>
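<p>The source of <code>AddForwardingTimeTransformer</code> is not reproduced in this document. As a minimal sketch
of what such a transformer might do, the following self-contained Java program stamps a message with the current
time. <code>StubMessage</code> and <code>StubTransformer</code> are hypothetical stand-ins for the broker's
server-side message and transformer types, not the real API; only the property name <code>time_of_forward</code>
is taken from this example, where the New York subscribers read it from the forwarded message.</p>
<pre class="prettyprint">
<code>
import java.util.Properties;

public class AddForwardingTimeSketch {

    /** Hypothetical stand-in for the broker's message type: a body plus string-keyed properties. */
    static final class StubMessage {
        final String body;
        final Properties properties = new Properties();

        StubMessage(String body) {
            this.body = body;
        }

        void putLongProperty(String name, long value) {
            properties.setProperty(name, Long.toString(value));
        }

        long getLongProperty(String name) {
            return Long.parseLong(properties.getProperty(name));
        }
    }

    /** Hypothetical stand-in for the transformer contract: take a message, return the message to route. */
    interface StubTransformer {
        StubMessage transform(StubMessage message);
    }

    /** Records the time of the diversion in the time_of_forward property. */
    static final class AddForwardingTime implements StubTransformer {
        @Override
        public StubMessage transform(StubMessage message) {
            message.putLongProperty("time_of_forward", System.currentTimeMillis());
            return message;
        }
    }

    public static void main(String[] args) {
        StubMessage update = new StubMessage("This is a price update for New York");
        StubMessage diverted = new AddForwardingTime().transform(update);
        System.out.println("Time of forward: " + diverted.getLongProperty("time_of_forward"));
    }
}
</code>
</pre>
<p>In this sketch the transformer modifies and returns the same message instance. A real transformer is loaded by
the server from the class name given in the divert configuration, so it must be available on the server's classpath.</p>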
|
|
<p>And finally we define a bridge that moves messages from the local queue to the address on the New York server.
|
|
Bridges move messages from queues to remote addresses and are ideal to use when the target server may be stopped and
|
|
started independently, and/or the network might be unreliable. Bridges guarantee once and only once delivery
|
|
of messages from their source queues to their target addresses.</p>
|
|
<p>Here is the bridge configuration: </p>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
&lt;bridges&gt;
   &lt;bridge name="price-forward-bridge"&gt;
      &lt;queue-name&gt;jms.queue.priceForwarding&lt;/queue-name&gt;
      &lt;forwarding-address&gt;jms.topic.newYorkPriceUpdates&lt;/forwarding-address&gt;
      &lt;reconnect-attempts&gt;-1&lt;/reconnect-attempts&gt;
      &lt;static-connectors&gt;
         &lt;connector-ref&gt;newyork-connector&lt;/connector-ref&gt;
      &lt;/static-connectors&gt;
   &lt;/bridge&gt;
&lt;/bridges&gt;
|
|
</code>
|
|
</pre>
|
|
|
|
<p><i>To run the example, simply type <code>mvn verify -Pexample</code> from this directory</i></p>
|
|
|
|
<ol>
|
|
<li>Create an initial context to perform the JNDI lookup on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
initialContextLondon = getContext(0);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Look up the queue orderQueue on the London server - this is the queue any orders are sent to</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Queue orderQueue = (Queue)initialContextLondon.lookup("/queue/orders");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Look up the topic priceUpdates on the London server - this is the topic that any price updates are sent to</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Topic priceUpdates = (Topic)initialContextLondon.lookup("/topic/priceUpdates");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Look up the spy topic on the London server - this is what we will use to snoop on any orders</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Topic spyTopic = (Topic)initialContextLondon.lookup("/topic/spyTopic");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create an initial context to perform the JNDI lookup on the New York server.</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
initialContextNewYork = getContext(1);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Look up the topic newYorkPriceUpdates on the New York server - any New York price updates sent to priceUpdates on the London server will
be diverted to the queue priceForwarding on the London server, and a bridge will consume from that queue and forward
them to the address newYorkPriceUpdates on the New York server, where they will be distributed to the topic subscribers on
the New York server.
|
|
</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Topic newYorkPriceUpdates = (Topic)initialContextNewYork.lookup("/topic/newYorkPriceUpdates");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Perform a lookup on the Connection Factory on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
ConnectionFactory cfLondon = (ConnectionFactory)initialContextLondon.lookup("/ConnectionFactory");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Perform a lookup on the Connection Factory on the New York server.</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
ConnectionFactory cfNewYork = (ConnectionFactory)initialContextNewYork.lookup("/ConnectionFactory");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS Connection on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
connectionLondon = cfLondon.createConnection();
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS Connection on the New York server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
connectionNewYork = cfNewYork.createConnection();
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS Session on the London server.</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Session sessionLondon = connectionLondon.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS Session on the New York server.</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Session sessionNewYork = connectionNewYork.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS MessageProducer orderProducer that sends to the queue orderQueue on the London server.</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageProducer orderProducer = sessionLondon.createProducer(orderQueue);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS MessageProducer priceProducer that sends to the topic priceUpdates on the London server.</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageProducer priceProducer = sessionLondon.createProducer(priceUpdates);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS subscriber which subscribes to the spyTopic on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageConsumer spySubscriberA = sessionLondon.createConsumer(spyTopic);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create another JMS subscriber which also subscribes to the spyTopic on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageConsumer spySubscriberB = sessionLondon.createConsumer(spyTopic);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS MessageConsumer which consumes orders from the order queue on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageConsumer orderConsumer = sessionLondon.createConsumer(orderQueue);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS subscriber which subscribes to the priceUpdates topic on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageConsumer priceUpdatesSubscriberLondon = sessionLondon.createConsumer(priceUpdates);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a JMS subscriber which subscribes to the newYorkPriceUpdates topic on the New York server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageConsumer newYorkPriceUpdatesSubscriberA = sessionNewYork.createConsumer(newYorkPriceUpdates);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create another JMS subscriber which also subscribes to the newYorkPriceUpdates topic on the New York server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
MessageConsumer newYorkPriceUpdatesSubscriberB = sessionNewYork.createConsumer(newYorkPriceUpdates);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Start the connections</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
connectionLondon.start();
|
|
|
|
connectionNewYork.start();
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create an order message</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage orderMessage = sessionLondon.createTextMessage("This is an order");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Send the order message to the order queue on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
orderProducer.send(orderMessage);
|
|
|
|
System.out.println("Sent message: " + orderMessage.getText());
|
|
</code>
|
|
</pre>
|
|
|
|
<li>The order message is consumed by the orderConsumer on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage receivedOrder = (TextMessage)orderConsumer.receive(5000);
|
|
|
|
System.out.println("Received order: " + receivedOrder.getText());
|
|
</code>
|
|
</pre>
|
|
|
|
<li>A copy of the order is also received by the spyTopic subscribers on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage spiedOrder1 = (TextMessage)spySubscriberA.receive(5000);
|
|
|
|
System.out.println("Snooped on order: " + spiedOrder1.getText());
|
|
|
|
TextMessage spiedOrder2 = (TextMessage)spySubscriberB.receive(5000);
|
|
|
|
System.out.println("Snooped on order: " + spiedOrder2.getText());
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create and send a price update message, destined for London</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage priceUpdateMessageLondon = sessionLondon.createTextMessage("This is a price update for London");
|
|
|
|
priceUpdateMessageLondon.setStringProperty("office", "London");
|
|
|
|
priceProducer.send(priceUpdateMessageLondon);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>The price update *should* be received by the local subscriber since we only divert messages
|
|
where office = New York</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage receivedUpdate = (TextMessage)priceUpdatesSubscriberLondon.receive(2000);
|
|
|
|
System.out.println("Received price update locally: " + receivedUpdate.getText());
|
|
</code>
|
|
</pre>
|
|
|
|
<li>The price update *should not* be received in New York</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(1000);
|
|
|
|
if (priceUpdate1 != null)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
System.out.println("Did not receive price update in New York, look it's: " + priceUpdate1);
|
|
|
|
TextMessage priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(1000);
|
|
|
|
if (priceUpdate2 != null)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
System.out.println("Did not receive price update in New York, look it's: " + priceUpdate2);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Create a price update message, destined for New York</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
TextMessage priceUpdateMessageNewYork = sessionLondon.createTextMessage("This is a price update for New York");
|
|
|
|
priceUpdateMessageNewYork.setStringProperty("office", "New York");
|
|
</code>
|
|
</pre>
|
|
|
|
<li>Send the price update message to the priceUpdates topic on the London server</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
priceProducer.send(priceUpdateMessageNewYork);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>The price update *should not* be received by the local subscriber to the priceUpdates topic
|
|
since it has been *exclusively* diverted to the priceForwarding queue, because it has a header saying
|
|
it is destined for the New York office</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
Message message = priceUpdatesSubscriberLondon.receive(1000);
|
|
|
|
if (message != null)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
System.out.println("Didn't receive local price update, look, it's: " + message);
|
|
</code>
|
|
</pre>
|
|
|
|
<li>The remote subscribers on the New York server (server 1) *should* receive a copy of the price update since
it has been diverted to the local priceForwarding queue, which has a bridge consuming from it that
forwards it to the newYorkPriceUpdates address on the New York server.
Notice how the forwarded messages have had a special header added by the custom transformer that
we told the divert to use</li>
|
|
<pre class="prettyprint">
|
|
<code>
|
|
priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(5000);
|
|
|
|
System.out.println("Received forwarded price update on server 1: " + priceUpdate1.getText());
|
|
System.out.println("Time of forward: " + priceUpdate1.getLongProperty("time_of_forward"));
|
|
|
|
priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(5000);
|
|
|
|
System.out.println("Received forwarded price update on server 1: " + priceUpdate2.getText());
|
|
System.out.println("Time of forward: " + priceUpdate2.getLongProperty("time_of_forward"));
|
|
</code>
|
|
</pre>
|
|
|
|
|
|
<li>And finally, <b>always</b> remember to close your resources after use, in a <code>finally</code> block.</li>
|
|
|
|
<pre class="prettyprint">
|
|
<code>
|
|
finally
|
|
{
|
|
if (initialContextLondon != null)
|
|
{
|
|
initialContextLondon.close();
|
|
}
|
|
if (initialContextNewYork != null)
|
|
{
|
|
initialContextNewYork.close();
|
|
}
|
|
if (connectionLondon != null)
|
|
{
|
|
connectionLondon.close();
|
|
}
|
|
if (connectionNewYork != null)
|
|
{
|
|
connectionNewYork.close();
|
|
}
|
|
}
|
|
</code>
|
|
</pre>
|
|
|
|
|
|
|
|
</ol>
|
|
</body>
|
|
</html>
|