271 lines
9.6 KiB
Plaintext
271 lines
9.6 KiB
Plaintext
= Large Messages
|
|
:idprefix:
|
|
:idseparator: -
|
|
|
|
Apache ActiveMQ Artemis can be configured to give special treatment to messages which are beyond a configured size.
|
|
Instead of keeping the entire contents of these messages _in memory_ the broker will hold just a thin object on the queues with a reference to the content (e.g. in a file or a database table).
|
|
|
|
This is supported on Core Protocol and on the AMQP Protocol.
|
|
|
|
== Configuring the server
|
|
|
|
When using the xref:persistence.adoc#file-journal-default[file journal] large messages are stored on disk on the server.
|
|
The configuration property `large-messages-directory` specifies where large messages are stored.
|
|
|
|
[,xml]
|
|
----
|
|
<configuration...>
|
|
<core...>
|
|
...
|
|
<large-messages-directory>data/large-messages</large-messages-directory>
|
|
...
|
|
</core>
|
|
</configuration>
|
|
----
|
|
|
|
By default `large-messages-directory` is `data/largemessages`.
|
|
|
|
[NOTE]
|
|
====
|
|
For the best performance we recommend using the file journal with the large messages directory on a different physical volume to the message journal or paging directory.
|
|
====
|
|
|
|
For xref:persistence.adoc#jdbc-persistence[JDBC persistence] the `large-message-table` should be configured.
|
|
|
|
[,xml]
|
|
----
|
|
<configuration...>
|
|
<core...>
|
|
...
|
|
<store>
|
|
<database-store>
|
|
...
|
|
<large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
|
|
...
|
|
</database-store>
|
|
</store>
|
|
...
|
|
</core>
|
|
</configuration>
|
|
----
|
|
|
|
By default `large-message-table` is `LARGE_MESSAGE_TABLE`.
|
|
|
|
By default when writing the final bytes to a large message all writes are synchronized to the storage medium.
|
|
This can be configured via `large-message-sync`, e.g.:
|
|
|
|
[,xml]
|
|
----
|
|
<configuration...>
|
|
<core...>
|
|
...
|
|
<large-message-sync>true</large-message-sync>
|
|
...
|
|
</core>
|
|
</configuration>
|
|
----
|
|
|
|
By default `large-message-sync` is `true`.
|
|
|
|
== Configuring the Core Client
|
|
|
|
Any message larger than a certain size is considered a large message.
|
|
Large messages will be split up and sent in fragments.
|
|
This is determined by the URL parameter `minLargeMessageSize`
|
|
|
|
[NOTE]
|
|
====
|
|
Apache ActiveMQ Artemis messages are encoded using 2 bytes per character so if the message data is filled with ASCII characters (which are 1 byte) the size of the resulting Apache ActiveMQ Artemis message would roughly double.
|
|
This is important when calculating the size of a "large" message as it may appear to be less than the `minLargeMessageSize` before it is sent, but it then turns into a "large" message once it is encoded.
|
|
====
|
|
|
|
The default value is 100KiB.
|
|
|
|
xref:configuring-transports.adoc#configuring-the-transport-directly-from-the-client[Configuring the transport directly from the client side] will provide more information on how to instantiate the core session factory or JMS connection factory.
|
|
|
|
== Compressed Large Messages on Core Protocol
|
|
|
|
You can choose to send large messages in compressed form using `compressLargeMessage` URL parameter.
|
|
|
|
If you specify the boolean URL parameter `compressLargeMessage` as true, the system will use the ZIP algorithm to compress the message body as the message is transferred to the server's side.
|
|
Notice that there's no special treatment at the server's side, all the compressing and uncompressing is done at the client.
|
|
|
|
This behavior can be tuned further by setting an optional parameter: `compressionLevel`.
|
|
This will decide how much the message body should be compressed.
|
|
`compressionLevel` accepts an integer of `-1` or a value between `0-9`.
|
|
The default value is `-1` which corresponds to around level 6-7.
|
|
|
|
If the compressed size of a large message is below `minLargeMessageSize`, it is sent to server as regular messages.
|
|
This means that the message won't be written into the server's large-message data directory, thus reducing the disk I/O.
|
|
|
|
NOTE: A higher `compressionLevel` means the message body will get further compressed, but this is at the cost of speed and computational overhead.
|
|
Make sure to tune this value according to its specific use-case.
|
|
|
|
== Streaming large messages from Core Protocol
|
|
|
|
Apache ActiveMQ Artemis supports setting the body of messages using input and output streams (`java.lang.io`)
|
|
|
|
These streams are then used directly for sending (input streams) and receiving (output streams) messages.
|
|
|
|
When receiving messages there are 2 ways to deal with the output stream;
|
|
you may choose to block while the output stream is recovered using the method `ClientMessage.saveOutputStream` or alternatively using the method `ClientMessage.setOutputstream` which will asynchronously write the message to the stream.
|
|
If you choose the latter the consumer must be kept alive until the message has been fully received.
|
|
|
|
You can use any kind of stream you like.
|
|
The most common use case is to send files stored in your disk, but you could also send things like JDBC Blobs, `SocketInputStream`, things you recovered from `HTTPRequests` etc.
|
|
Anything as long as it implements `java.io.InputStream` for sending messages or `java.io.OutputStream` for receiving them.
|
|
|
|
=== Streaming over Core API
|
|
|
|
The following table shows a list of methods available at `ClientMessage` which are also available through JMS by the use of object properties.
|
|
|
|
|===
|
|
| Name | Description | JMS Equivalent
|
|
|
|
| setBodyInputStream(InputStream)
|
|
| Set the InputStream used to read a message body when sending it.
|
|
| JMS_AMQ_InputStream
|
|
|
|
| setOutputStream(OutputStream)
|
|
| Set the OutputStream that will receive the body of a message.
|
|
This method does not block.
|
|
| JMS_AMQ_OutputStream
|
|
|
|
| saveOutputStream(OutputStream)
|
|
| Save the body of the message to the `OutputStream`.
|
|
It will block until the entire content is transferred to the `OutputStream`.
|
|
| JMS_AMQ_SaveStream
|
|
|===
|
|
|
|
To set the output stream when receiving a core message:
|
|
|
|
[,java]
|
|
----
|
|
ClientMessage msg = consumer.receive(...);
|
|
|
|
// This will block here until the stream was transferred
|
|
msg.saveOutputStream(someOutputStream);
|
|
|
|
ClientMessage msg2 = consumer.receive(...);
|
|
|
|
// This will not wait the transfer to finish
|
|
msg2.setOutputStream(someOtherOutputStream);
|
|
----
|
|
|
|
Set the input stream when sending a core message:
|
|
|
|
[,java]
|
|
----
|
|
ClientMessage msg = session.createMessage();
|
|
msg.setInputStream(dataInputStream);
|
|
----
|
|
|
|
Notice also that for messages with more than 2GiB the getBodySize() will return invalid values since this is an integer (which is also exposed to the JMS API).
|
|
On those cases you can use the message property _AMQ_LARGE_SIZE.
|
|
|
|
=== Streaming over JMS
|
|
|
|
When using JMS, Apache ActiveMQ Artemis maps the streaming methods on the core API (see ClientMessage API table above) by setting object properties . You can use the method `Message.setObjectProperty` to set the input and output streams.
|
|
|
|
The `InputStream` can be defined through the JMS Object Property JMS_AMQ_InputStream on messages being sent:
|
|
|
|
[,java]
|
|
----
|
|
BytesMessage message = session.createBytesMessage();
|
|
|
|
FileInputStream fileInputStream = new FileInputStream(fileInput);
|
|
|
|
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
|
|
|
|
message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
|
|
|
|
someProducer.send(message);
|
|
----
|
|
|
|
The `OutputStream` can be set through the JMS Object Property JMS_AMQ_SaveStream on messages being received in a blocking way.
|
|
|
|
[,java]
|
|
----
|
|
BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
|
|
|
|
File outputFile = new File("huge_message_received.dat");
|
|
|
|
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
|
|
|
|
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
|
|
|
|
// This will block until the entire content is saved on disk
|
|
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
|
|
----
|
|
|
|
Setting the `OutputStream` could also be done in a non blocking way using the property JMS_AMQ_OutputStream.
|
|
|
|
[,java]
|
|
----
|
|
// This won't wait the stream to finish. You need to keep the consumer active.
|
|
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
|
|
----
|
|
|
|
[NOTE]
|
|
====
|
|
|
|
|
|
When using JMS, Streaming large messages are only supported on `StreamMessage` and `BytesMessage`.
|
|
====
|
|
|
|
=== Streaming Alternative on Core Protocol
|
|
|
|
If you choose not to use the `InputStream` or `OutputStream` capability of Apache ActiveMQ Artemis You could still access the data directly in an alternative fashion.
|
|
|
|
On the Core API just get the bytes of the body as you normally would.
|
|
|
|
[,java]
|
|
----
|
|
ClientMessage msg = consumer.receive();
|
|
|
|
byte[] bytes = new byte[1024];
|
|
for (int i = 0 ; i < msg.getBodySize(); i += bytes.length)
|
|
{
|
|
msg.getBody().readBytes(bytes);
|
|
// Whatever you want to do with the bytes
|
|
}
|
|
----
|
|
|
|
If using JMS API, `BytesMessage` and `StreamMessage` also supports it transparently.
|
|
|
|
[,java]
|
|
----
|
|
BytesMessage rm = (BytesMessage)cons.receive(10000);
|
|
|
|
byte data[] = new byte[1024];
|
|
|
|
for (int i = 0; i < rm.getBodyLength(); i += 1024)
|
|
{
|
|
int numberOfBytes = rm.readBytes(data);
|
|
// Do whatever you want with the data
|
|
}
|
|
----
|
|
|
|
== Configuring AMQP Acceptor
|
|
|
|
You can configure the property `amqpMinLargeMessageSize` at the acceptor.
|
|
|
|
The default value is 102400 (100KBytes).
|
|
|
|
Setting it to -1 will disable large message support.
|
|
|
|
WARNING: setting amqpMinLargeMessageSize to -1, your AMQP message might be stored as a Core Large Message if the size of the message does not fit into the journal.
|
|
This is the former semantic of the broker and it is kept this way for compatibility reasons.
|
|
|
|
[,xml]
|
|
----
|
|
<acceptors>
|
|
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
|
<acceptor name="amqp">tcp://0.0.0.0:5672?; ..... amqpMinLargeMessageSize=102400; .... </acceptor>
|
|
</acceptors>
|
|
----
|
|
|
|
== Large message example
|
|
|
|
Please see the xref:examples.adoc#large-message[Large Message Example] which shows how large messages are configured and used with JMS.
|