ARTEMIS-4365 MQTT retain flag not set correctly

This commit is contained in:
Justin Bertram 2023-07-17 15:14:29 -05:00 committed by Robbie Gemmell
parent e5b18b80f7
commit c8d685ee87
7 changed files with 180 additions and 51 deletions

View File

@ -29,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -60,6 +61,7 @@ import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCR
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CONTENT_TYPE_KEY;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CORRELATION_DATA_KEY;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_PAYLOAD_FORMAT_INDICATOR_KEY;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_RESPONSE_TOPIC_KEY;
@ -404,12 +406,15 @@ public class MQTTPublishManager {
// [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages.
boolean redelivery = qos == 0 ? false : (deliveryCount > 1);
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
boolean isRetain = message.containsProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY);
MqttProperties mqttProperties = getPublishProperties(message);
if (session.getVersion() == MQTTVersion.MQTT_5) {
if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) {
isRetain = false;
if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY)) {
MqttTopicSubscription sub = session.getState().getSubscription(message.getAddress());
if (sub != null && sub.option().isRetainAsPublished()) {
isRetain = true;
}
}
if (session.getState().getClientTopicAliasMaximum() != null) {

View File

@ -28,6 +28,8 @@ import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY;
public class MQTTRetainMessageManager {
private MQTTSession session;
@ -59,7 +61,6 @@ public class MQTTRetainMessageManager {
Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, session.getServer().getStorageManager());
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
}
}
void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
@ -82,6 +83,7 @@ public class MQTTRetainMessageManager {
}
}
Message message = ref.getMessage().copy(session.getServer().getStorageManager().generateID());
message.putStringProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY, (String) null);
sendToQueue(message, queue, tx);
}
}

View File

@ -96,6 +96,8 @@ public class MQTTUtil {
public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain");
public static final SimpleString MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY = SimpleString.toSimpleString("mqtt.message.retain.initial.distribution");
public static final SimpleString MQTT_PAYLOAD_FORMAT_INDICATOR_KEY = SimpleString.toSimpleString("mqtt.payload.format.indicator");
public static final SimpleString MQTT_RESPONSE_TOPIC_KEY = SimpleString.toSimpleString("mqtt.response.topic");

View File

@ -64,12 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
expectedProperties.put(MESSAGE_TEXT, msgText);
expectedProperties.put(RETAINED, retained);
final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
initializeConnection(subscribeProvider);
subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(1);
MQTTInterceptor incomingInterceptor = (packet, connection) -> {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
@ -89,25 +83,25 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
byte[] payload = subscribeProvider.receive(10000);
assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, retained);
final MQTTClientProvider subscribeProvider = getMQTTClientProvider();
initializeConnection(subscribeProvider);
subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE);
Thread thread = new Thread(() -> {
try {
byte[] payload = subscribeProvider.receive(10000);
assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
latch.await(10, TimeUnit.SECONDS);
subscribeProvider.disconnect();
publishProvider.disconnect();

View File

@ -23,8 +23,10 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.EOFException;
import java.lang.invoke.MethodHandles;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@ -61,6 +63,8 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
@ -74,7 +78,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
@ -2219,4 +2222,71 @@ public class MQTTTest extends MQTTTestSupport {
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 3000, 50);
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 3000, 50);
}
/*
* [MQTT-3.3.1-9] When sending a PUBLISH Packet to a Client the Server...MUST set the RETAIN flag to 0 when a PUBLISH
* Packet is sent to a Client because it matches an *established* subscription regardless of how the flag was set in
* the message it received.
*/
@Test(timeout = 60 * 1000)
public void testRetainFlagOnEstablishedSubscription() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final String topic = getTopicName();
MqttClient subscriber = createPaho3_1_1Client("subscriber");
subscriber.setCallback(new DefaultMqtt3Callback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (!message.isRetained()) {
latch.countDown();
}
}
});
subscriber.connect();
subscriber.subscribe(topic, 1);
MqttClient publisher = createPaho3_1_1Client("publisher");
publisher.connect();
publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, true);
publisher.disconnect();
publisher.close();
assertTrue("Did not receive expected message within timeout", latch.await(1, TimeUnit.SECONDS));
subscriber.disconnect();
subscriber.close();
}
/*
* [MQTT-3.3.1-8] When sending a PUBLISH Packet to a Client the Server MUST set the RETAIN flag to 1 if a message is
* sent as a result of a new subscription being made by a Client.
*/
@Test(timeout = 60 * 1000)
public void testRetainFlagOnNewSubscription() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final String topic = getTopicName();
MqttClient publisher = createPaho3_1_1Client("publisher");
publisher.connect();
publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, true);
publisher.disconnect();
publisher.close();
MqttClient subscriber = createPaho3_1_1Client("subscriber");
subscriber.setCallback(new DefaultMqtt3Callback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (message.isRetained()) {
latch.countDown();
}
}
});
subscriber.connect();
subscriber.subscribe(topic, 1);
assertTrue("Did not receive expected message within timeout", latch.await(1, TimeUnit.SECONDS));
subscriber.disconnect();
subscriber.close();
}
}

View File

@ -56,6 +56,10 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.fusesource.hawtdispatch.DispatchPriority;
import org.fusesource.hawtdispatch.internal.DispatcherConfig;
import org.fusesource.mqtt.client.MQTT;
@ -374,6 +378,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return mqtt;
}
protected MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException {
return new MqttClient("tcp://localhost:" + port, clientId, new MemoryPersistence());
}
public Map<String, MQTTSessionState> getSessions() {
Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
if (acceptor instanceof AbstractAcceptor) {
@ -481,4 +489,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return messageCount;
}
}
protected interface DefaultMqtt3Callback extends MqttCallback {
@Override
default void connectionLost(Throwable cause) {
}
@Override
default void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception {
}
@Override
default void deliveryComplete(IMqttDeliveryToken token) {
}
}
}

View File

@ -339,6 +339,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
* When a new Nonshared Subscription is made, the last retained message, if any, on each matching topic name is sent
* to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag
* set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time
* of the Subscription...
*
* [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the retained messages matching the Topic Filter
* of the subscription to the Client.
*
@ -350,6 +355,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
* When a new Nonshared Subscription is made, the last retained message, if any, on each matching topic name is sent
* to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag
* set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time
* of the Subscription...
*
* [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the retained messages matching the Topic Filter
* of the subscription to the Client.
*
@ -361,6 +371,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
* When a new Nonshared Subscription is made, the last retained message, if any, on each matching topic name is sent
* to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag
* set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time
* of the Subscription...
*
* [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the retained messages matching the Topic Filter
* of the subscription to the Client.
*
@ -408,6 +423,7 @@ public class PublishTests extends MQTT5TestSupport {
}
}
assertTrue(payloadMatched);
assertTrue(message.isRetained());
latch.countDown();
}
});
@ -432,6 +448,11 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
* When a new Nonshared Subscription is made, the last retained message, if any, on each matching topic name is sent
* to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag
* set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time
* of the Subscription...
*
* [MQTT-3.3.1-10] If Retain Handling is set to 1 then if the subscription did not already exist, the Server MUST
* send all retained message matching the Topic Filter of the subscription to the Client, and if the subscription did
* exist the Server MUST NOT send the retained messages.
@ -462,8 +483,9 @@ public class PublishTests extends MQTT5TestSupport {
final CountDownLatch latch = new CountDownLatch(1);
consumer.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
public void messageArrived(String topic, MqttMessage message) {
assertEqualsByteArrays("retained".getBytes(StandardCharsets.UTF_8), message.getPayload());
assertTrue(message.isRetained());
latch.countDown();
}
});
@ -492,7 +514,12 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
* [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send the retained
* When a new Nonshared Subscription is made, the last retained message, if any, on each matching topic name is sent
* to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag
* set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time
* of the Subscription...
*
* [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send the retained messages
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRetainHandlingTwo() throws Exception {
@ -527,23 +554,18 @@ public class PublishTests extends MQTT5TestSupport {
}
/*
* The setting of the RETAIN flag in an Application Message forwarded by the Server from an *established* connection
* is controlled by the Retain As Published subscription option.
*
* [MQTT-3.3.1-12] If the value of Retain As Published subscription option is set to 0, the Server MUST set the
* RETAIN flag to 0 when forwarding an Application Message regardless of how the RETAIN flag was set in the received
* PUBLISH packet.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRetainAsPublishedZero() throws Exception {
public void testRetainAsPublishedZeroOnEstablishedSubscription() throws Exception {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
// send retained message
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
producer.disconnect();
producer.close();
// create consumer
final CountDownLatch latch = new CountDownLatch(1);
MqttClient consumer = createPahoClient(CONSUMER_ID);
@ -560,20 +582,6 @@ public class PublishTests extends MQTT5TestSupport {
subscription.setRetainAsPublished(false);
consumer.subscribe(new MqttSubscription[]{subscription});
assertTrue(latch.await(2, TimeUnit.SECONDS));
consumer.disconnect();
consumer.close();
}
/*
* [MQTT-3.3.1-13] If the value of Retain As Published subscription option is set to 1, the Server MUST set the
* RETAIN flag equal to the RETAIN flag in the received PUBLISH packet.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRetainAsPublishedOne() throws Exception {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
// send retained message
MqttClient producer = createPahoClient("producer");
producer.connect();
@ -582,6 +590,23 @@ public class PublishTests extends MQTT5TestSupport {
producer.disconnect();
producer.close();
assertTrue(latch.await(2, TimeUnit.SECONDS));
consumer.disconnect();
consumer.close();
}
/*
* The setting of the RETAIN flag in an Application Message forwarded by the Server from an *established* connection
* is controlled by the Retain As Published subscription option.
*
* [MQTT-3.3.1-13] If the value of Retain As Published subscription option is set to 1, the Server MUST set the
* RETAIN flag equal to the RETAIN flag in the received PUBLISH packet.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRetainAsPublishedOneOnEstablishedSubscription() throws Exception {
final String CONSUMER_ID = RandomUtil.randomString();
final String TOPIC = this.getTopicName();
// create consumer
final CountDownLatch latchOne = new CountDownLatch(1);
final CountDownLatch latchTwo = new CountDownLatch(1);
@ -605,6 +630,15 @@ public class PublishTests extends MQTT5TestSupport {
MqttSubscription subscription = new MqttSubscription(TOPIC, 2);
subscription.setRetainAsPublished(true);
consumer.subscribe(new MqttSubscription[]{subscription});
// send retained message
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, "retained".getBytes(), 2, true);
Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
producer.disconnect();
producer.close();
assertTrue(latchOne.await(2, TimeUnit.SECONDS));
producer = createPahoClient("producer");