ARTEMIS-4501 clean up MQTT subscription queues when session expires
This commit is contained in:
parent
60200b44e2
commit
8e68bb1902
|
@ -110,9 +110,12 @@ public class MQTTStateManager {
|
|||
for (String key : toRemove) {
|
||||
try {
|
||||
MQTTSessionState state = removeSessionState(key);
|
||||
if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) {
|
||||
if (state != null) {
|
||||
if (state.isWill() && !state.isAttached() && state.isFailed()) {
|
||||
state.getSession().sendWillMessage();
|
||||
}
|
||||
state.getSession().clean(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
MQTTLogger.LOGGER.failedToRemoveSessionState(key, e);
|
||||
}
|
||||
|
|
|
@ -165,27 +165,15 @@ In the case of MQTT 5 clients they will receive a disconnect reason code of http
|
|||
|
||||
== Automatic Subscription Clean-up
|
||||
|
||||
Sometimes MQTT clients using `CleanSession=false` don't clean up their subscriptions.
|
||||
In such situations the following address-setting can be used to clean up the abandoned subscription queues:
|
||||
Sometimes MQTT 3.x clients using `CleanSession=false` don't properly unsubscribe. The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the MQTT `acceptor` so that abandoned sessions and subscription queues will be cleaned up automatically after the expiry interval elapses.
|
||||
|
||||
[,xml]
|
||||
----
|
||||
<address-setting match="myMqttAddress">
|
||||
<auto-delete-created-queues>true</auto-delete-created-queues>
|
||||
<auto-delete-queues-delay>3600000</auto-delete-queues-delay> <!-- 1 hour delay -->
|
||||
<auto-delete-queues-message-count>-1</auto-delete-queues-message-count> <!-- doesn't matter how many messages there are -->
|
||||
</address-setting>
|
||||
----
|
||||
MQTT 5 has the same basic semantics with slightly different configuration.
|
||||
The `CleanSession` flag was replaced with `CleanStart` and a https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session expiry interval] property.
|
||||
The broker will use the client's session expiry interval if it is set.
|
||||
If it is not set then the broker will apply the `defaultMqttSessionExpiryInterval`.
|
||||
|
||||
However, the MQTT session meta-data is still present in memory and needs to be cleaned up as well.
|
||||
The URL parameter `defaultMqttSessionExpiryInterval` can be configured on the MQTT `acceptor` to deal with this situation.
|
||||
|
||||
MQTT 5 added a new https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[session expiry interval] property with the same basic semantics.
|
||||
The broker will use the client's value for this property if it is set.
|
||||
If it is not set then it will apply the `defaultMqttSessionExpiryInterval`.
|
||||
|
||||
The default `defaultMqttSessionExpiryInterval` is `-1` which means no MQTT 3.x session states will be expired and no MQTT 5 session states which do not pass their own session expiry interval will be expired.
|
||||
Otherwise it represents the number of *seconds* which must elapse after the client has disconnected before the broker will remove the session state.
|
||||
The default `defaultMqttSessionExpiryInterval` is `-1` which means no clean up will happen for MQTT 3.x clients or for MQTT 5 clients which do not pass their own session expiry interval.
|
||||
Otherwise it represents the number of *seconds* which must elapse after the client has disconnected before the broker will remove the session state and subscription queues.
|
||||
|
||||
MQTT session state is scanned every 5,000 milliseconds by default.
|
||||
This can be changed using the `mqtt-session-scan-interval` element set in the `core` section of `broker.xml`.
|
||||
|
|
|
@ -12,6 +12,25 @@ NOTE: If the upgrade spans multiple versions then the steps from *each* version
|
|||
|
||||
NOTE: Follow the general upgrade procedure outlined in the xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker] chapter in addition to any version-specific upgrade instructions outlined here.
|
||||
|
||||
== 2.32.0
|
||||
|
||||
https://issues.apache.org/jira/secure/ReleaseNote.jspa...
|
||||
|
||||
=== Highlights
|
||||
|
||||
* highlight 1
|
||||
* highlight 2
|
||||
|
||||
=== Upgrading from 2.31.x
|
||||
|
||||
* Due to https://issues.apache.org/jira/browse/ARTEMIS-4501[ARTEMIS-4501] MQTT subscription queues will be automatically removed when the corresponding session expires, either based on the session expiry interval passed by an MQTT 5 client or based on the configured `defaultMqttSessionExpiryInterval` for MQTT 3.x clients or MQTT 5 clients which don't explicitly pass a session expiry interval.
|
||||
+
|
||||
Prior to this change removing subscription queues relied on the generic `auto-delete-*` `address-settings`.
|
||||
+
|
||||
These settings are now no longer required.
|
||||
+
|
||||
Configure `defaultMqttSessionExpiryInterval` instead.
|
||||
|
||||
== 2.31.2
|
||||
|
||||
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353776[Full release notes]
|
||||
|
|
|
@ -968,13 +968,20 @@ public class ConnectTests extends MQTT5TestSupport {
|
|||
.build();
|
||||
consumer.connect(options);
|
||||
consumer.subscribe(TOPIC, 2);
|
||||
long start = System.currentTimeMillis();
|
||||
consumer.disconnect();
|
||||
|
||||
// session should *not* still exist since session expiry interval has passed
|
||||
long start = System.currentTimeMillis();
|
||||
// ensure the subscription queue still exists since the session hasn't expired
|
||||
assertNotNull(getSubscriptionQueue(TOPIC, CONSUMER_ID));
|
||||
|
||||
Wait.assertEquals(0, () -> getSessionStates().size(), EXPIRY_INTERVAL * 1000 * 2, 100);
|
||||
assertTrue(System.currentTimeMillis() - start > (EXPIRY_INTERVAL * 1000));
|
||||
assertTrue(System.currentTimeMillis() - start >= (EXPIRY_INTERVAL * 1000));
|
||||
|
||||
// session should *not* still exist since session expiry interval has passed
|
||||
assertNull(getSessionStates().get(CONSUMER_ID));
|
||||
|
||||
// ensure the subscription queue is cleaned up when the session expires
|
||||
Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) == null, 2000, 100);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue