With ActiveMQ Artemis you can specify a maximum consume rate at which a JMS MessageConsumer will consume messages.
This can be specified when creating or configuring the connection factory. See jndi.properties.
If this value is specified then ActiveMQ Artemis will ensure that messages are never consumed at a rate higher than the specified rate. This is a form of consumer throttling.
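As a rough illustration of what consumer throttling means, the self-contained sketch below paces calls so that they never proceed faster than a fixed rate. The class name SimpleRateThrottle and its acquire() method are hypothetical names chosen for this illustration. This is not how ActiveMQ Artemis implements the setting internally; it only shows the general idea of spacing out consumption.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a minimal pacing throttle.
// It is NOT the ActiveMQ Artemis implementation of consumerMaxRate.
public class SimpleRateThrottle {

    private final long intervalNanos; // minimum spacing between two permits
    private long nextSlotNanos;       // earliest time the next permit may be granted

    public SimpleRateThrottle(int maxPerSecond) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxPerSecond;
        this.nextSlotNanos = System.nanoTime();
    }

    // Blocks until the caller is allowed to consume the next message.
    public synchronized void acquire() {
        long now = System.nanoTime();
        long wait = nextSlotNanos - now;
        if (wait > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(wait);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
            }
            // We were early: the next slot follows the one we just used.
            nextSlotNanos += intervalNanos;
        } else {
            // We were late: no catch-up burst, the next slot is measured from now.
            nextSlotNanos = now + intervalNanos;
        }
    }

    public static void main(String[] args) {
        SimpleRateThrottle throttle = new SimpleRateThrottle(10);
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            throttle.acquire(); // a real consumer would receive a message here
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("5 permits took about " + elapsedMillis + " ms");
    }
}
```

Because a late caller is never allowed to catch up with a burst, the long-run rate stays at or below the configured maximum, which is the behaviour described above.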
In this example we specify a consumer-max-rate of 10 messages per second in the jndi.properties file when configuring the connection factory:
connectionFactory.ConnectionFactory=tcp://localhost:61616?consumerMaxRate=10
We then simply consume as many messages as we can in 10 seconds and note how many messages are actually consumed.
We note that the number of messages consumed per second never exceeds the specified value of 10 messages per second.
To run the example, simply type mvn verify -Pexample from this directory.
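// Create an initial context to perform the JNDI lookups
initialContext = getContext(0);
// Look up the queue and the rate-limited connection factory
Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
// Create a connection and a non-transacted, auto-acknowledge session
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a producer and a consumer on the same queue
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
// Start the connection so that messages can be delivered
connection.start();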
// Send more messages than can be consumed in 10 seconds at 10 messages per second
final int numMessages = 150;
for (int i = 0; i < numMessages; i++)
{
   TextMessage message = session.createTextMessage("This is text message: " + i);
   producer.send(message);
}
// Consume as many messages as possible for 10 seconds (duration is in milliseconds)
final long duration = 10000;
int i = 0;
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= duration)
{
   // Wait up to 2 seconds for each message; a null result means none arrived in time
   TextMessage message = (TextMessage)consumer.receive(2000);
   if (message == null)
   {
      return false;
   }
   i++;
}
long end = System.currentTimeMillis();
// Convert messages per millisecond into messages per second
double rate = 1000 * (double)i / (end - start);
System.out.println("We consumed " + i + " messages in " + (end - start) + " milliseconds");
System.out.println("Actual consume rate was " + rate + " messages per second");
[java] Sent messages
[java] Will now try and consume as many as we can in 10 seconds ...
[java] We consumed 100 messages in 10001 milliseconds
[java] Actual consume rate was 9.99900009999 messages per second
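The rate in the sample output can be checked by hand with the same formula the example uses, 1000 * count / elapsed milliseconds. The short sketch below applies it to the figures shown above (100 messages in 10001 milliseconds); the class name ConsumeRateCheck and the helper messagesPerSecond are hypothetical names used only for this check.

```java
// Hypothetical helper that reproduces the rate calculation from the example.
public class ConsumeRateCheck {

    // Converts a message count and an elapsed time in milliseconds
    // into a rate in messages per second.
    public static double messagesPerSecond(int count, long elapsedMillis) {
        return 1000 * (double) count / elapsedMillis;
    }

    public static void main(String[] args) {
        // Figures taken from the sample output above.
        double rate = messagesPerSecond(100, 10001);
        System.out.println("Actual consume rate was " + rate + " messages per second");
        System.out.println("Within the configured limit of 10: " + (rate <= 10.0));
    }
}
```

The result is just under 10 messages per second, which is consistent with the configured limit.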
finally
{
   if (initialContext != null)
   {
      initialContext.close();
   }
   if (connection != null)
   {
      connection.close();
   }
}