ARTEMIS-3774 support user properties on MQTT will message

This commit is contained in:
Justin Bertram 2022-04-12 10:05:03 -05:00 committed by clebertsuconic
parent 9d943483d5
commit a6abf68ba5
4 changed files with 81 additions and 0 deletions

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.UUID;
import java.util.List;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
@ -126,6 +127,10 @@ public class MQTTConnectionManager {
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.UUID;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@ -252,12 +253,22 @@ public class MQTTSession {
public void sendWillMessage() {
try {
MqttProperties properties;
if (state.getWillUserProperties() == null) {
properties = MqttProperties.NO_PROPERTIES;
} else {
properties = new MqttProperties();
for (MqttProperties.MqttProperty userProperty : state.getWillUserProperties()) {
properties.add(userProperty);
}
}
MqttPublishMessage publishMessage = MqttMessageBuilders.publish()
.messageId(0)
.qos(MqttQoS.valueOf(state.getWillQoSLevel()))
.retained(state.isWillRetain())
.topicName(state.getWillTopic())
.payload(state.getWillMessage())
.properties(properties)
.build();
logger.debugf("%s sending will message: %s", this, publishMessage);
getMqttPublishManager().sendToQueue(publishMessage, true);

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@ -75,6 +76,8 @@ public class MQTTSessionState {
private long willDelayInterval = 0;
private List<? extends MqttProperties.MqttProperty> willUserProperties;
private boolean willSent = false;
private boolean failed = false;
@ -278,6 +281,14 @@ public class MQTTSessionState {
this.willDelayInterval = willDelayInterval;
}
public void setWillUserProperties(List<? extends MqttProperties.MqttProperty> userProperties) {
this.willUserProperties = userProperties;
}
public List<? extends MqttProperties.MqttProperty> getWillUserProperties() {
return willUserProperties;
}
public boolean isWillSent() {
return willSent;
}

View File

@ -22,6 +22,8 @@ import javax.jms.JMSContext;
import javax.jms.Message;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -29,7 +31,11 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.jboss.logging.Logger;
import org.junit.Assume;
import org.junit.Test;
@ -110,4 +116,52 @@ public class MQTT5Test extends MQTT5TestSupport {
consumer.disconnect();
consumer.close();
}
/*
* There is no normative statement in the spec about supporting user properties on will messages, but it is implied
* in various places.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillMessageProperties() throws Exception {
final byte[] WILL = RandomUtil.randomBytes();
final String[][] properties = new String[10][2];
for (String[] property : properties) {
property[0] = RandomUtil.randomString();
property[1] = RandomUtil.randomString();
}
// consumer of the will message
MqttClient client1 = createPahoClient("willConsumer");
CountDownLatch latch = new CountDownLatch(1);
client1.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
int i = 0;
for (UserProperty property : message.getProperties().getUserProperties()) {
assertEquals(properties[i][0], property.getKey());
assertEquals(properties[i][1], property.getValue());
i++;
}
latch.countDown();
}
});
client1.connect();
client1.subscribe("/topic/foo", 1);
// consumer to generate the will
MqttClient client2 = createPahoClient("willGenerator");
MqttProperties willMessageProperties = new MqttProperties();
List<UserProperty> userProperties = new ArrayList<>();
for (String[] property : properties) {
userProperties.add(new UserProperty(property[0], property[1]));
}
willMessageProperties.setUserProperties(userProperties);
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.will("/topic/foo", new MqttMessage(WILL))
.build();
options.setWillMessageProperties(willMessageProperties);
client2.connect(options);
client2.disconnectForcibly(0, 0, false);
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
}