No Consumer Buffering Example

By default, ActiveMQ consumers buffer messages from the server in a client side buffer before actual delivery actually occurs.

This improves performance since otherwise every time you called receive() or had processed the last message in a MessageListener onMessage() method, the ActiveMQ client would have to go the server to request the next message involving a network round trip for every message reducing performance.

Therefore, by default, ActiveMQ pre-fetches messages into a buffer on each consumer. The total maximum size of messages in bytes that will be buffered on each consumer is determined by the consumer-window-size parameter on the connection factory.

In some cases it is not desirable to buffer any messages on the client side consumer.

An example would be an order queue which had multiple consumers that processed orders from the queue. Each order takes a significant time to process, but each one should be processed in a timely fashion.

If orders were buffered in each consumer, and a new consumer was added that consumer would not be able to process orders which were already in the client side buffer of another consumer.

To turn off client side buffering of messages, set consumer-window-size to zero.

With ActiveMQ you can specify a maximum consume rate at which a JMS MessageConsumer will consume messages. This can be specified when creating or deploying the connection factory. See activemq-jms.xml

Example step-by-step

In this example we specify a consumer-window-size of 0 bytes in the activemq-jms.xml file when deploying the connection factory:

     
   <connection-factory name="ConnectionFactory">
      <connector-ref connector-name="netty-connector"/>
      <entries>
         <entry name="ConnectionFactory"/>       
      </entries>
      
      <!-- We set the consumer window size to 0, which means messages are not buffered at all
      on the client side -->
      <consumer-window-size>0</consumer-window-size>
      
   </connection-factory>
     
     

We create a consumer on a queue and send 10 messages to it. We then create another consumer on the same queue.

We then consume messages from each consumer in a semi-random order. We note that the messages are consumed in the order they were sent.

If the messages had been buffered in each consumer they would not be available to be consumed in an order determined afer delivery.

To run the example, simply type mvn verify -Pexample from this directory

  1. Create an initial context to perform the JNDI lookup.
  2.            initialContext = getContext(0);
            
  3. Perfom a lookup on the queue
  4.            Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
            
  5. Perform a lookup on the Connection Factory
  6.            ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
            
  7. Create a JMS Connection
  8.            connection = cf.createConnection();
            
  9. Create a JMS Session
  10.            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
  11. Create a JMS MessageProducer
  12.           MessageProducer producer = session.createProducer(queue);
            
  13. Create a JMS MessageConsumer
  14.            MessageConsumer consumer1 = session.createConsumer(queue);
            
  15. Start the connection
  16.            
         connection.start();
               
            
  17. Send 10 messages to the queue
  18.            
         final int numMessages = 10;
             
         for (int i = 0; i < numMessages; i++)
         {
            TextMessage message = session.createTextMessage("This is text message: " + i);
    
            producer.send(message);
         }           
               
            
  19. Create another JMS MessageConsumer on the same queue.
  20.            MessageConsumer consumer2 = session.createConsumer(queue);
            
  21. Consume three messages from consumer2
  22.            
       for (int i = 0; i < 3; i++)
       {         
          TextMessage message = (TextMessage)consumer2.receive(2000);
                
          System.out.println("Consumed message from consumer2: " + message.getText());
       }    
               
            
  23. Consume five messages from consumer1
  24.            
       for (int i = 0; i < 5; i++)
       {         
          TextMessage message = (TextMessage)consumer1.receive(2000);
                
          System.out.println("Consumed message from consumer1: " + message.getText());
       }    
               
            
  25. Consume two more messages from consumer2
  26.            
       for (int i = 0; i < 2; i++)
       {         
          TextMessage message = (TextMessage)consumer1.receive(2000);
                
          System.out.println("Consumed message from consumer2: " + message.getText());
       }    
               
            
  27. Be sure to close our resources!
  28.            
               finally
               {
                  if (initialContext != null)
                  {
                    initialContext.close();
                  }
                  
                  if (connection != null)
                  {
                     connection.close();
                  }
               }
            

More information