This commit is contained in:
Clebert Suconic 2019-01-09 16:48:56 -05:00
commit 72b85b1938
3 changed files with 63 additions and 25 deletions

View File

@ -74,14 +74,21 @@ public class MQTTConnectionManager {
return;
}
session.setSessionState(getSessionState(clientId));
String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8);
session.getConnection().setClientID(clientId);
ServerSessionImpl serverSession = createServerSession(username, password);
serverSession.start();
session.setServerSession(serverSession);
session.setIsClean(cleanSession);
session.setSessionState(getSessionState(clientId));
if (cleanSession) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean();
session.setClean(true);
}
if (will) {
isWill = true;
@ -154,21 +161,15 @@ public class MQTTConnectionManager {
}
}
private MQTTSessionState getSessionState(String clientId) throws InterruptedException {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
/* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
a new one. */
private MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
if (state != null) {
return state;
} else {
if (state == null) {
state = new MQTTSessionState(clientId);
MQTTSession.SESSIONS.put(clientId, state);
return state;
}
return state;
}
private String validateClientId(String clientId, boolean cleanSession) {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -57,7 +58,7 @@ public class MQTTSession {
private MQTTProtocolManager protocolManager;
private boolean isClean;
private boolean clean;
private WildcardConfiguration wildcardConfiguration;
@ -107,6 +108,7 @@ public class MQTTSession {
if (isClean()) {
clean();
SESSIONS.remove(connection.getClientID());
}
}
stopped = true;
@ -117,14 +119,11 @@ public class MQTTSession {
}
boolean isClean() {
return isClean;
return clean;
}
void setIsClean(boolean isClean) throws Exception {
this.isClean = isClean;
if (isClean) {
clean();
}
void setClean(boolean clean) {
this.clean = clean;
}
MQTTPublishManager getMqttPublishManager() {
@ -201,4 +200,8 @@ public class MQTTSession {
public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}
public static Map<String, MQTTSessionState> getSessions() {
return new HashMap<>(SESSIONS);
}
}

View File

@ -1090,7 +1090,7 @@ public class MQTTTest extends MQTTTestSupport {
}
@Test(timeout = 60 * 1000)
public void testCleanSession() throws Exception {
public void testCleanSessionForSubscriptions() throws Exception {
final String CLIENTID = "cleansession";
final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
BlockingConnection notClean = mqttNotClean.blockingConnection();
@ -1100,7 +1100,9 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
// MUST receive message from previous not clean session
assertEquals(1, MQTTSession.getSessions().size());
// MUST receive message from existing subscription from previous not clean session
notClean = mqttNotClean.blockingConnection();
notClean.connect();
Message msg = notClean.receive(10000, TimeUnit.MILLISECONDS);
@ -1110,7 +1112,9 @@ public class MQTTTest extends MQTTTestSupport {
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
// MUST NOT receive message from previous not clean session
assertEquals(1, MQTTSession.getSessions().size());
// MUST NOT receive message from previous not clean session as existing subscription should be gone
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
final BlockingConnection clean = mqttClean.blockingConnection();
clean.connect();
@ -1120,12 +1124,42 @@ public class MQTTTest extends MQTTTestSupport {
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect();
// MUST NOT receive message from previous clean session
assertEquals(0, MQTTSession.getSessions().size());
// MUST NOT receive message from previous clean session as existing subscription should be gone
notClean = mqttNotClean.blockingConnection();
notClean.connect();
msg = notClean.receive(1000, TimeUnit.MILLISECONDS);
assertNull(msg);
notClean.disconnect();
assertEquals(1, MQTTSession.getSessions().size());
}
@Test(timeout = 60 * 1000)
public void testCleanSessionForMessages() throws Exception {
final String CLIENTID = "cleansession";
final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
BlockingConnection notClean = mqttNotClean.blockingConnection();
final String TOPIC = "TopicA";
notClean.connect();
notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
assertEquals(1, MQTTSession.getSessions().size());
// MUST NOT receive message from previous not clean session even when creating a new subscription
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
final BlockingConnection clean = mqttClean.blockingConnection();
clean.connect();
clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
Message msg = clean.receive(10000, TimeUnit.MILLISECONDS);
assertNull(msg);
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect();
assertEquals(0, MQTTSession.getSessions().size());
}
@Test(timeout = 60 * 1000)