ARTEMIS-4542 improve MQTT state storage

This commit:

 - Eliminates MQTT session storage on every successful connection.
   Instead data is only written when subsriptions are created or
   destroyed.
 - Adds a configuration property for the storage timeout.
 - Updates the documentation with relevant information.
 - Refactors a few bits of code to eliminate unnecessary variables, etc.
This commit is contained in:
Justin Bertram 2023-12-21 11:32:28 -06:00
parent f56595b89b
commit 019fc86138
15 changed files with 99 additions and 37 deletions

View File

@ -679,6 +679,9 @@ public final class ActiveMQDefaultConfiguration {
// How often (in ms) to scan for expired MQTT sessions
private static long DEFAULT_MQTT_SESSION_SCAN_INTERVAL = 500;
// How long (in ms) to wait to persist MQTT session state
private static long DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT = 5000;
// If SESSION-notifications should be suppressed or not
public static boolean DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS = false;
@ -1869,6 +1872,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MQTT_SESSION_SCAN_INTERVAL;
}
/**
* How long (in ms) to wait to persist MQTT session state
*/
public static long getMqttSessionStatePersistenceTimeout() {
return DEFAULT_MQTT_SESSION_STATE_PERSISTENCE_TIMEOUT;
}
public static boolean getDefaultSuppressSessionNotifications() {
return DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS;
}

View File

@ -64,4 +64,7 @@ public interface MQTTLogger {
@LogMessage(id = 834009, value = "Ignoring duplicate MQTT QoS2 PUBLISH packet for packet ID {} from client with ID {}.", level = LogMessage.Level.WARN)
void ignoringQoS2Publish(String clientId, long packetId);
@LogMessage(id = 834010, value = "Unable to scan MQTT sessions", level = LogMessage.Level.ERROR)
void unableToScanSessions(Exception e);
}

View File

@ -22,7 +22,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.protocol.ProtocolHandler;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -79,16 +78,11 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<M
}
@Override
public void run() {
server.getRemotingService().getAcceptors().forEach((key, acceptor) -> {
ProtocolHandler protocolHandler = acceptor.getProtocolHandler();
if (protocolHandler != null) {
protocolHandler.getProtocolMap().values().forEach(m -> {
if (m instanceof MQTTProtocolManager) {
((MQTTProtocolManager)m).getStateManager().scanSessions();
}
});
}
});
try {
MQTTStateManager.getInstance(server).scanSessions();
} catch (Exception e) {
MQTTLogger.LOGGER.unableToScanSessions(e);
}
}
}
}

View File

