ARTEMIS-4200 configurable link-stealing for MQTT

This commit is contained in:
Justin Bertram 2023-03-08 14:02:46 -06:00
parent 9b4204b345
commit 68c5bed159
6 changed files with 162 additions and 11 deletions

View File

@ -254,8 +254,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
return; return;
} }
MQTTConnection existingConnection = session.getProtocolManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection()); if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) {
disconnectExistingSession(existingConnection); return;
} else {
protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection());
}
if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) { if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) {
calculateKeepAlive(connect); calculateKeepAlive(connect);
@ -452,19 +455,49 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
return true; return true;
} }
// [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client /* The MQTT specification states:
private void disconnectExistingSession(MQTTConnection existingConnection) { *
if (existingConnection != null) { * [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST
MQTTSession existingSession = session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession(); * disconnect the existing client
if (existingSession != null) { *
if (existingSession.getVersion() == MQTTVersion.MQTT_5) { * However, this behavior is configurable via the "allowLinkStealing" acceptor URL property.
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER); */
private LinkStealingResult handleLinkStealing() {
final String clientID = session.getConnection().getClientID();
LinkStealingResult result;
if (protocolManager.isClientConnected(clientID)) {
MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID);
if (protocolManager.isAllowLinkStealing()) {
MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession();
if (existingSession != null) {
if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
}
existingSession.getConnectionManager().disconnect(false);
} else {
existingConnection.disconnect(false);
} }
existingSession.getConnectionManager().disconnect(false); logger.debug("Existing MQTT session from {} closed due to incoming session from {} with the same client ID: {}", existingConnection.getRemoteAddress(), connection.getRemoteAddress(), session.getConnection().getClientID());
result = LinkStealingResult.EXISTING_LINK_STOLEN;
} else { } else {
existingConnection.disconnect(false); if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.UNSPECIFIED_ERROR);
}
logger.debug("Incoming MQTT session from {} closed due to existing session from {} with the same client ID: {}", connection.getRemoteAddress(), existingConnection.getRemoteAddress(), session.getConnection().getClientID());
/*
* Stopping the session here prevents the connection failure listener from inadvertently removing the
* existing session once the connection is disconnected.
*/
session.setStopped(true);
connection.disconnect(false);
result = LinkStealingResult.NEW_LINK_DENIED;
} }
} else {
result = LinkStealingResult.NO_ACTION;
} }
return result;
} }
private Pair<Boolean, String> validateUser(String username, String password) throws Exception { private Pair<Boolean, String> validateUser(String username, String password) throws Exception {
@ -510,4 +543,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
disconnect(true); disconnect(true);
return false; return false;
} }
private enum LinkStealingResult {
EXISTING_LINK_STOLEN, NEW_LINK_DENIED, NO_ACTION;
}
} }

View File

@ -75,6 +75,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
private boolean closeMqttConnectionOnPublishAuthorizationFailure = true; private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;
private boolean allowLinkStealing = true;
private final MQTTRoutingHandler routingHandler; private final MQTTRoutingHandler routingHandler;
MQTTProtocolManager(ActiveMQServer server, MQTTProtocolManager(ActiveMQServer server,
@ -139,6 +141,14 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure; this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
} }
public boolean isAllowLinkStealing() {
return allowLinkStealing;
}
public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}
@Override @Override
public void onNotification(Notification notification) { public void onNotification(Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType)) if (!(notification.getType() instanceof CoreNotificationType))
@ -348,6 +358,10 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
return false; return false;
} }
public boolean isClientConnected(String clientId) {
return connectedClients.containsKey(clientId);
}
public void removeConnectedClient(String clientId) { public void removeConnectedClient(String clientId) {
connectedClients.remove(clientId); connectedClients.remove(clientId);
} }
@ -362,6 +376,10 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
return connectedClients.put(clientId, connection); return connectedClients.put(clientId, connection);
} }
public MQTTConnection getConnectedClient(String clientId) {
return connectedClients.get(clientId);
}
public MQTTSessionState getSessionState(String clientId) { public MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new); return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);

View File

@ -154,6 +154,10 @@ public class MQTTSession {
return stopped; return stopped;
} }
void setStopped(boolean stopped) {
this.stopped = stopped;
}
boolean isClean() { boolean isClean() {
return clean; return clean;
} }

View File

@ -185,6 +185,20 @@ SSL/TLS is also available, e.g.:
Web browsers can then connect to `wss://<server>:8883` using a Web Socket to Web browsers can then connect to `wss://<server>:8883` using a Web Socket to
send and receive MQTT messages. send and receive MQTT messages.
## Link Stealing
The MQTT specifications define a behavior often referred to as "link stealing."
This means that whenever a new client connects with the same client ID as
another existing client then the existing client's session will be closed and
its network connection will be terminated.
In certain use-cases this behavior is not desired so it is configurable. The
URL parameter `allowLinkStealing` can be configured on the MQTT `acceptor` to
modify this behavior. By default `allowLinkStealing` is `true`. If it is set to
`false` then whenever a new client connects with the same client ID as another
existing client then the _new_ client's session will be closed and its network
connection will be terminated. In the case of MQTT 5 clients they will receive
a disconnect reason code of [`0x80` (i.e. "Unspecified error")](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208).
## Automatic Subscription Clean-up ## Automatic Subscription Clean-up

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Test;
public class MQTTDisabledLinkStealingTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
public void testDisabledLinkStealing() throws Exception {
final String clientId = RandomUtil.randomString();
MQTT mqtt = createMQTTConnection(clientId, false);
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection1 = mqtt.blockingConnection();
connection1.connect();
final BlockingConnection connection2 = mqtt.blockingConnection();
try {
connection2.connect();
fail("Should have thrown an exception on connect due to disabled link stealing");
} catch (Exception e) {
// ignore expected exception
}
assertTrue("Client no longer connected!", Wait.waitFor(() -> connection1.isConnected(), 3000, 200));
connection1.disconnect();
}
@Override
protected void addMQTTConnector() throws Exception {
server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;allowLinkStealing=false");
}
}

View File

@ -490,4 +490,30 @@ public class MQTT5Test extends MQTT5TestSupport {
consumer.disconnect(); consumer.disconnect();
consumer.close(); consumer.close();
} }
@Test(timeout = DEFAULT_TIMEOUT)
public void testConnectionStealingDisabled() throws Exception {
setAcceptorProperty("allowLinkStealing=false");
final String CLIENT_ID = RandomUtil.randomString();
MqttClient client = createPahoClient(CLIENT_ID);
client.connect();
MqttClient client2 = createPahoClient(CLIENT_ID);
try {
client2.connect();
fail("Should have thrown an exception on connect due to disabled link stealing");
} catch (Exception e) {
// ignore expected exception
}
// only 1 session should exist
Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100);
assertNotNull(getSessionStates().get(CLIENT_ID));
assertTrue(client.isConnected());
client.disconnect();
client.close();
}
} }