af2672e79a
Durable subscription state is part of the MQTT specification but has not been supported until now. This functionality is implemented via an internal last-value queue. When an MQTT client creates, updates, or adds a subscription, a message using the client ID as the last-value key is sent to the internal queue. When the broker restarts, this data is read from the queue and used to populate the in-memory MQTT data structures. Subscribers can therefore reconnect and resume their session's subscriptions without having to resubscribe manually. MQTT state is now managed centrally per broker rather than in the MQTTProtocolManager, since there is one instance of MQTTProtocolManager for each acceptor that allows MQTT connections. Managing state per acceptor would allow odd behavior when clients connect to different acceptors with the same client ID. The subscriptions are serialized as raw bytes with a "version" byte for potential future use, but I intentionally avoided adding complex scaffolding to support multiple versions. We can add that complexity later if necessary. Some tests needed to be changed, since instantiating an MQTT protocol manager now creates an internal queue, and a handful of tests assume that no queues will exist other than the ones they create themselves. I updated the main test super-class so that an MQTT protocol manager is not automatically instantiated when configuring a broker for in-vm support. |
||
---|---|---|
.. | ||
java/org/apache/activemq/artemis/jms/example | ||
resources/activemq |
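
The persistence scheme described in the commit message above can be illustrated with a small, self-contained Java sketch. It is not the Artemis implementation: the class `SubscriptionStateSketch`, the nested `Sub` and `LastValueStore` types, and the byte layout (one version byte, a count, then topic filter and QoS per subscription) are all hypothetical names and assumptions chosen for illustration. A plain map stands in for the broker's internal last-value queue, keeping only the newest message per client ID.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names come from the Artemis code base.
class SubscriptionStateSketch {
    // Leading "version" byte reserved for future format changes.
    static final byte VERSION = 1;

    // One MQTT subscription: a topic filter and the requested QoS level.
    static final class Sub {
        final String topicFilter;
        final int qos;

        Sub(String topicFilter, int qos) {
            this.topicFilter = topicFilter;
            this.qos = qos;
        }
    }

    // Serialize a client's full subscription list as raw bytes (assumed layout).
    static byte[] encode(List<Sub> subs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(VERSION);
            out.writeInt(subs.size());
            for (Sub s : subs) {
                out.writeUTF(s.topicFilter);
                out.writeByte(s.qos);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read the bytes back, rejecting any version this sketch does not know.
    static List<Sub> decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte version = in.readByte();
            if (version != VERSION) {
                throw new IOException("Unsupported version: " + version);
            }
            int count = in.readInt();
            List<Sub> subs = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                String topicFilter = in.readUTF();
                int qos = in.readByte();
                subs.add(new Sub(topicFilter, qos));
            }
            return subs;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for a last-value queue: a newer message replaces the older
    // one that carries the same last-value key (here, the MQTT client ID).
    static final class LastValueStore {
        private final Map<String, byte[]> latest = new LinkedHashMap<>();

        void send(String clientId, byte[] body) {
            latest.put(clientId, body);
        }

        // Simulates broker restart: rebuild in-memory state from stored messages.
        Map<String, List<Sub>> recover() {
            Map<String, List<Sub>> state = new LinkedHashMap<>();
            for (Map.Entry<String, byte[]> e : latest.entrySet()) {
                state.put(e.getKey(), decode(e.getValue()));
            }
            return state;
        }
    }
}
```

Because each client ID maps to exactly one retained message, the store holds only the latest subscription set per client, which is why a single shared store per broker (rather than one per acceptor) avoids conflicting state for the same client ID.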