From 919ca96cee67d7ceed06a5cc542628ddc21841e2 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 27 Jun 2018 13:52:30 +0100 Subject: [PATCH] AMQ-6575 - take message durability from publish qos for retained messages, fix and test --- .../transport/mqtt/MQTTProtocolConverter.java | 2 +- .../MQTTVirtualTopicSubscriptionsTest.java | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index ca6b4cba2a..ff6ee4335a 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -550,7 +550,7 @@ public class MQTTProtocolConverter { command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); msg.setTimestamp(System.currentTimeMillis()); msg.setPriority((byte) Message.DEFAULT_PRIORITY); - msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); + msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); if (command.retain()) { msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java index 052a7ee33c..578b20117b 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java @@ -16,20 +16,34 @@ */ package org.apache.activemq.transport.mqtt; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.concurrent.TimeUnit; /** * Run the basic tests with the NIO Transport. */ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { + private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionsTest.class); @Override @Before @@ -150,4 +164,57 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { connection.disconnect(); } } + + @Test(timeout = 60 * 1000) + public void testRetainMessageDurability() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("sub"); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String topicName = "foo/bah"; + + connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)}); + + + // jms client + ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); + // MUST set to true to receive retained messages + activeMQConnection.setUseRetroactiveConsumer(true); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue consumerQ = s.createQueue("Consumer.RegularSub.VirtualTopic.foo.bah"); + MessageConsumer consumer = s.createConsumer(consumerQ); + + + // publisher + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + + // send retained message + final String RETAINED = "RETAINED_MESSAGE_TEXT"; + provider.publish(topicName, RETAINED.getBytes(), EXACTLY_ONCE, true); + + Message message = connection.receive(5, TimeUnit.SECONDS); + assertNotNull("got message", message); + + String response = new String(message.getPayload()); + LOG.info("Got message:" + response); + + + // jms - verify retained message is persistent + ActiveMQMessage activeMQMessage = (ActiveMQMessage) consumer.receive(5000); + assertNotNull("Should get retained message", activeMQMessage); + ByteSequence bs = activeMQMessage.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + LOG.info("Got message with deliverMode:" + activeMQMessage.getJMSDeliveryMode()); + assertEquals(DeliveryMode.PERSISTENT, activeMQMessage.getJMSDeliveryMode()); + + activeMQConnection.close(); + connection.unsubscribe(new String[] { topicName }); + + connection.disconnect(); + } + }