@ -47,14 +47,12 @@ public class MQTTSessionState {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final MQTTSessionState DEFAULT = new MQTTSessionState((String) null, null);
public static final MQTTSessionState DEFAULT = new MQTTSessionState((String) null);
private MQTTSession session;
private final String clientId;
private final MQTTStateManager stateManager;
private final ConcurrentMap<String, Pair<MqttTopicSubscription, Integer>> subscriptions = new ConcurrentHashMap<>();
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
@ -98,9 +96,8 @@ public class MQTTSessionState {
private Map<String, Integer> serverTopicAliases;
public MQTTSessionState(String clientId, MQTTStateManager stateManager) {
public MQTTSessionState(String clientId) {
this.clientId = clientId;
this.stateManager = stateManager;
}
/**
@ -119,12 +116,10 @@ public class MQTTSessionState {
* - int (nullable): subscription identifier
*
* @param message the message holding the MQTT session data
* @param stateManager the manager used to add and remove sessions from storage
*/
public MQTTSessionState(CoreMessage message, MQTTStateManager stateManager) {
public MQTTSessionState(CoreMessage message) {
logger.debug("Deserializing MQTT session state from {}", message);
this.clientId = message.getStringProperty(Message.HDR_LAST_VALUE_NAME);
this.stateManager = stateManager;
ActiveMQBuffer buf = message.getDataBuffer();
// no need to use the version at this point

View File

@ -54,6 +54,7 @@ public class MQTTStateManager {
private final Queue sessionStore;
private static Map<Integer, MQTTStateManager> INSTANCES = new HashMap<>();
private final Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
private final long timeout;
/*
* Even though there may be multiple instances of MQTTProtocolManager (e.g. for MQTT on different ports) we only want
@ -76,20 +77,19 @@ public class MQTTStateManager {
private MQTTStateManager(ActiveMQServer server) throws Exception {
this.server = server;
sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true);
this.timeout = server.getConfiguration().getMqttSessionStatePersistenceTimeout();
this.sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true);
// load session data from queue
try (LinkedListIterator<MessageReference> iterator = sessionStore.browserIterator()) {
try {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage(), this);
sessionStates.put(clientId, sessionState);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME);
MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage());
sessionStates.put(clientId, sessionState);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
}
@ -127,10 +127,9 @@ public class MQTTStateManager {
if (sessionStates.containsKey(clientId)) {
return sessionStates.get(clientId);
} else {
MQTTSessionState sessionState = new MQTTSessionState(clientId, this);
MQTTSessionState sessionState = new MQTTSessionState(clientId);
logger.debug("Adding MQTT session state for: {}", clientId);
sessionStates.put(clientId, sessionState);
storeSessionState(sessionState);
return sessionState;
}
}
@ -175,7 +174,6 @@ public class MQTTStateManager {
}
});
tx.commit();
final long timeout = 5000;
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw MQTTBundle.BUNDLE.unableToStoreMqttState(timeout);
}

View File

@ -34,7 +34,7 @@ public class StateSerDeTest {
public void testSerDe() throws Exception {
for (int i = 0; i < 500; i++) {
String clientId = RandomUtil.randomString();
MQTTSessionState unserialized = new MQTTSessionState(clientId, null);
MQTTSessionState unserialized = new MQTTSessionState(clientId);
Integer subscriptionIdentifier = RandomUtil.randomPositiveIntOrNull();
for (int j = 0; j < RandomUtil.randomInterval(1, 50); j++) {
MqttTopicSubscription sub = new MqttTopicSubscription(RandomUtil.randomString(),
@ -46,7 +46,7 @@ public class StateSerDeTest {
}
CoreMessage serializedState = MQTTStateManager.serializeState(unserialized, 0);
MQTTSessionState deserialized = new MQTTSessionState(serializedState, null);
MQTTSessionState deserialized = new MQTTSessionState(serializedState);
assertEquals(unserialized.getClientId(), deserialized.getClientId());
for (Pair<MqttTopicSubscription, Integer> unserializedEntry : unserialized.getSubscriptionsPlusID()) {

View File

@ -1445,8 +1445,8 @@ public interface Configuration {
Configuration setTemporaryQueueNamespace(String temporaryQueueNamespace);
/**
* This is specific to MQTT, and it's necessary because the session scan interval is a broker-wide setting and can't
* be set on a per-connector basis like the rest of the MQTT-specific settings.
* This is necessary because the MQTT session scan interval is a broker-wide setting and can't be set on a
* per-connector basis like most of the other MQTT-specific settings.
*/
Configuration setMqttSessionScanInterval(long mqttSessionScanInterval);
@ -1457,6 +1457,19 @@ public interface Configuration {
*/
long getMqttSessionScanInterval();
/**
* This is necessary because MQTT sessions and handled on a broker-wide basis so this can't be set on a per-connector
* basis like most of the other MQTT-specific settings.
*/
Configuration setMqttSessionStatePersistenceTimeout(long mqttSessionStatePersistenceTimeout);
/**
* @see Configuration#setMqttSessionStatePersistenceTimeout(long)
*
* @return
*/
long getMqttSessionStatePersistenceTimeout();
/**
* Returns whether suppression of session-notifications is enabled for this server. <br>
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_SUPPRESS_SESSION_NOTIFICATIONS}.

View File

@ -423,6 +423,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private long mqttSessionScanInterval = ActiveMQDefaultConfiguration.getMqttSessionScanInterval();
private long mqttSessionStatePersistenceTimeout = ActiveMQDefaultConfiguration.getMqttSessionStatePersistenceTimeout();
private boolean suppressSessionNotifications = ActiveMQDefaultConfiguration.getDefaultSuppressSessionNotifications();
private String literalMatchMarkers = ActiveMQDefaultConfiguration.getLiteralMatchMarkers();
@ -3216,6 +3218,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public long getMqttSessionStatePersistenceTimeout() {
return mqttSessionStatePersistenceTimeout;
}
@Override
public Configuration setMqttSessionStatePersistenceTimeout(long mqttSessionStatePersistenceTimeout) {
this.mqttSessionStatePersistenceTimeout = mqttSessionStatePersistenceTimeout;
return this;
}
@Override
public boolean isSuppressSessionNotifications() {
return suppressSessionNotifications;

View File

@ -491,6 +491,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setMqttSessionScanInterval(getLong(e, "mqtt-session-scan-interval", config.getMqttSessionScanInterval(), GT_ZERO));
config.setMqttSessionStatePersistenceTimeout(getLong(e, "mqtt-session-state-persistence-timeout", config.getMqttSessionStatePersistenceTimeout(), GT_ZERO));
long globalMaxSize = getTextBytesAsLongBytes(e, GLOBAL_MAX_SIZE, -1, MINUS_ONE_OR_GT_ZERO);
if (globalMaxSize > 0) {

View File

@ -455,6 +455,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="mqtt-session-state-persistence-timeout" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how long (in ms) to wait to persist MQTT session state
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element ref="connectors" maxOccurs="1" minOccurs="0"/>
<xsd:element ref="acceptors" maxOccurs="1" minOccurs="0"/>

View File

@ -179,6 +179,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(true, conf.isPopulateValidatedUser());
Assert.assertEquals(false, conf.isRejectEmptyValidatedUser());
Assert.assertEquals(123456, conf.getMqttSessionScanInterval());
Assert.assertEquals(567890, conf.getMqttSessionStatePersistenceTimeout());
Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
Assert.assertEquals("TEMP", conf.getTemporaryQueueNamespace());

View File

@ -57,6 +57,7 @@
<populate-validated-user>true</populate-validated-user>
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>

View File

@ -58,6 +58,7 @@
<populate-validated-user>true</populate-validated-user>
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>

View File

@ -58,6 +58,7 @@
<populate-validated-user>true</populate-validated-user>
<reject-empty-validated-user>false</reject-empty-validated-user>
<mqtt-session-scan-interval>123456</mqtt-session-scan-interval>
<mqtt-session-state-persistence-timeout>567890</mqtt-session-state-persistence-timeout>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>

View File

@ -91,10 +91,32 @@ Payload logging is limited to avoid filling the logs with potentially hundreds o
== Persistent Subscriptions
The subscription information for MQTT sessions is stored in an internal queue named `$sys.mqtt.sessions` and persisted to disk (assuming persistence is enabled).
The subscription information for MQTT sessions is stored in an internal queue named `$sys.mqtt.sessions` and persisted to storage (assuming persistence is enabled).
The information is durable so that MQTT subscribers can reconnect and resume their subscriptions seamlessly after a broker restart, failure, etc.
When brokers are configured for high availability this information will be available on the backup so even in the case of a broker fail-over subscribers will be able to resume their subscriptions.
While persistent subscriptions can be convenient they impose a performance penalty since data must be written to storage.
If you don't need the convenience (e.g. you always use clean sessions) and you don't want the performance penalty then you can disable it by disabling durability for the `$sys.mqtt.sessions` queue in `broker.xml`, e.g.:
[,xml]
----
<addresses>
...
<address name="$sys.mqtt.sessions">
<anycast>
<queue name="$sys.mqtt.sessions">
<durable>false</durable>
</queue>
</anycast>
</address>
...
</addresses>
----
The setting `mqtt-session-state-persistence-timeout` controls how long the broker will wait for the data to be written to storage before throwing an error.
It is measured in milliseconds.
The default is `5000`.
== Custom Client ID Handling
The client ID used by an MQTT application is very important as it uniquely identifies the application.