ActiveMQ diverts allow messages to be transparently "diverted" from one address to another with just some simple configuration defined on the server side.
Diverts can be defined to be exclusive or non-exclusive.
With an exclusive divert the message is intercepted and does not get sent to the queues originally bound to that address - it only gets diverted.
With a non-exclusive divert the message continues to go to the queues bound to the address, but also a copy of the message gets sent to the address specified in the divert. Consequently non-exclusive diverts can be used to "snoop" on another address
Diverts can also be configured to have an optional filter. If specified then only matching messages will be diverted.
Diverts can be configured to apply a Transformer. If specified, all diverted messages will have the opportunity of being transformed by the Transformer.
Diverts are a very sophisticated concept, which when combined with bridges can be used to create interesting and complex routings. The set of diverts can be thought of as a type of routing table for messages.
In this example we will imagine a fictitious company which has two offices; one in London and another in New York.
The company accepts orders for it's products only at it's London office, and also generates price-updates for it's products, also only from it's London office. However only the New York office is interested in receiving price updates for New York products. Any prices for New York products need to be forwarded to the New York office.
There is an unreliable WAN linking the London and New York offices.
The company also requires a copy of any order received to be available to be inspected by management.
In order to achieve this, we will create a queue orderQueue
on the London server in to which orders arrive.
We will create a topic, spyTopic
on the London server, and there will be two subscribers both in London.
We will create a non-exclusive divert on the London server which will siphon off a copy of each order
received to the topic spyTopic
.
Here's the xml config for that divert, from hornetq-configuration.xml
<divert name="order-divert">
<address>jms.queue.orders</address>
<forwarding-address>jms.topic.spyTopic</forwarding-address>
<exclusive>false</exclusive>
</divert>
For the prices we will create a topic on the London server, priceUpdates
to which all price updates
are sent. We will create another topic on the New York server newYorkPriceUpdates
to which all New York
price updates need to be forwarded.
Diverts can only be used to divert messages from one local address to another local address so we cannot divert directly to an address on another server.
Instead we divert to a local store and forward queue they we define in the configuration. This is just a normal queue that we use for storing messages before forwarding to another node.
Here's the configuration for it:
<queues>
<queue name="jms.queue.priceForwarding">
<address>jms.queue.priceForwarding</address>
</queue>
</queues>
Here's the configuration for the divert:
<divert name="prices-divert">
<address>jms.topic.priceUpdates</address>
<forwarding-address>jms.queue.priceForwarding</forwarding-address>
<filter string="office='New York'"/>
<transformer-class-name>org.apache.activemq.jms.example.AddForwardingTimeTransformer</transformer-class-name>
<exclusive>true</exclusive>
</divert>
Note we specify a filter in the divert, so only New York prices get diverted. We also specify a Transformer class since we are going to insert a header in the message at divert time, recording the time the diversion happened.
And finally we define a bridge that moves messages from the local queue to the address on the New York server. Bridges move messages from queues to remote addresses and are ideal to use when the target server may be stopped and started independently, and/or the network might be unreliable. Bridges guarantee once and only once delivery of messages from their source queues to their target addresses.
Here is the bridge configuration:
<bridges>
<bridge name="price-forward-bridge">
<queue-name>jms.queue.priceForwarding</queue-name>
<forwarding-address>jms.topic.newYorkPriceUpdates</forwarding-address>
<reconnect-attempts>-1</reconnect-attempts>
<static-connectors>
<connector-ref>newyork-connector</connector-ref>
</static-connectors>
</bridge>
</bridges>
To run the example, simply type mvn verify
from this directory
initialContext0 = getContext(0);
Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
Topic priceUpdates = (Topic)initialContextLondon.lookup("/topic/priceUpdates");
Topic spyTopic = (Topic)initialContextLondon.lookup("/topic/spyTopic");
initialContextNewYork = getContext(1);
Topic newYorkPriceUpdates = (Topic)initialContextNewYork.lookup("/topic/newYorkPriceUpdates");
ConnectionFactory cfLondon = (ConnectionFactory)initialContextLondon.lookup("/ConnectionFactory");
ConnectionFactory cfNewYork = (ConnectionFactory)initialContextNewYork.lookup("/ConnectionFactory");
connectionLondon = cfLondon.createConnection();
connectionNewYork = cfNewYork.createConnection();
Session sessionLondon = connectionLondon.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sessionNewYork = connectionNewYork.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer orderProducer = sessionLondon.createProducer(orderQueue);
/code>
MessageProducer priceProducer = sessionLondon.createProducer(priceUpdates);
/code>
MessageConsumer spySubscriberA = sessionLondon.createConsumer(spyTopic);
MessageConsumer spySubscriberB = sessionLondon.createConsumer(spyTopic);
MessageConsumer orderConsumer = sessionLondon.createConsumer(orderQueue);
MessageConsumer priceUpdatesSubscriberLondon = sessionLondon.createConsumer(priceUpdates);
MessageConsumer newYorkPriceUpdatesSubscriberA = sessionNewYork.createConsumer(newYorkPriceUpdates);
MessageConsumer newYorkPriceUpdatesSubscriberB = sessionNewYork.createConsumer(newYorkPriceUpdates);
connectionLondon.start();
connectionNewYork.start();
TextMessage orderMessage = sessionLondon.createTextMessage("This is an order");
orderProducer.send(orderMessage);
System.out.println("Sent message: " + orderMessage.getText());
TextMessage receivedOrder = (TextMessage)orderConsumer.receive(5000);
System.out.println("Received order: " + receivedOrder.getText());
TextMessage spiedOrder1 = (TextMessage)spySubscriberA.receive(5000);
System.out.println("Snooped on order: " + spiedOrder1.getText());
TextMessage spiedOrder2 = (TextMessage)spySubscriberB.receive(5000);
System.out.println("Snooped on order: " + spiedOrder2.getText());
TextMessage priceUpdateMessageLondon = sessionLondon.createTextMessage("This is a price update for London");
priceUpdateMessageLondon.setStringProperty("office", "London");
priceProducer.send(priceUpdateMessageLondon);
TextMessage receivedUpdate = (TextMessage)priceUpdatesSubscriberLondon.receive(2000);
System.out.println("Received price update locally: " + receivedUpdate.getText());
TextMessage priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(1000);
if (priceUpdate1 != null)
{
return false;
}
System.out.println("Did not received price update in New York, look it's: " + priceUpdate1);
TextMessage priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(1000);
if (priceUpdate2 != null)
{
return false;
}
System.out.println("Did not received price update in New York, look it's: " + priceUpdate2);
TextMessage priceUpdateMessageNewYork = sessionLondon.createTextMessage("This is a price update for New York");
priceUpdateMessageNewYork.setStringProperty("office", "New York");
priceProducer.send(priceUpdateMessageNewYork);
Message message = priceUpdatesSubscriberLondon.receive(1000);
if (message != null)
{
return false;
}
System.out.println("Didn't receive local price update, look, it's: " + message);
priceUpdate1 = (TextMessage)newYorkPriceUpdatesSubscriberA.receive(5000);
System.out.println("Received forwarded price update on server 1: " + priceUpdate1.getText());
System.out.println("Time of forward: " + priceUpdate1.getLongProperty("time_of_forward"));
priceUpdate2 = (TextMessage)newYorkPriceUpdatesSubscriberB.receive(5000);
System.out.println("Received forwarded price update on server 2: " + priceUpdate2.getText());
System.out.println("Time of forward: " + priceUpdate2.getLongProperty("time_of_forward"));
finally
block.
finally
{
if (initialContextLondon != null)
{
initialContextLondon.close();
}
if (initialContextNewYork != null)
{
initialContextNewYork.close();
}
if (connectionLondon != null)
{
connectionLondon.close();
}
if (connectionNewYork != null)
{
connectionNewYork.close();
}
}