JMS Message Producer Rate Limiting

With ActiveMQ you can specify a maximum send rate at which a JMS MessageProducer will send messages. This can be specified when creating or deploying the connection factory. See activemq-jms.xml

If this value is specified then ActiveMQ will ensure that messages are never produced at a rate higher than specified. This is a form of producer throttling.

Example step-by-step

In this example we specify a producer-max-rate of 50 messages per second in the activemq-jms.xml file when deploying the connection factory:

     
   <connection-factory name="ConnectionFactory">
      <connector-ref connector-name="netty-connector"/>
      <entries>
         <entry name="ConnectionFactory"/>       
      </entries>
      
      <!-- We limit producers created on this connection factory to produce messages at a maximum rate
      of 50 messages per sec -->
      <producer-max-rate>50</producer-max-rate>
      
   </connection-factory>
     
     

We then simply send as many messages as we can in 10 seconds and note how many messages are actually sent.

We note that the number of messages sent per second never exceeds the specified value of 50 messages per second.

To run the example, simply type mvn verify from this directory

  1. Create an initial context to perform the JNDI lookup.
  2.            initialContext = getContext(0);
            
  3. Perfom a lookup on the queue
  4.            Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
            
  5. Perform a lookup on the Connection Factory
  6.            ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
            
  7. Create a JMS Connection
  8.            connection = cf.createConnection();
            
  9. Create a JMS Session
  10.            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
  11. Create a JMS Message Producer
  12.           MessageProducer producer = session.createProducer(queue);
            
  13. Send as many messages as we can in 10 seconds
  14.            
            final long duration = 10000;
    
            int i = 0;
    
            long start = System.currentTimeMillis();
    
            while (System.currentTimeMillis() - start <= duration)
            {
               TextMessage message = session.createTextMessage("This is text message: " + i++);
    
               producer.send(message);
            }
    
            long end = System.currentTimeMillis();
    
            double rate = 1000 * (double)i / (end - start);
    
            System.out.println("We sent " + i + " messages in " + (end - start) + " milliseconds");
    
            System.out.println("Actual send rate was " + rate + " messages per second");                      
               
            
  15. We note that the sending rate doesn't exceed 50 messages per second. Here's some example output from a real run
  16.            
         [java] Will now send as many messages as we can in 10 seconds...
         [java] We sent 500 messages in 10072 milliseconds
         [java] Actual send rate was 49.64257347100874 messages per second
               
            
  17. For good measure we consumer the messages we produced.
  18.            
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            connection.start();
    
            System.out.println("Now consuming the messages...");
    
            i = 0;
            while (true)
            {
               TextMessage messageReceived = (TextMessage)messageConsumer.receive(5000);
    
               if (messageReceived == null)
               {
                  break;
               }
    
               i++;
            }
    
            System.out.println("Received " + i + " messages");           
               
               
            
  19. Be sure to close our resources!
  20.            
               finally
               {
                  if (initialContext != null)
                  {
                    initialContext.close();
                  }
                  
                  if (connection != null)
                  {
                     connection.close();
                  }
               }