331 lines
15 KiB
Markdown
331 lines
15 KiB
Markdown
# Flow Control
|
|
|
|
Flow control is used to limit the flow of data between a client and
|
|
server, or a server and another server in order to prevent the client or
|
|
server being overwhelmed with data.
|
|
|
|
## Consumer Flow Control
|
|
|
|
This controls the flow of data between the server and the client as the
|
|
client consumes messages. For performance reasons clients normally
|
|
buffer messages before delivering to the consumer via the `receive()`
|
|
method or asynchronously via a message listener. If the consumer cannot
|
|
process messages as fast as they are being delivered and stored in the
|
|
internal buffer, then you could end up with a situation where messages
|
|
would keep building up possibly causing out of memory on the client if
|
|
they cannot be processed in time.
|
|
|
|
## Window-Based Flow Control
|
|
|
|
By default, Apache ActiveMQ Artemis consumers buffer messages from the server in a
|
|
client side buffer before the client consumes them. This improves
|
|
performance: otherwise every time the client consumes a message,
|
|
Apache ActiveMQ Artemis would have to go the server to request the next message. In
|
|
turn, this message would then get sent to the client side, if one was
|
|
available.
|
|
|
|
A network round trip would be involved for *every* message and
|
|
considerably reduce performance.
|
|
|
|
To prevent this, Apache ActiveMQ Artemis pre-fetches messages into a buffer on each
|
|
consumer. The total maximum size of messages (in bytes) that will be
|
|
buffered on each consumer is determined by the `consumerWindowSize`
|
|
parameter.
|
|
|
|
By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024
|
|
bytes).
|
|
|
|
The value can be:
|
|
|
|
- `-1` for an *unbounded* buffer
|
|
|
|
- `0` to not buffer any messages.
|
|
|
|
- `>0` for a buffer with the given maximum size in bytes.
|
|
|
|
Setting the consumer window size can considerably improve performance
|
|
depending on the messaging use case. As an example, let's consider the
|
|
two extremes:
|
|
|
|
### Fast consumers
|
|
Fast consumers can process messages as fast as they consume them (or
|
|
even faster)
|
|
|
|
To allow fast consumers, set the `consumerWindowSize` to -1. This
|
|
will allow *unbounded* message buffering on the client side.
|
|
|
|
Use this setting with caution: it can overflow the client memory if
|
|
the consumer is not able to process messages as fast as it receives
|
|
them.
|
|
|
|
### Slow consumers
|
|
Slow consumers takes significant time to process each message and it
|
|
is desirable to prevent buffering messages on the client side so
|
|
that they can be delivered to another consumer instead.
|
|
|
|
Consider a situation where a queue has 2 consumers; 1 of which is
|
|
very slow. Messages are delivered in a round robin fashion to both
|
|
consumers, the fast consumer processes all of its messages very
|
|
quickly until its buffer is empty. At this point there are still
|
|
messages awaiting to be processed in the buffer of the slow consumer
|
|
thus preventing them being processed by the fast consumer. The fast
|
|
consumer is therefore sitting idle when it could be processing the
|
|
other messages.
|
|
|
|
To allow slow consumers, set the `consumerWindowSize` to 0 (for no
|
|
buffer at all). This will prevent the slow consumer from buffering
|
|
any messages on the client side. Messages will remain on the server
|
|
side ready to be consumed by other consumers.
|
|
|
|
Setting this to 0 can give deterministic distribution between
|
|
multiple consumers on a queue.
|
|
|
|
Most of the consumers cannot be clearly identified as fast or slow
|
|
consumers but are in-between. In that case, setting the value of
|
|
`consumerWindowSize` to optimize performance depends on the messaging
|
|
use case and requires benchmarks to find the optimal value, but a value
|
|
of 1MiB is fine in most cases.
|
|
|
|
### Using Core API
|
|
|
|
If Apache ActiveMQ Artemis Core API is used, the consumer window size is specified by
|
|
`ServerLocator.setConsumerWindowSize()` method and some of the
|
|
`ClientSession.createConsumer()` methods.
|
|
|
|
### Using JMS
|
|
|
|
If JNDI is used on the client to instantiate and look up the connection
|
|
factory the consumer window size is configured in the JNDI context
|
|
environment, e.g. `jndi.properties`. Here's a simple example using the
|
|
"ConnectionFactory" connection factory which is available in the context
|
|
by default:
|
|
|
|
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
|
|
connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=0
|
|
|
|
If the connection factory is directly instantiated, the consumer window
|
|
size is specified by `ActiveMQConnectionFactory.setConsumerWindowSize()`
|
|
method.
|
|
|
|
Please see the examples for an example which shows how to configure Apache ActiveMQ Artemis to
|
|
prevent consumer buffering when dealing with slow consumers.
|
|
|
|
## Rate limited flow control
|
|
|
|
It is also possible to control the *rate* at which a consumer can
|
|
consume messages. This is a form of throttling and can be used to make
|
|
sure that a consumer never consumes messages at a rate faster than the
|
|
rate specified.
|
|
|
|
The rate must be a positive integer to enable this functionality and is
|
|
the maximum desired message consumption rate specified in units of
|
|
messages per second. Setting this to `-1` disables rate limited flow
|
|
control. The default value is `-1`.
|
|
|
|
Please see [the examples chapter](examples.md) for a working example of limiting consumer rate.
|
|
|
|
### Using Core API
|
|
|
|
If the Apache ActiveMQ Artemis core API is being used the rate can be set via the
|
|
`ServerLocator.setConsumerMaxRate(int consumerMaxRate)` method or
|
|
alternatively via some of the `ClientSession.createConsumer()` methods.
|
|
|
|
### Using JMS
|
|
|
|
If JNDI is used to instantiate and look up the connection factory, the
|
|
max rate can be configured in the JNDI context environment, e.g.
|
|
`jndi.properties`. Here's a simple example using the "ConnectionFactory"
|
|
connection factory which is available in the context by default:
|
|
|
|
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
|
|
java.naming.provider.url=tcp://localhost:61616?consumerMaxRate=10
|
|
|
|
If the connection factory is directly instantiated, the max rate size
|
|
can be set via the `ActiveMQConnectionFactory.setConsumerMaxRate(int
|
|
consumerMaxRate)` method.
|
|
|
|
> **Note**
|
|
>
|
|
> Rate limited flow control can be used in conjunction with window based
|
|
> flow control. Rate limited flow control only effects how many messages
|
|
> a client can consume in a second and not how many messages are in its
|
|
> buffer. So if you had a slow rate limit and a high window based limit
|
|
> the clients internal buffer would soon fill up with messages.
|
|
|
|
Please see [the examples chapter](examples.md) for an example which shows how to configure ActiveMQ Artemis to
|
|
prevent consumer buffering when dealing with slow consumers.
|
|
|
|
## Producer flow control
|
|
|
|
Apache ActiveMQ Artemis also can limit the amount of data sent from a client to a
|
|
server to prevent the server being overwhelmed.
|
|
|
|
### Window based flow control
|
|
|
|
In a similar way to consumer window based flow control, Apache ActiveMQ Artemis
|
|
producers, by default, can only send messages to an address as long as
|
|
they have sufficient credits to do so. The amount of credits required to
|
|
send a message is given by the size of the message.
|
|
|
|
As producers run low on credits they request more from the server, when
|
|
the server sends them more credits they can send more messages.
|
|
|
|
The amount of credits a producer requests in one go is known as the
|
|
*window size*.
|
|
|
|
The window size therefore determines the amount of bytes that can be
|
|
in-flight at any one time before more need to be requested - this
|
|
prevents the remoting connection from getting overloaded.
|
|
|
|
#### Using Core API
|
|
|
|
If the Apache ActiveMQ Artemis core API is being used, window size can be set via the
|
|
`ServerLocator.setProducerWindowSize(int producerWindowSize)` method.
|
|
|
|
#### Using JMS
|
|
|
|
If JNDI is used to instantiate and look up the connection factory, the
|
|
producer window size can be configured in the JNDI context environment,
|
|
e.g. `jndi.properties`. Here's a simple example using the
|
|
"ConnectionFactory" connection factory which is available in the context
|
|
by default:
|
|
|
|
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
|
|
connectionFactory.myConnectionFactory=tcp://localhost:61616?producerWindowSize=10
|
|
|
|
If the connection factory is directly instantiated, the producer window
|
|
size can be set via the
|
|
`ActiveMQConnectionFactory.setProducerWindowSize(int
|
|
producerWindowSize)` method.
|
|
|
|
#### Blocking producer window based flow control using CORE protocol
|
|
|
|
When using the CORE protocol (used by both the Artemis Core Client and Artemis JMS Client)
|
|
the server will always aim give the same number of credits as have been requested.
|
|
However, it is also possible to set a maximum size on any address, and the server
|
|
will never send more credits to any one producer than what is available according to
|
|
the address's upper memory limit. Although a single producer will be issued more
|
|
credits than available (at the time of issue) it is possible that more than 1
|
|
producer be associated with the same address and so it is theoretically possible
|
|
that more credits are allocated across total producers than what is available.
|
|
It is therefore possible to go over the address limit by approximately:
|
|
|
|
'''total number of producers on address * producer window size'''
|
|
|
|
For example, if I have a JMS queue called "myqueue", I could set the
|
|
maximum memory size to 10MiB, and the the server will control the number
|
|
of credits sent to any producers which are sending any messages to
|
|
myqueue such that the total messages in the queue never exceeds 10MiB.
|
|
|
|
When the address gets full, producers will block on the client side
|
|
until more space frees up on the address, i.e. until messages are
|
|
consumed from the queue thus freeing up space for more messages to be
|
|
sent.
|
|
|
|
We call this blocking producer flow control, and it's an efficient way
|
|
to prevent the server running out of memory due to producers sending
|
|
more messages than can be handled at any time.
|
|
|
|
It is an alternative approach to paging, which does not block producers
|
|
but instead pages messages to storage.
|
|
|
|
To configure an address with a maximum size and tell the server that you
|
|
want to block producers for this address if it becomes full, you need to
|
|
define an AddressSettings ([Configuring Queues Via Address Settings](queue-attributes.md)) block for the address and specify
|
|
`max-size-bytes` and `address-full-policy`
|
|
|
|
The address block applies to all queues registered to that address. I.e.
|
|
the total memory for all queues bound to that address will not exceed
|
|
`max-size-bytes`. In the case of JMS topics this means the *total*
|
|
memory of all subscriptions in the topic won't exceed max-size-bytes.
|
|
|
|
Here's an example:
|
|
|
|
<address-settings>
|
|
<address-setting match="jms.queue.exampleQueue">
|
|
<max-size-bytes>100000</max-size-bytes>
|
|
<address-full-policy>BLOCK</address-full-policy>
|
|
</address-setting>
|
|
</address-settings>
|
|
|
|
The above example would set the max size of the JMS queue "exampleQueue"
|
|
to be 100000 bytes and would block any producers sending to that address
|
|
to prevent that max size being exceeded.
|
|
|
|
Note the policy must be set to `BLOCK` to enable blocking producer flow
|
|
control.
|
|
|
|
> **Note**
|
|
>
|
|
> Note that in the default configuration all addresses are set to block
|
|
> producers after 10 MiB of message data is in the address. This means
|
|
> you cannot send more than 10MiB of message data to an address without
|
|
> it being consumed before the producers will be blocked. If you do not
|
|
> want this behaviour increase the `max-size-bytes` parameter or change
|
|
> the address full message policy.
|
|
|
|
> **Note**
|
|
>
|
|
> Producer credits are allocated from the broker to the client. Flow control
|
|
> credit checking (i.e. checking a producer has enough credit) is done on the
|
|
> client side only. It is possible for the broker to over allocate credits, like
|
|
> in the multiple producer scenario outlined above. It is also possible for
|
|
> a misbehaving client to ignore the flow control credits issued by the broker
|
|
> and continue sending with out sufficient credit.
|
|
|
|
#### Blocking producer window based flow control using AMQP
|
|
|
|
Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support flow control. Artemis CORE protocol and
|
|
AMQP. Both protocols implement flow control slightly differently and therefore address full BLOCK policy behaves slightly
|
|
different for clients that use each protocol respectively.
|
|
|
|
As explained earlier in this chapter the CORE protocol uses a producer window size flow control system. Where credits
|
|
(representing bytes) are allocated to producers, if a producer wants to send a message it should wait until it has
|
|
enough byte credits available for it to send. AMQP flow control credits are not representative of bytes but instead
|
|
represent the number of messages a producer is permitted to send (regardless of the message size).
|
|
|
|
BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis will issue 100 credits
|
|
to a client at a time and refresh them when the clients credits reaches 30. The broker will stop issuing credits once an
|
|
address is full. However, since AMQP credits represent whole messages and not bytes, it would be possible in some
|
|
scenarios for an AMQP client to significantly exceed an address upper bound should the broker continue accepting
|
|
messages until the clients credits are exhausted. For this reason there is an additional parameter available on address
|
|
settings that specifies an upper bound on an address size in bytes. Once this upper bound is reach Artemis will start
|
|
rejecting AMQP messages. This limit is the max-size-bytes-reject-threshold and is by default set to -1 (or no limit).
|
|
This is additional parameter allows a kind of soft and hard limit, in normal circumstances the broker will utilize the
|
|
max-size-bytes parameter using using flow control to put back pressure on the client, but will protect the broker by
|
|
rejecting messages once the address size is reached.
|
|
|
|
### Rate limited flow control
|
|
|
|
Apache ActiveMQ Artemis also allows the rate a producer can emit message to be limited,
|
|
in units of messages per second. By specifying such a rate, Apache ActiveMQ Artemis
|
|
will ensure that producer never produces messages at a rate higher than
|
|
that specified.
|
|
|
|
The rate must be a positive integer to enable this functionality and is
|
|
the maximum desired message consumption rate specified in units of
|
|
messages per second. Setting this to `-1` disables rate limited flow
|
|
control. The default value is `-1`.
|
|
|
|
Please see [the examples chapter](examples.md) for a working example of limiting producer rate.
|
|
|
|
#### Using Core API
|
|
|
|
If the Apache ActiveMQ Artemis core API is being used the rate can be set via the
|
|
`ServerLocator.setProducerMaxRate(int producerMaxRate)` method or
|
|
alternatively via some of the `ClientSession.createProducer()` methods.
|
|
|
|
#### Using JMS
|
|
|
|
If JNDI is used to instantiate and look up the connection factory, the
|
|
max rate size can be configured in the JNDI context environment, e.g.
|
|
`jndi.properties`. Here's a simple example using the "ConnectionFactory"
|
|
connection factory which is available in the context by default:
|
|
|
|
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
|
|
connectionFactory.myConnectionFactory=tcp://localhost:61616?producerMaxRate=10
|
|
|
|
If the connection factory is directly instantiated, the max rate size
|
|
can be set via the `ActiveMQConnectionFactory.setProducerMaxRate(int
|
|
producerMaxRate)` method.
|