With ActiveMQ you can specify a maximum consume rate at which a JMS MessageConsumer will consume messages.
This can be specified when creating or deploying the connection factory. See activemq-jms.xml.
If this value is specified then ActiveMQ will ensure that messages are never consumed at a rate higher than the specified rate. This is a form of consumer throttling.
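To make the idea of consumer throttling concrete, here is a minimal sketch of one way a client could pace its own consumption. It is not ActiveMQ's implementation: the class name ConsumeRateThrottle and its acquire() method are hypothetical, and spacing permits at a fixed interval is an assumption chosen purely for illustration.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical throttle: grants permits no faster than maxRatePerSecond. */
final class ConsumeRateThrottle {
    private final long intervalNanos; // minimum spacing between two permits
    private long nextFreeNanos;       // earliest time the next permit may be granted

    ConsumeRateThrottle(int maxRatePerSecond) {
        if (maxRatePerSecond <= 0) {
            throw new IllegalArgumentException("maxRatePerSecond must be positive");
        }
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRatePerSecond;
        // The first permit also waits one interval, so no burst occurs at start-up.
        this.nextFreeNanos = System.nanoTime() + intervalNanos;
    }

    /** Blocks until one more message may be consumed without exceeding the rate. */
    synchronized void acquire() {
        long now = System.nanoTime();
        long wait = nextFreeNanos - now;
        if (wait > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(wait);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while throttling", e);
            }
        }
        // Schedule the next permit one interval after this one (or after "now" if we are late).
        nextFreeNanos = Math.max(now, nextFreeNanos) + intervalNanos;
    }

    public static void main(String[] args) {
        ConsumeRateThrottle throttle = new ConsumeRateThrottle(10);
        long start = System.nanoTime();
        int consumed = 0;
        while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(2)) {
            throttle.acquire(); // stands in for a call such as consumer.receive(...)
            consumed++;
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("Consumed " + consumed + " messages in " + elapsedMillis + " milliseconds");
    }
}
```

With the consumer-max-rate setting described here, no such client-side code is needed; the sketch only shows the kind of pacing that a maximum consume rate implies.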
In this example we specify a consumer-max-rate of 10 messages per second in the activemq-jms.xml file when deploying the connection factory:
<connection-factory name="ConnectionFactory">
   <connector-ref connector-name="netty-connector"/>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
   <!-- We limit consumers created on this connection factory to consume messages at a maximum rate
        of 10 messages per sec -->
   <consumer-max-rate>10</consumer-max-rate>
</connection-factory>
We then consume as many messages as we can in 10 seconds and note how many are actually consumed. The number of messages consumed per second never exceeds the specified value of 10 messages per second.
To run the example, simply type mvn verify from this directory.
// Look up the queue and the connection factory from JNDI
initialContext = getContext(0);
Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");

// Create the connection, a non-transacted auto-acknowledge session, a producer and a consumer
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();

// Send more messages than can be consumed in 10 seconds at 10 messages per second
final int numMessages = 150;
for (int i = 0; i < numMessages; i++)
{
   TextMessage message = session.createTextMessage("This is text message: " + i);
   producer.send(message);
}

// Consume as many messages as we can in 10 seconds
final long duration = 10000;
int i = 0;
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= duration)
{
   // Wait up to 2 seconds for the next message
   TextMessage message = (TextMessage)consumer.receive(2000);
   if (message == null)
   {
      // No message arrived in time, so the example fails
      return false;
   }
   i++;
}

// Compute the observed consume rate in messages per second
long end = System.currentTimeMillis();
double rate = 1000 * (double)i / (end - start);
System.out.println("We consumed " + i + " messages in " + (end - start) + " milliseconds");
System.out.println("Actual consume rate was " + rate + " messages per second");
This should produce output like:
   [java] Sent messages
   [java] Will now try and consume as many as we can in 10 seconds ...
   [java] We consumed 100 messages in 10001 milliseconds
   [java] Actual consume rate was 9.99900009999 messages per second
Finally, close the initial context and the connection so that their resources are released:
finally
{
   if (initialContext != null)
   {
      initialContext.close();
   }
   if (connection != null)
   {
      connection.close();
   }
}
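As a worked check of the rate formula used in the example, the following sketch plugs in the figures from the sample output (100 messages in 10001 milliseconds). The class name ConsumeRateMath and the helper ratePerSecond are hypothetical names introduced only for this illustration; the formula itself is the one the example applies to its own counters.

```java
/** Hypothetical helper that mirrors the rate calculation in the example. */
final class ConsumeRateMath {
    /** Returns messages per second for count messages observed over elapsedMillis. */
    static double ratePerSecond(int count, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        // Same arithmetic as the example: 1000 * (double)i / (end - start)
        return 1000 * (double) count / elapsedMillis;
    }

    public static void main(String[] args) {
        // Figures taken from the sample output: 100 messages in 10001 milliseconds
        double rate = ratePerSecond(100, 10001);
        System.out.println("Actual consume rate was " + rate + " messages per second");
    }
}
```

Because the loop runs slightly longer than 10000 milliseconds, the computed rate lands just under the configured limit of 10 messages per second.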