ARTEMIS-4201 send proper MQTT disconnect code on stolen link

This commit is contained in:
Justin Bertram 2023-03-08 13:33:08 -06:00
parent 39b0f01ca9
commit 9b4204b345
3 changed files with 46 additions and 1 deletions

View File

@ -457,7 +457,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
if (existingConnection != null) {
MQTTSession existingSession = session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession();
if (existingSession != null) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
}
existingSession.getConnectionManager().disconnect(false);

View File

@ -106,6 +106,10 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence());
}
protected org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException {
return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence());
}
protected MqttAsyncClient createAsyncPahoClient(String clientId) throws MqttException {
return new MqttAsyncClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence());
}

View File

@ -738,6 +738,47 @@ public class ConnectTests extends MQTT5TestSupport {
client2.disconnect();
}
/*
* [MQTT-3.1.4-3] If the ClientID represents a Client already connected to the Server, the Server sends a DISCONNECT
* packet to the existing Client with Reason Code of 0x8E (Session taken over) as described in section 4.13 and MUST
* close the Network Connection of the existing Client.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testConnectionStealingBy3_1_1() throws Exception {
final String CLIENT_ID = RandomUtil.randomString();
MqttClient client = createPahoClient(CLIENT_ID);
client.connect();
final int[] reasonCode = new int[1];
CountDownLatch disconnectedLatch = new CountDownLatch(1);
client.setCallback(new LatchedMqttCallback(disconnectedLatch) {
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
reasonCode[0] = disconnectResponse.getReturnCode();
disconnectedLatch.countDown();
}
@Override
public void mqttErrorOccurred(MqttException exception) {
exception.printStackTrace();
}
});
org.eclipse.paho.client.mqttv3.MqttClient client2 = createPaho3_1_1Client(CLIENT_ID);
client2.connect();
assertTrue(disconnectedLatch.await(500, TimeUnit.MILLISECONDS));
assertEquals(MQTTReasonCodes.SESSION_TAKEN_OVER, (byte) reasonCode[0]);
// only 1 session should exist
assertEquals(1, getSessionStates().size());
assertNotNull(getSessionStates().get(CLIENT_ID));
assertFalse(client.isConnected());
client2.disconnect();
}
/*
* [MQTT-3.1.4-4] The Server MUST perform the processing of Clean Start.
*