<html>
<head>
<title>ActiveMQ Message Redistribution Example</title>
<link rel="stylesheet" type="text/css" href="../common/common.css" />
<link rel="stylesheet" type="text/css" href="../common/prettify.css" />
<script type="text/javascript" src="../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>Message Redistribution Example</h1>
<p>This example demonstrates message redistribution between queues with the same name deployed on different
nodes of a cluster.</p>
<p>As demonstrated in the clustered queue example, if queues with the same name are deployed on different nodes of
a cluster, ActiveMQ can be configured to load balance messages between the nodes on the server side.</p>
<p>However, if the consumer(s) on a particular node are closed, then messages in the queue at that node can
appear to be stranded, since they have no local consumers.</p>
<p>If this is undesirable, ActiveMQ can be configured to <b>redistribute</b> messages from the node
with no consumers to nodes where there are consumers. If the consumers have JMS selectors set on them, then messages
will only be redistributed to nodes with consumers whose selectors match.</p>
<p>By default, message redistribution is disabled, but it can be enabled by specifying some AddressSettings configuration
in either <code>activemq-queues.xml</code> or <code>activemq-configuration.xml</code>.</p>
<p>Setting <code>redistribution-delay</code> to <code>0</code> causes redistribution to occur immediately
once there are no more matching consumers on a particular queue instance. A positive value specifies
a delay in milliseconds before redistribution is attempted. The delay is useful when another consumer is
likely to be created on the queue soon, since it avoids unnecessary redistribution.</p>
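<p>The rule above can be sketched as a small decision function. This is a toy model for illustration only, not the
broker's implementation: the class name, method name and parameters are hypothetical, and treating a negative delay
as "redistribution disabled" is an assumption made for this sketch.</p>

```java
// Toy model of the redistribution-delay rule; all names are hypothetical.
class RedistributionDelayModel {

    /**
     * Decides whether a queue instance would hand its messages to another node.
     *
     * @param delayMs           configured redistribution-delay in milliseconds;
     *                          a negative value is ASSUMED to mean "disabled"
     * @param matchingConsumers matching consumers still attached to this queue instance
     * @param idleMs            milliseconds since the last matching consumer closed
     */
    static boolean shouldRedistribute(long delayMs, int matchingConsumers, long idleMs) {
        boolean enabled = delayMs >= 0;
        if (!enabled) {
            return false; // redistribution switched off
        }
        if (matchingConsumers > 0) {
            return false; // messages are not stranded, a local consumer can take them
        }
        return idleMs >= delayMs; // a delay of 0 is satisfied immediately
    }

    public static void main(String[] args) {
        System.out.println(shouldRedistribute(0, 0, 0));      // delay 0, no consumers
        System.out.println(shouldRedistribute(5000, 0, 1000)); // delay not yet elapsed
        System.out.println(shouldRedistribute(5000, 0, 5000)); // delay elapsed
        System.out.println(shouldRedistribute(0, 1, 0));      // a local consumer exists
        System.out.println(shouldRedistribute(-1, 0, 60000));  // assumed "disabled" value
    }
}
```

<p>Under these assumptions the five calls print true, false, true, false and false, which mirrors the trade-off
described above: a longer delay gives a replacement consumer time to appear before messages are moved.</p>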
<p>Here's the relevant snippet from the <code>activemq-queues.xml</code> configuration, which tells the server
to use a redistribution delay of <code>0</code> on any JMS queues, i.e. any queues whose name starts with
<code>jms.</code>:</p>
<pre class="prettyprint">
<code>
&lt;address-setting match="jms.#"&gt;
   &lt;redistribution-delay&gt;0&lt;/redistribution-delay&gt;
&lt;/address-setting&gt;
</code>
</pre>
<p>For more information on ActiveMQ load balancing, and clustering in general, please see the clustering
section of the user manual.</p>
<h2>Example step-by-step</h2>
<p><i>To run the example, type <code>mvn verify</code> from this directory.</i></p>
<ol>
<li>Get an initial context for looking up JNDI from server 0</li>
<pre class="prettyprint">
<code>
ic0 = getContext(0);
</code>
</pre>
<li>Look-up the JMS Queue object from JNDI</li>
<pre class="prettyprint">
<code>Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");</code>
</pre>
<li>Look-up a JMS Connection Factory object from JNDI on server 0</li>
<pre class="prettyprint">
<code>ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");</code>
</pre>
<li>Get an initial context for looking up JNDI from server 1.</li>
<pre class="prettyprint">
<code>ic1 = getContext(1);</code>
</pre>
<li>Look-up a JMS Connection Factory object from JNDI on server 1</li>
<pre class="prettyprint">
<code>ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
</code>
</pre>
<li>We create a JMS Connection, <code>connection0</code>, which is a connection to server 0</li>
<pre class="prettyprint">
<code>
connection0 = cf0.createConnection();
</code>
</pre>
<li>We create a JMS Connection, <code>connection1</code>, which is a connection to server 1</li>
<pre class="prettyprint">
<code>
connection1 = cf1.createConnection();
</code>
</pre>
<li>We create a JMS Session on server 0. Note that the session is CLIENT_ACKNOWLEDGE</li>
<pre class="prettyprint">
<code>
Session session0 = connection0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
</code>
</pre>
<li>We create a JMS Session on server 1. Note that the session is CLIENT_ACKNOWLEDGE</li>
<pre class="prettyprint">
<code>
Session session1 = connection1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
</code>
</pre>
<li>We start the connections to ensure delivery occurs on them</li>
<pre class="prettyprint">
<code>
connection0.start();
connection1.start();
</code>
</pre>
<li>We create JMS MessageConsumer objects on server 0 and server 1</li>
<pre class="prettyprint">
<code>
MessageConsumer consumer0 = session0.createConsumer(queue);
MessageConsumer consumer1 = session1.createConsumer(queue);
</code>
</pre>
<li>We create a JMS MessageProducer object on server 0.</li>
<pre class="prettyprint">
<code>
MessageProducer producer = session0.createProducer(queue);</code>
</pre>
<li>We send some messages to server 0.</li>
<pre class="prettyprint">
<code>
final int numMessages = 10;
for (int i = 0; i &lt; numMessages; i++)
{
   TextMessage message = session0.createTextMessage("This is text message " + i);
   producer.send(message);
   System.out.println("Sent message: " + message.getText());
}
</code>
</pre>
<li>We now consume those messages on <b>both</b> server 0 and server 1.
We note the messages have been distributed between servers in a round-robin fashion.
ActiveMQ has <b>load balanced</b> the messages between the available consumers on the different nodes.
ActiveMQ can be configured to always load balance messages to all nodes, or to only balance messages
to nodes which have consumers with either no selector or a matching selector. See the user manual for more details.
JMS Queues implement point-to-point messaging, where each message is only ever consumed by a
maximum of one consumer.</li>
<pre class="prettyprint">
<code>
// Declared outside the loop so the last message received can be acknowledged afterwards
TextMessage message0 = null;
TextMessage message1 = null;

for (int i = 0; i &lt; numMessages; i += 2)
{
   message0 = (TextMessage)consumer0.receive(5000);
   System.out.println("Got message: " + message0.getText() + " from node 0");
   message1 = (TextMessage)consumer1.receive(5000);
   System.out.println("Got message: " + message1.getText() + " from node 1");
}
</code>
</pre>
<li>We acknowledge the messages consumed on node 0. The sessions are CLIENT_ACKNOWLEDGE so
messages will not get acknowledged until they are explicitly acknowledged. Acknowledging the last
message received also acknowledges every earlier message delivered by the same session.
Note that we <b>do not</b> acknowledge the messages consumed on node 1 yet.</li>
<pre class="prettyprint">
<code>
message0.acknowledge();
</code>
</pre>
<li>We now close the session and consumer on node 1. (Closing the session automatically closes the consumer)
</li>
<pre class="prettyprint">
<code>session1.close();</code>
</pre>
<li>Since there are no more consumers on node 1, the messages on node 1 are now stranded (no local consumers),
so ActiveMQ redistributes them to node 0, where they can be consumed. We consume them from
node 0.</li>
<pre class="prettyprint">
<code>
for (int i = 0; i &lt; numMessages; i += 2)
{
   message0 = (TextMessage)consumer0.receive(5000);
   System.out.println("Got message: " + message0.getText() + " from node 0");
}
</code>
</pre>
<li>We acknowledge the redistributed messages.</li>
<pre class="prettyprint">
<code>message0.acknowledge();</code>
</pre>
<li>And finally (no pun intended), <b>always</b> remember to close your resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producers and browsers.</li>
<pre class="prettyprint">
<code>
finally
{
if (connection0 != null)
{
connection0.close();
}
if (connection1 != null)
{
connection1.close();
}
if (ic0 != null)
{
ic0.close();
}
if (ic1 != null)
{
ic1.close();
}
}
</code>
</pre>
</ol>
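<p>The message flow in the steps above can be modelled without a broker. The following is a minimal sketch in plain
Java, not part of the example's source: the class and method names are hypothetical, messages are represented only by
their index, and it assumes that round-robin delivery starts with node 0 and that node 1's unacknowledged messages
are handed to node 0 in their original order.</p>

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Toy in-memory model of the walkthrough; no JMS or broker API is involved.
// All names are hypothetical.
class RedistributionWalkthroughModel {

    // Message indexes that round-robin delivery hands to one node
    // (ASSUMES delivery starts with node 0).
    static int[] roundRobinShare(int numMessages, int node, int nodeCount) {
        return IntStream.range(0, numMessages)
                .filter(i -> i % nodeCount == node)
                .toArray();
    }

    // Everything node 0 consumes in a two-node cluster: its own share first,
    // then the share redistributed from node 1 after node 1's consumer closes.
    static int[] consumedOnNode0AfterRedistribution(int numMessages) {
        int[] own = roundRobinShare(numMessages, 0, 2);
        int[] stranded = roundRobinShare(numMessages, 1, 2);
        return IntStream.concat(Arrays.stream(own), Arrays.stream(stranded)).toArray();
    }

    public static void main(String[] args) {
        System.out.println("node 0 share: " + Arrays.toString(roundRobinShare(10, 0, 2)));
        System.out.println("node 1 share: " + Arrays.toString(roundRobinShare(10, 1, 2)));
        System.out.println("node 0 total: "
                + Arrays.toString(consumedOnNode0AfterRedistribution(10)));
    }
}
```

<p>With ten messages, the model gives node 0 the even-numbered messages and node 1 the odd-numbered ones. Once node 1
has no consumer, node 0 ends up receiving all ten, which is why the second receive loop in the walkthrough runs on
<code>consumer0</code> only.</p>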
</body>
</html>