diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java index d265335910..7b632e37b7 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java @@ -56,7 +56,7 @@ public class MQTTSubscription { * @return a new {@link MessageAck} command to acknowledge the message. */ public MessageAck createMessageAck(MessageDispatch md) { - return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); + return new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); } /** diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index 498ea6dd7a..e5e5fe5161 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -16,10 +16,7 @@ */ package org.apache.activemq.transport.mqtt; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -31,16 +28,28 @@ import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; -import org.eclipse.paho.client.mqttv3.MqttClient; +import org.apache.activemq.util.Wait; +import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; + public class PahoMQTTTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); + @Override + @Before + public void setUp() throws Exception { + protocolConfig = "transport.activeMQSubscriptionPrefetch=32766"; + super.setUp(); + } + @Test(timeout = 300000) public void testLotsOfClients() throws Exception { @@ -130,4 +139,120 @@ public class PahoMQTTTest extends MQTTTestSupport { client.disconnect(); client.close(); } + + @Test(timeout = 300000) + public void testCleanSession() throws Exception { + String topic = "test"; + final DefaultListener listener = new DefaultListener(); + + // subscriber connects and creates durable sub + LOG.info("Connecting durable subscriber..."); + MqttClient client = createClient(false, "receive", listener); + // subscribe and wait for the retain message to arrive + LOG.info("Subscribing durable subscriber..."); + client.subscribe(topic, 1); + assertTrue(client.getPendingDeliveryTokens().length == 0); + disconnect(client); + LOG.info("Disconnected durable subscriber."); + + // Publish message with QoS 1 + MqttClient client2 = createClient(true, "publish", listener); + + LOG.info("Publish message with QoS 1..."); + String expectedResult = "QOS 1 message"; + client2.publish(topic, expectedResult.getBytes(), 1, false); + waitForDelivery(client2); + + // Publish message with QoS 0 + LOG.info("Publish message with QoS 0..."); + expectedResult = "QOS 0 message"; + client2.publish(topic, expectedResult.getBytes(), 0, false); + waitForDelivery(client2); + + // subscriber reconnects + LOG.info("Reconnecting durable subscriber..."); + MqttClient client3 = createClient(false, "receive", listener); + + LOG.info("Subscribing durable subscriber..."); + client3.subscribe(topic, 1); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.received == 2; + } + }); + assertEquals(2, listener.received); + disconnect(client3); + LOG.info("Disconnected durable subscriber."); + + // make sure we consumed everything + listener.received = 0; + + LOG.info("Reconnecting durable subscriber..."); + MqttClient client4 = createClient(false, "receive", listener); + + LOG.info("Subscribing durable subscriber..."); + client4.subscribe(topic, 1); + Thread.sleep(3 * 1000); + assertEquals(0, listener.received); + } + + protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(cleanSession); + final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); + client.setCallback(listener); + client.connect(options); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return client.isConnected(); + } + }); + return client; + } + + protected void disconnect(final MqttClient client) throws Exception { + client.disconnect(); + client.close(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !client.isConnected(); + } + }); + } + + protected void waitForDelivery(final MqttClient client) throws Exception { + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return client.getPendingDeliveryTokens().length == 0; + } + }); + assertTrue(client.getPendingDeliveryTokens().length == 0); + } + + static class DefaultListener implements MqttCallback { + + int received = 0; + + @Override + public void connectionLost(Throwable cause) { + + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + LOG.info("Received: " + message); + received++; + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + + } + } + } \ No newline at end of file