Durable subscrption state is part of the MQTT specification which has
not been supported until now. This functionality is implemented via an
internal last-value queue. When an MQTT client creates, updates, or
adds a subscription a message using the client-ID as the last-value is
sent to the internal queue. When the broker restarts this data is read
from the queue and populates the in-memory MQTT data-structures.
Therefore subscribers can reconnect and resume their session's
subscriptions without have to manually resubscribe.
MQTT state is now managed centrally per-broker rather than in the
MQTTProtocolManager since there is one instance of MQTTProtocolManager
for each acceptor allowing MQTT connections. Managing state per acceptor
would allow odd behavior with clients connecting to different acceptors
with the same client ID.
The subscriptions are serialized as raw bytes with a "version" byte for
potential future use, but I intentionally avoided adding complex
scaffolding to support multiple versions. We can add that complexity
later if necessary.
Some tests needed to be changed since instantiating an MQTT protocol
manager now creates an internal queue. A handful of tests assume that no
queues will exist other than the ones they create themselves. I updated
the main test super-class so that an MQTT protocol manager is not
automatically instantiated when configuring a broker for in-vm support.
When resource audit logging is enabled STOMP is completely inoperable
due to an NPE during the protocol handshake. Unfortunately the failure
is completely silent. There are no logs to indicate a problem.
This commit fixes this problem via the following changes:
- Mitigate the original NPE via a check for null
- Move the logic necessary to set the "protocol connection" on the
"transport connection" to a class shared by all implementations.
- Add exception handling to log failures like this in the future.
- Add tests to ensure the audit logging is correct.
- interrupted message breaking reference counting
After the server writing to the client is interrupted in AMQP, the reference counting was broken what would require the server restarted
in order to cleanup the files of any interrupted sends.
- Removed consumer during large message delivery damaging large messages
If the consumer failed to deliver messages for any reason, the message on the queue would be duplicated. what would wipe out the body of the message
and other journal errors would happen because of this.
extra debug capabilities added into RefCountMessage as part of ARTEMIS-4206 in order to identify these issues
o.a.a.a.c.p.m.MQTTSubscriptionManager#removeSubscription() had a chunk
of code from 971f673c60 removed. That code
was added under the assumption that there should only ever be one
consumer per queue. That was true for MQTT 3.x, but it's not always true
for MQTT 5 due to shared subscriptions. However, the tests from that
commit all still pass even with it removed now (as well as all the other
MQTT tests) so I think it's safe.
Commit 5a42de5fa6 called my attention to
this test. It really needs to be refactored because:
- It belongs in the integration-tests module rather than the MQTT
protocol module.
- It is using a lot of non-standard components (e.g.
EmbeddedJMSResource, Awaitility, etc.).
- It is overly complicated (e.g. using its own MqttClientService).
This commit resolves all those problems. The new implementation is quite
a bit different but still equivalent. I reverted the original fix from
ARTEMIS-2476 and the test still fails.
Logger statements should use formatting syntax and let the normal framework checks take care of
checking if a logger is enabled instead of string concats and isXEnabled logger checks except
in cases there is known expense to the specifc logging message/arg preparation or passing.
Changes from myself and Robbie Gemmell.
Co-authored-by: Robbie Gemmell <robbie@apache.org>
Sometimes users want to perform custom client ID validation, and in the
case of an invalid client ID the proper reason code should be returned
in the CONNACK packet.
Due to the changes in 682f505e32 we now
send "Last Will & Testament" MQTT messages via ServerSession. This means
sending will fail if the disk is full. For MQTT this triggers a
connection failure which in turns triggers sending an LWT message. This
process will recurse infinitely until it results in a
java.lang.StackOverflowError.
This commit fixes that by tracking whether or not sending a LWT message
is already in progress.
org.apache.activemq.artemis.spi.core.protocol.RemotingConnection has a
number of implementations most notably an abstract version which
provides many methods shared among the implementations. The sharing
could be improved to eliminate duplicate code.
This commit eliminates more than 700 lines of unnecessary code.
There should be no semantic changes.
Using direct routing skips authorization for "Last Will and Testament"
messages (a.k.a. "will" messages). This commit fixes that problem by
using the internal session that is established for normal message
production and consumption.
MQTT 3.1 and 3.1.1 clients using a clean session should have a
*non-durable* subscription queue. If the broker restarts the queue
should be removed. This is due to [MQTT-3.1.2-6] which states that the
session (and any state) must last only as long as the network
connection.