diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 5b3f7a6764..1c26fc4fed 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -457,7 +457,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { if (existingConnection != null) { MQTTSession existingSession = session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession(); if (existingSession != null) { - if (session.getVersion() == MQTTVersion.MQTT_5) { + if (existingSession.getVersion() == MQTTVersion.MQTT_5) { existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER); } existingSession.getConnectionManager().disconnect(false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java index 8482f9573b..6b191e0e34 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java @@ -106,6 +106,10 @@ public class MQTT5TestSupport extends ActiveMQTestBase { return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence()); } + protected org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException { + return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence()); + } + protected MqttAsyncClient createAsyncPahoClient(String clientId) throws MqttException { return new MqttAsyncClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java index 9464382e0b..a3b6eb3c03 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java @@ -738,6 +738,47 @@ public class ConnectTests extends MQTT5TestSupport { client2.disconnect(); } + /* + * [MQTT-3.1.4-3] If the ClientID represents a Client already connected to the Server, the Server sends a DISCONNECT + * packet to the existing Client with Reason Code of 0x8E (Session taken over) as described in section 4.13 and MUST + * close the Network Connection of the existing Client. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testConnectionStealingBy3_1_1() throws Exception { + final String CLIENT_ID = RandomUtil.randomString(); + + MqttClient client = createPahoClient(CLIENT_ID); + client.connect(); + final int[] reasonCode = new int[1]; + CountDownLatch disconnectedLatch = new CountDownLatch(1); + client.setCallback(new LatchedMqttCallback(disconnectedLatch) { + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + reasonCode[0] = disconnectResponse.getReturnCode(); + disconnectedLatch.countDown(); + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + exception.printStackTrace(); + } + }); + + org.eclipse.paho.client.mqttv3.MqttClient client2 = createPaho3_1_1Client(CLIENT_ID); + client2.connect(); + + assertTrue(disconnectedLatch.await(500, TimeUnit.MILLISECONDS)); + assertEquals(MQTTReasonCodes.SESSION_TAKEN_OVER, (byte) reasonCode[0]); + + // only 1 session should exist + assertEquals(1, getSessionStates().size()); + assertNotNull(getSessionStates().get(CLIENT_ID)); + + assertFalse(client.isConnected()); + + client2.disconnect(); + } + /* * [MQTT-3.1.4-4] The Server MUST perform the processing of Clean Start. *