diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 31a5409a44..57b9605d21 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -29,6 +29,7 @@ import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; @@ -60,6 +61,7 @@ import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCR import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CONTENT_TYPE_KEY; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_CORRELATION_DATA_KEY; +import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_KEY; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_PAYLOAD_FORMAT_INDICATOR_KEY; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_RESPONSE_TOPIC_KEY; @@ -404,12 +406,15 @@ public class MQTTPublishManager { // [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages. boolean redelivery = qos == 0 ? false : (deliveryCount > 1); - boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY); + boolean isRetain = message.containsProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY); MqttProperties mqttProperties = getPublishProperties(message); if (session.getVersion() == MQTTVersion.MQTT_5) { - if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) { - isRetain = false; + if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY)) { + MqttTopicSubscription sub = session.getState().getSubscription(message.getAddress()); + if (sub != null && sub.option().isRetainAsPublished()) { + isRetain = true; + } } if (session.getState().getClientTopicAliasMaximum() != null) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 6b3e03a5d4..1a21aa2c9f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -28,6 +28,8 @@ import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY; + public class MQTTRetainMessageManager { private MQTTSession session; @@ -59,7 +61,6 @@ public class MQTTRetainMessageManager { Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, session.getServer().getStorageManager()); sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx); } - } void addRetainedMessagesToQueue(Queue queue, String address) throws Exception { @@ -82,6 +83,7 @@ public class MQTTRetainMessageManager { } } Message message = ref.getMessage().copy(session.getServer().getStorageManager().generateID()); + message.putStringProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY, (String) null); sendToQueue(message, queue, tx); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index b7bc1f8b9d..5ac19a0f0a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -96,6 +96,8 @@ public class MQTTUtil { public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain"); + public static final SimpleString MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY = SimpleString.toSimpleString("mqtt.message.retain.initial.distribution"); + public static final SimpleString MQTT_PAYLOAD_FORMAT_INDICATOR_KEY = SimpleString.toSimpleString("mqtt.payload.format.indicator"); public static final SimpleString MQTT_RESPONSE_TOPIC_KEY = SimpleString.toSimpleString("mqtt.response.topic"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java index 279c38afb7..e1ef5ffc13 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTInterceptorPropertiesTest.java @@ -64,12 +64,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { expectedProperties.put(MESSAGE_TEXT, msgText); expectedProperties.put(RETAINED, retained); - - final MQTTClientProvider subscribeProvider = getMQTTClientProvider(); - initializeConnection(subscribeProvider); - - subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE); - final CountDownLatch latch = new CountDownLatch(1); MQTTInterceptor incomingInterceptor = (packet, connection) -> { if (packet.fixedHeader().messageType() == MqttMessageType.PUBLISH) { @@ -89,25 +83,25 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { server.getRemotingService().addIncomingInterceptor(incomingInterceptor); server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); - - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - byte[] payload = subscribeProvider.receive(10000); - assertNotNull("Should get a message", payload); - latch.countDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - thread.start(); - final MQTTClientProvider publishProvider = getMQTTClientProvider(); initializeConnection(publishProvider); publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, retained); + final MQTTClientProvider subscribeProvider = getMQTTClientProvider(); + initializeConnection(subscribeProvider); + subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE); + + Thread thread = new Thread(() -> { + try { + byte[] payload = subscribeProvider.receive(10000); + assertNotNull("Should get a message", payload); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + latch.await(10, TimeUnit.SECONDS); subscribeProvider.disconnect(); publishProvider.disconnect(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java index 7ceb43fa87..964a70f87f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java @@ -23,8 +23,10 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import java.io.EOFException; +import java.lang.invoke.MethodHandles; import java.net.ProtocolException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -61,6 +63,8 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -74,7 +78,6 @@ import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; @@ -2219,4 +2222,71 @@ public class MQTTTest extends MQTTTestSupport { Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 3000, 50); Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 3000, 50); } + + /* + * [MQTT-3.3.1-9] When sending a PUBLISH Packet to a Client the Server...MUST set the RETAIN flag to 0 when a PUBLISH + * Packet is sent to a Client because it matches an *established* subscription regardless of how the flag was set in + * the message it received. + */ + @Test(timeout = 60 * 1000) + public void testRetainFlagOnEstablishedSubscription() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final String topic = getTopicName(); + + MqttClient subscriber = createPaho3_1_1Client("subscriber"); + subscriber.setCallback(new DefaultMqtt3Callback() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + if (!message.isRetained()) { + latch.countDown(); + } + } + }); + subscriber.connect(); + subscriber.subscribe(topic, 1); + + MqttClient publisher = createPaho3_1_1Client("publisher"); + publisher.connect(); + publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, true); + publisher.disconnect(); + publisher.close(); + + assertTrue("Did not receive expected message within timeout", latch.await(1, TimeUnit.SECONDS)); + + subscriber.disconnect(); + subscriber.close(); + } + + /* + * [MQTT-3.3.1-8] When sending a PUBLISH Packet to a Client the Server MUST set the RETAIN flag to 1 if a message is + * sent as a result of a new subscription being made by a Client. + */ + @Test(timeout = 60 * 1000) + public void testRetainFlagOnNewSubscription() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final String topic = getTopicName(); + + MqttClient publisher = createPaho3_1_1Client("publisher"); + publisher.connect(); + publisher.publish(topic, "retained".getBytes(StandardCharsets.UTF_8), 1, true); + publisher.disconnect(); + publisher.close(); + + MqttClient subscriber = createPaho3_1_1Client("subscriber"); + subscriber.setCallback(new DefaultMqtt3Callback() { + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + if (message.isRetained()) { + latch.countDown(); + } + } + }); + subscriber.connect(); + subscriber.subscribe(topic, 1); + + assertTrue("Did not receive expected message within timeout", latch.await(1, TimeUnit.SECONDS)); + + subscriber.disconnect(); + subscriber.close(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java index 82e9a337b4..9f8bcd0d0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java @@ -56,6 +56,10 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.fusesource.hawtdispatch.DispatchPriority; import org.fusesource.hawtdispatch.internal.DispatcherConfig; import org.fusesource.mqtt.client.MQTT; @@ -374,6 +378,10 @@ public class MQTTTestSupport extends ActiveMQTestBase { return mqtt; } + protected MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException { + return new MqttClient("tcp://localhost:" + port, clientId, new MemoryPersistence()); + } + public Map getSessions() { Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT"); if (acceptor instanceof AbstractAcceptor) { @@ -481,4 +489,18 @@ public class MQTTTestSupport extends ActiveMQTestBase { return messageCount; } } + + protected interface DefaultMqtt3Callback extends MqttCallback { + @Override + default void connectionLost(Throwable cause) { + } + + @Override + default void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception { + } + + @Override + default void deliveryComplete(IMqttDeliveryToken token) { + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java index b06999fddb..84ad469961 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java @@ -339,6 +339,11 @@ public class PublishTests extends MQTT5TestSupport { } /* + * When a new Non‑shared Subscription is made, the last retained message, if any, on each matching topic name is sent + * to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag + * set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time + * of the Subscription... + * * [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the retained messages matching the Topic Filter * of the subscription to the Client. * @@ -350,6 +355,11 @@ public class PublishTests extends MQTT5TestSupport { } /* + * When a new Non‑shared Subscription is made, the last retained message, if any, on each matching topic name is sent + * to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag + * set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time + * of the Subscription... + * * [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the retained messages matching the Topic Filter * of the subscription to the Client. * @@ -361,6 +371,11 @@ public class PublishTests extends MQTT5TestSupport { } /* + * When a new Non‑shared Subscription is made, the last retained message, if any, on each matching topic name is sent + * to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag + * set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time + * of the Subscription... + * * [MQTT-3.3.1-9] If Retain Handling is set to 0 the Server MUST send the retained messages matching the Topic Filter * of the subscription to the Client. * @@ -408,6 +423,7 @@ public class PublishTests extends MQTT5TestSupport { } } assertTrue(payloadMatched); + assertTrue(message.isRetained()); latch.countDown(); } }); @@ -432,6 +448,11 @@ public class PublishTests extends MQTT5TestSupport { } /* + * When a new Non‑shared Subscription is made, the last retained message, if any, on each matching topic name is sent + * to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag + * set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time + * of the Subscription... + * * [MQTT-3.3.1-10] If Retain Handling is set to 1 then if the subscription did not already exist, the Server MUST * send all retained message matching the Topic Filter of the subscription to the Client, and if the subscription did * exist the Server MUST NOT send the retained messages. @@ -462,8 +483,9 @@ public class PublishTests extends MQTT5TestSupport { final CountDownLatch latch = new CountDownLatch(1); consumer.setCallback(new DefaultMqttCallback() { @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { + public void messageArrived(String topic, MqttMessage message) { assertEqualsByteArrays("retained".getBytes(StandardCharsets.UTF_8), message.getPayload()); + assertTrue(message.isRetained()); latch.countDown(); } }); @@ -492,7 +514,12 @@ public class PublishTests extends MQTT5TestSupport { } /* - * [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send the retained + * When a new Non‑shared Subscription is made, the last retained message, if any, on each matching topic name is sent + * to the Client as directed by the Retain Handling Subscription Option. These messages are sent with the RETAIN flag + * set to 1. Which retained messages are sent is controlled by the Retain Handling Subscription Option. At the time + * of the Subscription... + * + * [MQTT-3.3.1-11] If Retain Handling is set to 2, the Server MUST NOT send the retained messages */ @Test(timeout = DEFAULT_TIMEOUT) public void testRetainHandlingTwo() throws Exception { @@ -527,23 +554,18 @@ public class PublishTests extends MQTT5TestSupport { } /* + * The setting of the RETAIN flag in an Application Message forwarded by the Server from an *established* connection + * is controlled by the Retain As Published subscription option. + * * [MQTT-3.3.1-12] If the value of Retain As Published subscription option is set to 0, the Server MUST set the * RETAIN flag to 0 when forwarding an Application Message regardless of how the RETAIN flag was set in the received * PUBLISH packet. */ @Test(timeout = DEFAULT_TIMEOUT) - public void testRetainAsPublishedZero() throws Exception { + public void testRetainAsPublishedZeroOnEstablishedSubscription() throws Exception { final String CONSUMER_ID = RandomUtil.randomString(); final String TOPIC = this.getTopicName(); - // send retained message - MqttClient producer = createPahoClient("producer"); - producer.connect(); - producer.publish(TOPIC, "retained".getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100); - producer.disconnect(); - producer.close(); - // create consumer final CountDownLatch latch = new CountDownLatch(1); MqttClient consumer = createPahoClient(CONSUMER_ID); @@ -560,20 +582,6 @@ public class PublishTests extends MQTT5TestSupport { subscription.setRetainAsPublished(false); consumer.subscribe(new MqttSubscription[]{subscription}); - assertTrue(latch.await(2, TimeUnit.SECONDS)); - consumer.disconnect(); - consumer.close(); - } - - /* - * [MQTT-3.3.1-13] If the value of Retain As Published subscription option is set to 1, the Server MUST set the - * RETAIN flag equal to the RETAIN flag in the received PUBLISH packet. - */ - @Test(timeout = DEFAULT_TIMEOUT) - public void testRetainAsPublishedOne() throws Exception { - final String CONSUMER_ID = RandomUtil.randomString(); - final String TOPIC = this.getTopicName(); - // send retained message MqttClient producer = createPahoClient("producer"); producer.connect(); @@ -582,6 +590,23 @@ public class PublishTests extends MQTT5TestSupport { producer.disconnect(); producer.close(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + consumer.disconnect(); + consumer.close(); + } + + /* + * The setting of the RETAIN flag in an Application Message forwarded by the Server from an *established* connection + * is controlled by the Retain As Published subscription option. + * + * [MQTT-3.3.1-13] If the value of Retain As Published subscription option is set to 1, the Server MUST set the + * RETAIN flag equal to the RETAIN flag in the received PUBLISH packet. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testRetainAsPublishedOneOnEstablishedSubscription() throws Exception { + final String CONSUMER_ID = RandomUtil.randomString(); + final String TOPIC = this.getTopicName(); + // create consumer final CountDownLatch latchOne = new CountDownLatch(1); final CountDownLatch latchTwo = new CountDownLatch(1); @@ -605,6 +630,15 @@ public class PublishTests extends MQTT5TestSupport { MqttSubscription subscription = new MqttSubscription(TOPIC, 2); subscription.setRetainAsPublished(true); consumer.subscribe(new MqttSubscription[]{subscription}); + + // send retained message + MqttClient producer = createPahoClient("producer"); + producer.connect(); + producer.publish(TOPIC, "retained".getBytes(), 2, true); + Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100); + producer.disconnect(); + producer.close(); + assertTrue(latchOne.await(2, TimeUnit.SECONDS)); producer = createPahoClient("producer");