mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5399 - mqtt - out of order acks
This commit is contained in:
parent
1a0bd45a4e
commit
28b45341d1
|
@ -56,7 +56,7 @@ public class MQTTSubscription {
|
||||||
* @return a new {@link MessageAck} command to acknowledge the message.
|
* @return a new {@link MessageAck} command to acknowledge the message.
|
||||||
*/
|
*/
|
||||||
public MessageAck createMessageAck(MessageDispatch md) {
|
public MessageAck createMessageAck(MessageDispatch md) {
|
||||||
return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
|
return new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,10 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.mqtt;
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import java.util.Random;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -31,16 +28,28 @@ import javax.jms.MessageListener;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.apache.activemq.util.Wait;
|
||||||
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class PahoMQTTTest extends MQTTTestSupport {
|
public class PahoMQTTTest extends MQTTTestSupport {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
|
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
protocolConfig = "transport.activeMQSubscriptionPrefetch=32766";
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testLotsOfClients() throws Exception {
|
public void testLotsOfClients() throws Exception {
|
||||||
|
|
||||||
|
@ -130,4 +139,120 @@ public class PahoMQTTTest extends MQTTTestSupport {
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
client.close();
|
client.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testCleanSession() throws Exception {
|
||||||
|
String topic = "test";
|
||||||
|
final DefaultListener listener = new DefaultListener();
|
||||||
|
|
||||||
|
// subscriber connects and creates durable sub
|
||||||
|
LOG.info("Connecting durable subscriber...");
|
||||||
|
MqttClient client = createClient(false, "receive", listener);
|
||||||
|
// subscribe and wait for the retain message to arrive
|
||||||
|
LOG.info("Subscribing durable subscriber...");
|
||||||
|
client.subscribe(topic, 1);
|
||||||
|
assertTrue(client.getPendingDeliveryTokens().length == 0);
|
||||||
|
disconnect(client);
|
||||||
|
LOG.info("Disconnected durable subscriber.");
|
||||||
|
|
||||||
|
// Publish message with QoS 1
|
||||||
|
MqttClient client2 = createClient(true, "publish", listener);
|
||||||
|
|
||||||
|
LOG.info("Publish message with QoS 1...");
|
||||||
|
String expectedResult = "QOS 1 message";
|
||||||
|
client2.publish(topic, expectedResult.getBytes(), 1, false);
|
||||||
|
waitForDelivery(client2);
|
||||||
|
|
||||||
|
// Publish message with QoS 0
|
||||||
|
LOG.info("Publish message with QoS 0...");
|
||||||
|
expectedResult = "QOS 0 message";
|
||||||
|
client2.publish(topic, expectedResult.getBytes(), 0, false);
|
||||||
|
waitForDelivery(client2);
|
||||||
|
|
||||||
|
// subscriber reconnects
|
||||||
|
LOG.info("Reconnecting durable subscriber...");
|
||||||
|
MqttClient client3 = createClient(false, "receive", listener);
|
||||||
|
|
||||||
|
LOG.info("Subscribing durable subscriber...");
|
||||||
|
client3.subscribe(topic, 1);
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return listener.received == 2;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals(2, listener.received);
|
||||||
|
disconnect(client3);
|
||||||
|
LOG.info("Disconnected durable subscriber.");
|
||||||
|
|
||||||
|
// make sure we consumed everything
|
||||||
|
listener.received = 0;
|
||||||
|
|
||||||
|
LOG.info("Reconnecting durable subscriber...");
|
||||||
|
MqttClient client4 = createClient(false, "receive", listener);
|
||||||
|
|
||||||
|
LOG.info("Subscribing durable subscriber...");
|
||||||
|
client4.subscribe(topic, 1);
|
||||||
|
Thread.sleep(3 * 1000);
|
||||||
|
assertEquals(0, listener.received);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
|
||||||
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
|
options.setCleanSession(cleanSession);
|
||||||
|
final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
|
||||||
|
client.setCallback(listener);
|
||||||
|
client.connect(options);
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return client.isConnected();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void disconnect(final MqttClient client) throws Exception {
|
||||||
|
client.disconnect();
|
||||||
|
client.close();
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return !client.isConnected();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void waitForDelivery(final MqttClient client) throws Exception {
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return client.getPendingDeliveryTokens().length == 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(client.getPendingDeliveryTokens().length == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class DefaultListener implements MqttCallback {
|
||||||
|
|
||||||
|
int received = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionLost(Throwable cause) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||||
|
LOG.info("Received: " + message);
|
||||||
|
received++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue