ARTEMIS-3789 respect session expiry interval on MQTT disconnect message
This commit is contained in:
parent
d1e1faacc4
commit
96fa98fc93
|
@ -51,6 +51,7 @@ import org.jboss.logging.Logger;
|
|||
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA;
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD;
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;
|
||||
|
||||
/**
|
||||
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
|
||||
|
@ -173,7 +174,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
handleUnsubscribe((MqttUnsubscribeMessage) message);
|
||||
break;
|
||||
case DISCONNECT:
|
||||
disconnect(false);
|
||||
disconnect(false, message);
|
||||
break;
|
||||
case UNSUBACK:
|
||||
case SUBACK:
|
||||
|
@ -280,6 +281,16 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
|
||||
void disconnect(boolean error) {
|
||||
disconnect(error, null);
|
||||
}
|
||||
|
||||
void disconnect(boolean error, MqttMessage disconnect) {
|
||||
if (disconnect != null && disconnect.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
|
||||
Integer sessionExpiryInterval = MQTTUtil.getProperty(Integer.class, ((MqttReasonCodeAndPropertiesVariableHeader)disconnect.variableHeader()).properties(), SESSION_EXPIRY_INTERVAL, null);
|
||||
if (sessionExpiryInterval != null) {
|
||||
session.getState().setClientSessionExpiryInterval(sessionExpiryInterval);
|
||||
}
|
||||
}
|
||||
session.getConnectionManager().disconnect(error);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,16 +20,17 @@ package org.apache.activemq.artemis.tests.integration.mqtt5;
|
|||
import javax.jms.JMSConsumer;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.Message;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
|
||||
import org.eclipse.paho.mqttv5.client.MqttClient;
|
||||
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
|
||||
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
|
||||
|
@ -164,4 +165,24 @@ public class MQTT5Test extends MQTT5TestSupport {
|
|||
client2.disconnectForcibly(0, 0, false);
|
||||
assertTrue(latch.await(2, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
/*
|
||||
* It's possible for a client to change their session expiry interval via the DISCONNECT packet. Ensure we respect
|
||||
* a new session expiry interval when disconnecting.
|
||||
*/
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testExpiryDelayOnDisconnect() throws Exception {
|
||||
final String CONSUMER_ID = RandomUtil.randomString();
|
||||
|
||||
MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
|
||||
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
|
||||
.sessionExpiryInterval(300L)
|
||||
.build();
|
||||
consumer.connect(options).waitForCompletion();
|
||||
MqttProperties disconnectProperties = new MqttProperties();
|
||||
disconnectProperties.setSessionExpiryInterval(0L);
|
||||
consumer.disconnect(0, null, null, MQTTReasonCodes.SUCCESS, disconnectProperties).waitForCompletion();
|
||||
|
||||
Wait.assertEquals(0, () -> getSessionStates().size(), 5000, 10);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue