This commit is contained in:
Clebert Suconic 2021-09-10 17:17:56 -04:00
commit cdc046a152
3 changed files with 37 additions and 2 deletions

View File

@ -22,6 +22,7 @@ import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.util.CharsetUtil;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -74,6 +75,7 @@ public class MQTTConnectionManager {
return;
}
boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
@ -104,7 +106,7 @@ public class MQTTConnectionManager {
}
session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent && !cleanSession, MqttProperties.NO_PROPERTIES);
// ensure we don't publish before the CONNACK
session.start();
}

View File

@ -195,8 +195,16 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void sendConnack(MqttConnectReturnCode returnCode, MqttProperties properties) {
sendConnack(returnCode, true, properties);
}
void sendConnack(MqttConnectReturnCode returnCode, boolean sessionPresent, MqttProperties properties) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true, properties);
// [MQTT-3.2.2-4] If a server sends a CONNACK packet containing a non-zero return code it MUST set Session Present to 0.
if (returnCode.byteValue() != (byte) 0x00) {
sessionPresent = false;
}
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent, properties);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
sendToClient(message);
}

View File

@ -24,9 +24,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@ -161,6 +164,28 @@ public class PahoMQTTTest extends MQTTTestSupport {
producer.close();
}
@Test(timeout = 300000)
public void testSessionPresentWithCleanSession() throws Exception {
MqttClient client = createPahoClient(RandomUtil.randomString());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
IMqttToken result = client.connectWithResult(options);
assertFalse(result.getSessionPresent());
client.disconnect();
}
@Test(timeout = 300000)
public void testSessionPresent() throws Exception {
MqttClient client = createPahoClient(RandomUtil.randomString());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
IMqttToken result = client.connectWithResult(options);
assertFalse(result.getSessionPresent());
client.disconnect();
result = client.connectWithResult(options);
assertTrue(result.getSessionPresent());
}
private MqttClient createPahoClient(String clientId) throws MqttException {
return new MqttClient(protocol + "://localhost:" + getPort(), clientId, new MemoryPersistence());
}