ARTEMIS-3788 don't send MQTT will message if will flag = false

This commit is contained in:
Justin Bertram 2022-04-20 12:01:33 -05:00 committed by clebertsuconic
parent 98eb31a2d7
commit 97b4f6a578
4 changed files with 48 additions and 11 deletions

View File

@ -195,7 +195,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
toRemove.add(entry.getKey());
}
if (!state.isAttached() && state.isFailed() && !state.isWillSent() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
if (state.isWill() && !state.isAttached() && state.isFailed() && !state.isWillSent() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
state.getSession().sendWillMessage();
}
}
@ -203,7 +203,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
for (String key : toRemove) {
logger.debugf("Removing state for session: %s", key);
MQTTSessionState state = sessionStates.remove(key);
if (state != null && !state.isAttached() && state.isFailed() && !state.isWillSent()) {
if (state != null && state.isWill() && !state.isAttached() && state.isFailed() && !state.isWillSent()) {
state.getSession().sendWillMessage();
}
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.UUID;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
@ -268,7 +270,7 @@ public class MQTTSession {
.qos(MqttQoS.valueOf(state.getWillQoSLevel()))
.retained(state.isWillRetain())
.topicName(state.getWillTopic())
.payload(state.getWillMessage())
.payload(state.getWillMessage() == null ? new EmptyByteBuf(PooledByteBufAllocator.DEFAULT) : state.getWillMessage())
.properties(properties)
.build();
logger.debugf("%s sending will message: %s", this, publishMessage);

View File

@ -26,8 +26,11 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
@ -185,4 +188,24 @@ public class MQTT5Test extends MQTT5TestSupport {
Wait.assertEquals(0, () -> getSessionStates().size(), 5000, 10);
}
/*
* If the Will flag is false then don't send a will message even if the session expiry is > 0
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillFlagFalseWithSessionExpiryDelay() throws Exception {
// enable send-to-dla-on-no-route so that we can detect an errant will message on disconnect
server.createQueue(new QueueConfiguration("activemq.notifications"));
server.createQueue(new QueueConfiguration("DLA"));
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setSendToDLAOnNoRoute(true).setDeadLetterAddress(SimpleString.toSimpleString("DLA")));
MqttClient client = createPahoClient("willGenerator");
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.sessionExpiryInterval(1L)
.build();
client.connect(options);
client.disconnectForcibly(0, 0, false);
scanSessions();
assertEquals(0, server.locateQueue("DLA").getMessageCount());
}
}

View File

@ -314,15 +314,27 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
}
public Map<String, MQTTSessionState> getSessionStates() {
Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
if (acceptor instanceof AbstractAcceptor) {
ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
if (protocolManager instanceof MQTTProtocolManager) {
return ((MQTTProtocolManager) protocolManager).getSessionStates();
}
MQTTProtocolManager protocolManager = getProtocolManager();
if (protocolManager == null) {
return Collections.emptyMap();
} else {
return protocolManager.getSessionStates();
}
return Collections.emptyMap();
}
public void scanSessions() {
getProtocolManager().scanSessions();
}
public MQTTProtocolManager getProtocolManager() {
Acceptor acceptor = server.getRemotingService().getAcceptor(MQTT_PROTOCOL_NAME);
if (acceptor instanceof AbstractAcceptor) {
ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get(MQTT_PROTOCOL_NAME);
if (protocolManager instanceof MQTTProtocolManager) {
return (MQTTProtocolManager) protocolManager;
}
}
return null;
}
protected Queue getSubscriptionQueue(String TOPIC) {