AMQ-6575 - take message durability from publish qos for retained messages, fix and test

This commit is contained in:
gtully 2018-06-27 13:52:30 +01:00
parent 8fd107559c
commit 919ca96cee
2 changed files with 68 additions and 1 deletions

View File

@ -550,7 +550,7 @@ public class MQTTProtocolConverter {
command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
msg.setTimestamp(System.currentTimeMillis()); msg.setTimestamp(System.currentTimeMillis());
msg.setPriority((byte) Message.DEFAULT_PRIORITY); msg.setPriority((byte) Message.DEFAULT_PRIORITY);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
if (command.retain()) { if (command.retain()) {
msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);

View File

@ -16,20 +16,34 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Topic;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.util.concurrent.TimeUnit;
/** /**
* Run the basic tests with the NIO Transport. * Run the basic tests with the NIO Transport.
*/ */
public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionsTest.class);
@Override @Override
@Before @Before
@ -150,4 +164,57 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
connection.disconnect(); connection.disconnect();
} }
} }
@Test(timeout = 60 * 1000)
public void testRetainMessageDurability() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("sub");
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String topicName = "foo/bah";
connection.subscribe(new Topic[] { new Topic(topicName, QoS.EXACTLY_ONCE)});
// jms client
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
// MUST set to true to receive retained messages
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue consumerQ = s.createQueue("Consumer.RegularSub.VirtualTopic.foo.bah");
MessageConsumer consumer = s.createConsumer(consumerQ);
// publisher
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
// send retained message
final String RETAINED = "RETAINED_MESSAGE_TEXT";
provider.publish(topicName, RETAINED.getBytes(), EXACTLY_ONCE, true);
Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("got message", message);
String response = new String(message.getPayload());
LOG.info("Got message:" + response);
// jms - verify retained message is persistent
ActiveMQMessage activeMQMessage = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get retained message", activeMQMessage);
ByteSequence bs = activeMQMessage.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
LOG.info("Got message with deliverMode:" + activeMQMessage.getJMSDeliveryMode());
assertEquals(DeliveryMode.PERSISTENT, activeMQMessage.getJMSDeliveryMode());
activeMQConnection.close();
connection.unsubscribe(new String[] { topicName });
connection.disconnect();
}
} }