diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 54d5390f8f..bc511ea1dc 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -74,14 +74,21 @@ public class MQTTConnectionManager { return; } - session.setSessionState(getSessionState(clientId)); String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8); session.getConnection().setClientID(clientId); ServerSessionImpl serverSession = createServerSession(username, password); serverSession.start(); - session.setServerSession(serverSession); - session.setIsClean(cleanSession); + + session.setSessionState(getSessionState(clientId)); + + if (cleanSession) { + /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and + * start a new one. This Session lasts as long as the Network Connection. State data associated with this Session + * MUST NOT be reused in any subsequent Session */ + session.clean(); + session.setClean(true); + } if (will) { isWill = true; @@ -154,21 +161,15 @@ public class MQTTConnectionManager { } } - private MQTTSessionState getSessionState(String clientId) throws InterruptedException { - /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and - * start a new one This Session lasts as long as the Network Connection. State data associated with this Session - * MUST NOT be reused in any subsequent Session */ - - /* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create - a new one. */ + private MQTTSessionState getSessionState(String clientId) { + /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ MQTTSessionState state = MQTTSession.SESSIONS.get(clientId); - if (state != null) { - return state; - } else { + if (state == null) { state = new MQTTSessionState(clientId); MQTTSession.SESSIONS.put(clientId, state); - return state; } + + return state; } private String validateClientId(String clientId, boolean cleanSession) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index da10f47db1..640b893958 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -57,7 +58,7 @@ public class MQTTSession { private MQTTProtocolManager protocolManager; - private boolean isClean; + private boolean clean; private WildcardConfiguration wildcardConfiguration; @@ -107,6 +108,7 @@ public class MQTTSession { if (isClean()) { clean(); + SESSIONS.remove(connection.getClientID()); } } stopped = true; @@ -117,14 +119,11 @@ public class MQTTSession { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) { + this.clean = clean; } MQTTPublishManager getMqttPublishManager() { @@ -201,4 +200,8 @@ public class MQTTSession { public CoreMessageObjectPools getCoreMessageObjectPools() { return coreMessageObjectPools; } + + public static Map getSessions() { + return new HashMap<>(SESSIONS); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index bfc83e02ff..9fc6cfd4d3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1090,7 +1090,7 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 60 * 1000) - public void testCleanSession() throws Exception { + public void testCleanSessionForSubscriptions() throws Exception { final String CLIENTID = "cleansession"; final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false); BlockingConnection notClean = mqttNotClean.blockingConnection(); @@ -1100,7 +1100,9 @@ public class MQTTTest extends MQTTTestSupport { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - // MUST receive message from previous not clean session + assertEquals(1, MQTTSession.getSessions().size()); + + // MUST receive message from existing subscription from previous not clean session notClean = mqttNotClean.blockingConnection(); notClean.connect(); Message msg = notClean.receive(10000, TimeUnit.MILLISECONDS); @@ -1110,7 +1112,9 @@ public class MQTTTest extends MQTTTestSupport { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - // MUST NOT receive message from previous not clean session + assertEquals(1, MQTTSession.getSessions().size()); + + // MUST NOT receive message from previous not clean session as existing subscription should be gone final MQTT mqttClean = createMQTTConnection(CLIENTID, true); final BlockingConnection clean = mqttClean.blockingConnection(); clean.connect(); @@ -1120,12 +1124,42 @@ public class MQTTTest extends MQTTTestSupport { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - // MUST NOT receive message from previous clean session + assertEquals(0, MQTTSession.getSessions().size()); + + // MUST NOT receive message from previous clean session as existing subscription should be gone notClean = mqttNotClean.blockingConnection(); notClean.connect(); msg = notClean.receive(1000, TimeUnit.MILLISECONDS); assertNull(msg); notClean.disconnect(); + + assertEquals(1, MQTTSession.getSessions().size()); + } + + @Test(timeout = 60 * 1000) + public void testCleanSessionForMessages() throws Exception { + final String CLIENTID = "cleansession"; + final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false); + BlockingConnection notClean = mqttNotClean.blockingConnection(); + final String TOPIC = "TopicA"; + notClean.connect(); + notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + notClean.disconnect(); + + assertEquals(1, MQTTSession.getSessions().size()); + + // MUST NOT receive message from previous not clean session even when creating a new subscription + final MQTT mqttClean = createMQTTConnection(CLIENTID, true); + final BlockingConnection clean = mqttClean.blockingConnection(); + clean.connect(); + clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + Message msg = clean.receive(10000, TimeUnit.MILLISECONDS); + assertNull(msg); + clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + clean.disconnect(); + + assertEquals(0, MQTTSession.getSessions().size()); } @Test(timeout = 60 * 1000)