diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index f36c962a71..ab03c2916b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -22,6 +22,7 @@ import java.util.UUID; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.util.CharsetUtil; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -74,6 +75,7 @@ public class MQTTConnectionManager { return; } + boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId); MQTTSessionState sessionState = getSessionState(clientId); synchronized (sessionState) { session.setSessionState(sessionState); @@ -104,7 +106,7 @@ public class MQTTConnectionManager { } session.getConnection().setConnected(true); - session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED); + session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent && !cleanSession, MqttProperties.NO_PROPERTIES); // ensure we don't publish before the CONNACK session.start(); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 82bf22192a..f33c5decec 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -195,8 +195,16 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void sendConnack(MqttConnectReturnCode returnCode, MqttProperties properties) { + sendConnack(returnCode, true, properties); + } + + void sendConnack(MqttConnectReturnCode returnCode, boolean sessionPresent, MqttProperties properties) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true, properties); + // [MQTT-3.2.2-4] If a server sends a CONNACK packet containing a non-zero return code it MUST set Session Present to 0. + if (returnCode.byteValue() != (byte) 0x00) { + sessionPresent = false; + } + MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent, properties); MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader); sendToClient(message); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java index 464adda8f9..f321f346e2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java @@ -24,9 +24,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @@ -161,6 +164,28 @@ public class PahoMQTTTest extends MQTTTestSupport { producer.close(); } + @Test(timeout = 300000) + public void testSessionPresentWithCleanSession() throws Exception { + MqttClient client = createPahoClient(RandomUtil.randomString()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + IMqttToken result = client.connectWithResult(options); + assertFalse(result.getSessionPresent()); + client.disconnect(); + } + + @Test(timeout = 300000) + public void testSessionPresent() throws Exception { + MqttClient client = createPahoClient(RandomUtil.randomString()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(false); + IMqttToken result = client.connectWithResult(options); + assertFalse(result.getSessionPresent()); + client.disconnect(); + result = client.connectWithResult(options); + assertTrue(result.getSessionPresent()); + } + private MqttClient createPahoClient(String clientId) throws MqttException { return new MqttClient(protocol + "://localhost:" + getPort(), clientId, new MemoryPersistence()); }