This example shows how ActiveMQ would avoid running out of memory resources by paging messages.
A maxSize can be specified per Destination via the destinations settings configuration file (activemq-configuration.xml).
When messages routed to an address exceed the specified maxSize the server will begin to write messages to the file system, this is called paging. This will continue to occur until messages have been delivered to consumers and subsequently acknowledged freeing up memory. Messages will then be read from the file system , i.e. depaged, and routed as normal.
Acknowledgement plays an important factor on paging as messages will stay on the file system until the memory is released so it is important to make sure that the client acknowledges its messages.
To run the example, simply type mvn verify
from this directory
client-jndi.properties
file in the directory ../common/config
InitialContext initialContext = getContext();
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
Queue pageQueue = (Queue) initialContext.lookup("/queue/pagingQueue");
Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer pageMessageProducer = session.createProducer(pageQueue);
pageMessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10 * 1024]);
for (int i = 0; i < 20; i++)
{
pageMessageProducer.send(message);
}
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 30000; i++)
{
messageProducer.send(message);
}
// Thread.sleep(30000); // if you want to just our of curiosity, you can sleep here and inspect the created files just for
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 30000; i++)
{
message = (BytesMessage)messageConsumer.receive(1000);
if (i % 1000 == 0)
{
System.out.println("Received " + i + " messages");
message.acknowledge();
}
}
messageConsumer.close();
messageConsumer = session.createConsumer(pageQueue);
for (int i = 0; i < 20; i++)
{
message = (BytesMessage)messageConsumer.receive(1000);
System.out.println("Received message " + i + " from pageQueue");
message.acknowledge();
}
finally
block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
finally
{
if (initialContext != null)
{
initialContext.close();
}
if (connection != null)
{
connection.close();
}
}