This commit fixes the deadlock described on ARTEMIS-3622 by moving the
synchronization "up" a level from the MQTTSession to the
MQTTConnectionManager. It also eliminates the synchronization on the
MQTTSessionState in the MQTTConnectionManager because it's no longer
needed. This change should not only eliminate the deadlock, but improve
performance relatively as well.
There is no test associated with this commit as I wasn't able to
reproduce the deadlock with any kind of straight-forward test. There was
a test linked on the Jira, but it involved intrusive and fragile
scaffolding and wasn't ultimately tenable. That said, I did test this
fix with that test and it was successful. In any case, I think static
analysis should be sufficient here as the changes are pretty
straight-forward.
Generate MQTT message IDs from full allowed range of 1-65535 and skip
currently used values. Do not use atomic integer for current ID, because
all accesses and modifications are performed in synchronized context.
Currently when an MQTT topic filter contains characters from the
configured wildcard syntax the conversion to/from this syntax breaks.
For example, when using the default wildcard syntax if an MQTT topic
filter contains a . the conversion from the MQTT wildcard syntax to the
core wildcard syntax and back will result in the `.` being replaced with
a `/.`.
This commit fixes that plus a few other things...
- Implements proper conversions to/from one WildcardConfiguration to
another.
- Refactors the MQTT code which invokes these conversion methods. This
includes simplifying a lot of test code.
- Adds lots of tests for everything.
- Clarifies some variable naming to better distinguish between core and
MQTT.
This commit:
- Eliminates MQTT session storage on every successful connection.
Instead data is only written when subsriptions are created or
destroyed.
- Adds a configuration property for the storage timeout.
- Updates the documentation with relevant information.
- Refactors a few bits of code to eliminate unnecessary variables, etc.
Starting with 2.28.0, the broker doesn't translate the character `/` to
the configured wildcard delimiter (i.e. `.` by default) when creating
subscription queues for MQTT clients.
This commit fixes that regression and restores the proper translation.
In accordance with the QoS2 protocol outlined in the MQTT
specification(s), once the broker receives a PUBLISH then any other
PUBLISH it receives on that same session with the same packet ID must be
ignored until the QoS2 protocol for that ID is completed.
The broker does this, but it doesn't log anything so it's not clear when
this is actually happening.
Durable subscrption state is part of the MQTT specification which has
not been supported until now. This functionality is implemented via an
internal last-value queue. When an MQTT client creates, updates, or
adds a subscription a message using the client-ID as the last-value is
sent to the internal queue. When the broker restarts this data is read
from the queue and populates the in-memory MQTT data-structures.
Therefore subscribers can reconnect and resume their session's
subscriptions without have to manually resubscribe.
MQTT state is now managed centrally per-broker rather than in the
MQTTProtocolManager since there is one instance of MQTTProtocolManager
for each acceptor allowing MQTT connections. Managing state per acceptor
would allow odd behavior with clients connecting to different acceptors
with the same client ID.
The subscriptions are serialized as raw bytes with a "version" byte for
potential future use, but I intentionally avoided adding complex
scaffolding to support multiple versions. We can add that complexity
later if necessary.
Some tests needed to be changed since instantiating an MQTT protocol
manager now creates an internal queue. A handful of tests assume that no
queues will exist other than the ones they create themselves. I updated
the main test super-class so that an MQTT protocol manager is not
automatically instantiated when configuring a broker for in-vm support.
When resource audit logging is enabled STOMP is completely inoperable
due to an NPE during the protocol handshake. Unfortunately the failure
is completely silent. There are no logs to indicate a problem.
This commit fixes this problem via the following changes:
- Mitigate the original NPE via a check for null
- Move the logic necessary to set the "protocol connection" on the
"transport connection" to a class shared by all implementations.
- Add exception handling to log failures like this in the future.
- Add tests to ensure the audit logging is correct.
- interrupted message breaking reference counting
After the server writing to the client is interrupted in AMQP, the reference counting was broken what would require the server restarted
in order to cleanup the files of any interrupted sends.
- Removed consumer during large message delivery damaging large messages
If the consumer failed to deliver messages for any reason, the message on the queue would be duplicated. what would wipe out the body of the message
and other journal errors would happen because of this.
extra debug capabilities added into RefCountMessage as part of ARTEMIS-4206 in order to identify these issues
o.a.a.a.c.p.m.MQTTSubscriptionManager#removeSubscription() had a chunk
of code from 971f673c60 removed. That code
was added under the assumption that there should only ever be one
consumer per queue. That was true for MQTT 3.x, but it's not always true
for MQTT 5 due to shared subscriptions. However, the tests from that
commit all still pass even with it removed now (as well as all the other
MQTT tests) so I think it's safe.