This example shows you how to send and receive very large messages with ActiveMQ.
ActiveMQ supports the sending and receiving of huge messages, much larger than can fit in available RAM on the client or server. Effectively the only limit to message size is the amount of disk space you have on the server.
Large messages are persisted on the server so they can survive a server restart. In other words ActiveMQ doesn't just do a simple socket stream from the sender to the consumer.
In order to do this ActiveMQ provides an extension to JMS where you can use an InputStream or OutputStream as the source and destination for your messages. You can send messages as large as it would fit in your disk.
You may also choose to read LargeMessages using the regular ByteStream or ByteMessage methods, but using the InputStream and OutputStream will provide you a much better performance
To run the example, simply type mvn verify
from this directory
In this example we limit both the server and the client to be running in a maximum of 50MB of RAM, and we send a message with a body of size 256MB.
ActiveMQ can support much large message sizes but we choose these sizes and limit RAM so the example runs more quickly.
We create a file on disk representing the message body, create a FileInputStream on that file and set that InputStream as the body of the message before sending.
The message is sent, then we stop the server, and restart it. This demonstrates the large message will survive a restart of the server.
Once the server is restarted we receive the message and stream it's body to another file on disk.
initialContext = getContext(0);
Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
File fileInput = new File("huge_message_to_send.dat");
fileInput.createNewFile();
createFile(fileInput, FILE_SIZE);
BytesMessage message = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
message.setObjectProperty("JMS_HQ_InputStream", bufferedInput);
producer.send(message);
messageProducer.send(message);
connection.close();
initialContext.close();
stopServer(0);
// Give the server a little time to shutdown properly
Thread.sleep(5000);
startServer(0);
initialContext = getContext(0);
queue = (Queue)initialContext.lookup("/queue/exampleQueue");
cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
connection = cf.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
messageReceived.setObjectProperty("JMS_HQ_SaveStream", bufferedOutput);
finally
{
if (initialContext != null)
{
initialContext.close();
}
if (connection != null)
{
connection.close();
}
}