From a6abf68ba5826c1bd217ac1a26abe9811cf509ce Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 12 Apr 2022 10:05:03 -0500 Subject: [PATCH] ARTEMIS-3774 support user properties on MQTT will message --- .../protocol/mqtt/MQTTConnectionManager.java | 5 ++ .../core/protocol/mqtt/MQTTSession.java | 11 ++++ .../core/protocol/mqtt/MQTTSessionState.java | 11 ++++ .../tests/integration/mqtt5/MQTT5Test.java | 54 +++++++++++++++++++ 4 files changed, 81 insertions(+) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 8a36d83c2d..91b9fb8fc7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.UUID; +import java.util.List; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -126,6 +127,10 @@ public class MQTTConnectionManager { if (willDelayInterval != null) { session.getState().setWillDelayInterval(( int) willDelayInterval.value()); } + List userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()); + if (userProperties != null) { + session.getState().setWillUserProperties(userProperties); + } } } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index d8e7fb91ed..3f7b98ba28 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.UUID; import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import org.apache.activemq.artemis.core.config.WildcardConfiguration; @@ -252,12 +253,22 @@ public class MQTTSession { public void sendWillMessage() { try { + MqttProperties properties; + if (state.getWillUserProperties() == null) { + properties = MqttProperties.NO_PROPERTIES; + } else { + properties = new MqttProperties(); + for (MqttProperties.MqttProperty userProperty : state.getWillUserProperties()) { + properties.add(userProperty); + } + } MqttPublishMessage publishMessage = MqttMessageBuilders.publish() .messageId(0) .qos(MqttQoS.valueOf(state.getWillQoSLevel())) .retained(state.isWillRetain()) .topicName(state.getWillTopic()) .payload(state.getWillMessage()) + .properties(properties) .build(); logger.debugf("%s sending will message: %s", this, publishMessage); getMqttPublishManager().sendToQueue(publishMessage, true); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index ba23ec7026..4174931cb6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.WildcardConfiguration; @@ -75,6 +76,8 @@ public class MQTTSessionState { private long willDelayInterval = 0; + private List willUserProperties; + private boolean willSent = false; private boolean failed = false; @@ -278,6 +281,14 @@ public class MQTTSessionState { this.willDelayInterval = willDelayInterval; } + public void setWillUserProperties(List userProperties) { + this.willUserProperties = userProperties; + } + + public List getWillUserProperties() { + return willUserProperties; + } + public boolean isWillSent() { return willSent; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 76b20445ac..1a4d398158 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -22,6 +22,8 @@ import javax.jms.JMSContext; import javax.jms.Message; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -29,7 +31,11 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.Wait; import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; import org.jboss.logging.Logger; import org.junit.Assume; import org.junit.Test; @@ -110,4 +116,52 @@ public class MQTT5Test extends MQTT5TestSupport { consumer.disconnect(); consumer.close(); } + + /* + * There is no normative statement in the spec about supporting user properties on will messages, but it is implied + * in various places. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testWillMessageProperties() throws Exception { + final byte[] WILL = RandomUtil.randomBytes(); + final String[][] properties = new String[10][2]; + for (String[] property : properties) { + property[0] = RandomUtil.randomString(); + property[1] = RandomUtil.randomString(); + } + + // consumer of the will message + MqttClient client1 = createPahoClient("willConsumer"); + CountDownLatch latch = new CountDownLatch(1); + client1.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + int i = 0; + for (UserProperty property : message.getProperties().getUserProperties()) { + assertEquals(properties[i][0], property.getKey()); + assertEquals(properties[i][1], property.getValue()); + i++; + } + latch.countDown(); + } + }); + client1.connect(); + client1.subscribe("/topic/foo", 1); + + // consumer to generate the will + MqttClient client2 = createPahoClient("willGenerator"); + MqttProperties willMessageProperties = new MqttProperties(); + List userProperties = new ArrayList<>(); + for (String[] property : properties) { + userProperties.add(new UserProperty(property[0], property[1])); + } + willMessageProperties.setUserProperties(userProperties); + MqttConnectionOptions options = new MqttConnectionOptionsBuilder() + .will("/topic/foo", new MqttMessage(WILL)) + .build(); + options.setWillMessageProperties(willMessageProperties); + client2.connect(options); + client2.disconnectForcibly(0, 0, false); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } }