JMS Expiration Example

This example shows you how to configure ActiveMQ so messages are expipired after a certain time..

Messages can be retained in the messaging system for a limited period of time before being removed. JMS specification states that clients should not receive messages that have been expired (but it does not guarantee this will not happen).

ActiveMQ can assign a expiry address to a given queue so that when messages are expired, they are removed from the queue and routed to an this address. These "expired" messages can later be consumed for further inspection.

The example will send 1 message with a short time-to-live to a queue. We will wait for the message to expire and checks that the message is no longer in the queue it was sent to. We will instead consume it from an expiry queue where it was moved when it expired.

Example setup

Expiry destinations are defined in the configuration file activemq-configuration.xml:

         <address-setting match="jms.queue.exampleQueue">
            <expiry-address>jms.queue.expiryQueue</expiry-address>
         </address-setting>
         
     

This configuration will moved expired messages from the exampleQueue to the expiryQueue

ActiveMQ allows to specify either a Queue by prefixing the expiry-address with jms.queue. or a Topic by prefixing with jms.topic..
In this example, we will use a Queue to hold the expired messages.

Since we want to consume messages from this expiryQueue, we also need to add a JNDI binding to perform a lookup. This is configured in activemq-jms.xml

         <queue name="expiryQueue">
            <entry name="/queue/expiryQueue"/>
         </queue>
     

Example step-by-step

To run the example, simply type mvn verify -Pexample from this directory

  1. First we need to get an initial context so we can look-up the JMS connection factory and destination objects from JNDI. This initial context will get it's properties from the client-jndi.properties file in the directory ../common/config
  2.            InitialContext initialContext = getContext();
            
  3. We look up the JMS queue object from JNDI
  4.            Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
            
  5. We look up the JMS connection factory object from JNDI
  6.            ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
            
  7. We create a JMS connection
  8.            connection = cf.createConnection();
            
  9. We create a JMS session. The session is created as non transacted and will auto acknowledge messages
  10.            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
  11. We create a JMS message producer on the session. This will be used to send the messages
  12.           MessageProducer messageProducer = session.createProducer(topic);
           
  13. Messages sent by this producer will be retained for 1s (1000ms) before expiration
  14.            producer.setTimeToLive(1000);
           
  15. We create a text messages
  16.             TextMessage message = session.createTextMessage("this is a text message");
            
  17. We send the message to the queue
  18.             producer.send(message);
            
  19. We sleep a little bit to let the message expire
  20.             Thread.sleep(5000);
            

    We will now try to consume the message from the queue but it won't be there since it has expired

  21. We create a JMS message consumer on the queue
  22.             MessageConsumer messageConsumer = session.createConsumer(queue);
            
  23. We start the connection. In order for delivery to occur on any consumers or subscribers on a connection, the connection must be started
  24.            connection.start();
            
  25. We try to receive a message from the queue. Since there is none, the call will timeout after 5000ms and messageReceived will be null
               TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
               System.out.println("Received message from " + queue.getQueueName() + ": " + messageReceived);
            

    However, we have configured ActiveMQ to send any expired messages to the expiryQueue. We will now consume messages from this expiry queue and receives the expired message.

  26. We look up the JMS expiry queue object from JNDI
  27.            Queue expiryQueue = (Queue)initialContext.lookup("/queue/expiryQueue");
            
  28. We create a JMS message consumer on the expiry queue
  29.             MessageConsumer expiryConsumer = session.createConsumer(expiryQueue);
            
  30. We consume a message from the expiry queue:
  31.             messageReceived = (TextMessage)expiryConsumer.receive(5000);
            
  32. The message consumed from the expiry queue has the same content than the message which was sent to the queue
                System.out.println("Received message from " + expiryQueue.getQueueName() + ": " + messageReceived.getText());
            

    JMS does not specify the notion of expiry queue. From JMS point of view, the message received from the expiry queue is a different message than the message expired from the queue: the two messages have the same content (properties and body) but their JMS headers differ.
    ActiveMQ defines additional properties to correlate the message received from the expiry queue with the message expired from the queue

  33. The expired message's destination is the expiry queue
  34.             System.out.println("Destination of the expired message: " + ((Queue)messageReceived.getJMSDestination()).getQueueName());
            
  35. The expired message has its own expiration time (its time to live in the expiry queue)
  36.             System.out.println("Expiration time of the expired message (relative to the expiry queue): " + messageReceived.getJMSExpiration());
            

    As we have not defined a time-to-live for the expiry queue, messages sent to the expiry queue will be kept forever (their JMS Expiration value is 0)

  37. The origin destination is stored in the _HORNETQ_ORIG_DESTINATION property
                System.out.println("*Origin destination* of the expired message: " + messageReceived.getStringProperty("_HORNETQ_ORIG_DESTINATION"));
            
  38. The actual expiration time (when the message was expired from the queue) is stored in the _HORNETQ_ACTUAL_EXPIRY property
                System.out.println("*Actual expiration time* of the expired message: " + messageReceived.getLongProperty("_HORNETQ_ACTUAL_EXPIRY"));
            

  39. And finally, always remember to close your JMS connections and resources after use, in a finally block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
  40.            finally
               {
                  if (initialContext != null)
                  {
                    initialContext.close();
                  }
                  if (connection != null)
                  {
                     connection.close();
                  }
               }
            

More information