Message Redistribution Example

This example demonstrates message redistribution between queues with the same name deployed in different nodes of a cluster.

As demontrated in the clustered queue example, if queues with the same name are deployed on different nodes of a cluster, ActiveMQ can be configured to load balance messages between the nodes on the server side.

However, if the consumer(s) on a particular node are closed, then messages in the queue at that node can appear to be stranded, since they have no local consumers.

If this is undesirable, ActiveMQ can be configured to redistribute messages from the node with no consumers, to nodes where there are consumers. If the consumers have JMS selectors set on them, then they will only be redistributed to nodes with consumers whose selectors match.

By default, message redistribution is disabled, but can be enabled by specifying some AddressSettings configuration in either activemq-queues.xml or broker.xml

Setting redistribution-delay to 0 will cause redistribution to occur immediately once there are no more matching consumers on a particular queue instance. Setting it to a positive value > 0 specifies a delay in milliseconds before attempting to redistribute. The delay is useful in the case that another consumer is likely to be created on the queue, to avoid unnecessary redistribution.

Here's the relevant snippet from the activemq-queues.xml configuration, which tells the server to use a redistribution delay of 0 on any jms queues, i.e. any queues whose name starts with jms.

     
  <address-setting match="jms.#">
      <redistribution-delay>0</redistribution-delay>
   </address-setting>
   
     

For more information on ActiveMQ load balancing, and clustering in general, please see the clustering section of the user manual.

Example step-by-step

To run the example, simply type mvn verify -Pexample from this directory

  1. Get an initial context for looking up JNDI from server 0
  2.            
       ic0 = getContext(0);
       
            
  3. Look-up the JMS Queue object from JNDI
  4.            Queue queue = (Queue)ic0.lookup("/queue/exampleQueue");
            
  5. Look-up a JMS Connection Factory object from JNDI on server 0
  6.            ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
            
  7. Get an initial context for looking up JNDI from server 1.
  8.            ic1 = getContext(1);
            
  9. Look-up a JMS Connection Factory object from JNDI on server 1
  10.            ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
               
            
  11. We create a JMS Connection connection0 which is a connection to server 0
  12.           
       connection0 = cf0.createConnection();
              
            
  13. We create a JMS Connection connection1 which is a connection to server 1
  14.           
       connection1 = cf1.createConnection();
              
            
  15. We create a JMS Session on server 0, note the session is CLIENT_ACKNOWLEDGE
  16.            
       Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
               
            
  17. We create a JMS Session on server 1, note the session is CLIENT_ACKNOWLEDGE
  18.            
       Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
                
            
  19. We start the connections to ensure delivery occurs on them
  20.            
       connection0.start();
    
       connection1.start();
               
            
  21. We create JMS MessageConsumer objects on server 0 and server 1
  22.            
       MessageConsumer consumer0 = session0.createConsumer(queue);
    
       MessageConsumer consumer1 = session1.createConsumer(queue);
               
            
  23. We create a JMS MessageProducer object on server 0.
  24.            
       MessageProducer producer = session0.createProducer(queue);
            
  25. We send some messages to server 0.
  26.            
    	final int numMessages = 10;
    
    	for (int i = 0; i < numMessages; i++)
    	{
    	   TextMessage message = session0.createTextMessage("This is text message " + i);
    	      
    	   producer.send(message);
    	
    	   System.out.println("Sent message: " + message.getText());
    	}
               
            
  27. We now consume those messages on *both* server 0 and server 1. We note the messages have been distributed between servers in a round robin fashion. ActiveMQ has load balanced the messages between the available consumers on the different nodes. ActiveMQ can be configured to always load balance messages to all nodes, or to only balance messages to nodes which have consumers with no or matching selectors. See the user manual for more details.
  28. JMS Queues implement point-to-point message where each message is only ever consumed by a maximum of one consumer.
               
    	for (int i = 0; i < numMessages; i += 2)
    	{
    	   TextMessage message0 = (TextMessage)consumer0.receive(5000);
    	
    	   System.out.println("Got message: " + message0.getText() + " from node 0");
    	
    	   TextMessage message1 = (TextMessage)consumer1.receive(5000);
    	
    	   System.out.println("Got message: " + message1.getText() + " from node 1");
    	}
               
            
  29. We acknowledge the messages consumed on node 0. The sessions are CLIENT_ACKNOWLEDGE so messages will not get acknowledged until they are explicitly acknowledged. Note that we do not acknowledge the message consumed on node 1 yet.
  30.            
               message0.acknowledge();
               
            
  31. We now close the session and consumer on node 1. (Closing the session automatically closes the consumer)
  32.            session1.close();
            
  33. Since there is no more consumer on node 1, the messages on node 1 are now stranded (no local consumers) so ActiveMQ will redistribute them to node 0 so they can be consumed. We consume them from node 0.
  34.            
               for (int i = 0; i < numMessages; i += 2)
             {
                message0 = (TextMessage)consumer0.receive(5000);
    
                System.out.println("Got message: " + message0.getText() + " from node 0");           
             }
               
            
  35. We ack the redistributed messages.
  36.            message0.acknowledge();
            
  37. And finally (no pun intended), always remember to close your resources after use, in a finally block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
  38.            
    	finally
    	{
    	      if (connection0 != null)
             {
                connection0.close();
             }
    
             if (connection1 != null)
             {
                connection1.close();
             }
    
             if (ic0 != null)
             {
                ic0.close();
             }
    
             if (ic1 != null)
             {
                ic1.close();
             }
    	}