ARTEMIS-3622 MQTT can deadlock on client cxn/discxn

This commit fixes the deadlock described on ARTEMIS-3622 by moving the
synchronization "up" a level from the MQTTSession to the
MQTTConnectionManager. It also eliminates the synchronization on the
MQTTSessionState in the MQTTConnectionManager because it's no longer
needed. This change should not only eliminate the deadlock, but improve
performance relatively as well.

There is no test associated with this commit as I wasn't able to
reproduce the deadlock with any kind of straight-forward test. There was
a test linked on the Jira, but it involved intrusive and fragile
scaffolding and wasn't ultimately tenable. That said, I did test this
fix with that test and it was successful. In any case, I think static
analysis should be sufficient here as the changes are pretty
straight-forward.
This commit is contained in:
Justin Bertram 2024-04-30 08:08:37 -05:00 committed by Robbie Gemmell
parent 3474a39301
commit 3c058e98f1
2 changed files with 78 additions and 79 deletions

View File

@ -51,7 +51,7 @@ public class MQTTConnectionManager {
session.getConnection().addFailureListener(failureListener);
}
void connect(MqttConnectMessage connect, String validatedUser, String username, String password) throws Exception {
synchronized void connect(MqttConnectMessage connect, String validatedUser, String username, String password) throws Exception {
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);
@ -68,67 +68,65 @@ public class MQTTConnectionManager {
String clientId = session.getConnection().getClientID();
boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);
MQTTSessionState sessionState = session.getStateManager().getSessionState(clientId);
session.setSessionState(sessionState);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);
if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean(true);
session.setClean(true);
}
if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean(true);
session.setClean(true);
}
if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());
if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());
if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
}
}
MqttProperties connackProperties;
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));
connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}
session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
}
MqttProperties connackProperties;
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));
connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}
session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
}
private MqttProperties getConnackProperties() {
@ -176,33 +174,27 @@ public class MQTTConnectionManager {
return (ServerSessionImpl) serverSession;
}
void disconnect(boolean failure) {
synchronized void disconnect(boolean failure) {
if (session == null || session.getStopped()) {
return;
}
synchronized (session.getState()) {
try {
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
try {
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
}
}
}
private synchronized MQTTSessionState getSessionState(String clientId) throws Exception {
return session.getStateManager().getSessionState(clientId);
}
}

View File

@ -98,14 +98,21 @@ public class MQTTSession {
logger.debug("MQTT session created: {}", id);
}
// Called after the client has Connected.
synchronized void start() throws Exception {
/*
* This method is only called by MQTTConnectionManager.connect
* which is synchronized with MQTTConnectionManager.disconnect
*/
void start() throws Exception {
mqttPublishManager.start();
subscriptionManager.start();
stopped = false;
}
synchronized void stop(boolean failure) throws Exception {
/*
* This method is only called by MQTTConnectionManager.disconnect
* which is synchronized with MQTTConnectionManager.connect
*/
void stop(boolean failure) throws Exception {
state.setFailed(failure);
if (!stopped) {