https://issues.apache.org/jira/browse/AMQ-5108 - MQTT subscriptions for cleansession=true MUST be non-durable - patch applied with thanks to Dhiraj Bokde

This commit is contained in:
gtully 2014-03-19 14:24:51 +00:00
parent 47d72dd32a
commit bf8eb0e6ca
4 changed files with 142 additions and 56 deletions

View File

@ -398,7 +398,8 @@ public class MQTTProtocolConverter {
consumerInfo.setDestination(destination); consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { // create durable subscriptions only when cleansession is false
if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
} }
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
@ -410,7 +411,7 @@ public class MQTTProtocolConverter {
// validate subscription request // validate subscription request
if (response.isException()) { if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException(); final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.debug("Error subscribing to " + topic.name(), throwable); LOG.warn("Error subscribing to " + topic.name(), throwable);
qos[0] = SUBSCRIBE_ERROR; qos[0] = SUBSCRIBE_ERROR;
} else { } else {
qos[0] = (byte) topic.qos().ordinal(); qos[0] = (byte) topic.qos().ordinal();
@ -654,7 +655,8 @@ public class MQTTProtocolConverter {
boolean willSent = false; boolean willSent = false;
public void onTransportError() { public void onTransportError() {
if (connect != null) { if (connect != null) {
if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) { if (connected.get()) {
if (connect.willTopic() != null && connect.willMessage() != null && !willSent) {
willSent = true; willSent = true;
try { try {
PUBLISH publish = new PUBLISH(); PUBLISH publish = new PUBLISH();
@ -671,6 +673,9 @@ public class MQTTProtocolConverter {
LOG.warn("Failed to publish Will Message " + connect.willMessage()); LOG.warn("Failed to publish Will Message " + connect.willMessage());
} }
} }
// remove connection info
sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
}
} }
} }
@ -721,11 +726,11 @@ public class MQTTProtocolConverter {
LOG.debug("Exception detail", exception); LOG.debug("Exception detail", exception);
} }
try { if (connected.get() && connectionInfo != null) {
getMQTTTransport().stop(); connected.set(false);
} catch (Throwable e) { sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
LOG.error("Failed to stop MQTTT Transport ", e);
} }
stopTransport();
} }
void checkConnected() throws MQTTProtocolException { void checkConnected() throws MQTTProtocolException {

View File

@ -16,11 +16,17 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.util.LinkedList;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.security.*; import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
@ -29,10 +35,6 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner; import org.junit.runners.BlockJUnit4ClassRunner;
import java.util.LinkedList;
import static org.junit.Assert.assertTrue;
@RunWith(BlockJUnit4ClassRunner.class) @RunWith(BlockJUnit4ClassRunner.class)
public class MQTTNioTest extends MQTTTest { public class MQTTNioTest extends MQTTTest {
@ -48,6 +50,13 @@ public class MQTTNioTest extends MQTTTest {
super.testReceiveMessageSentWhileOffline(); super.testReceiveMessageSentWhileOffline();
} }
@Ignore("See AMQ-4712")
@Override
@Test
public void testResendMessageId() throws Exception {
super.testResendMessageId();
}
@Test @Test
public void testPingOnMQTTNIO() throws Exception { public void testPingOnMQTTNIO() throws Exception {
addMQTTConnector("maxInactivityDuration=-1"); addMQTTConnector("maxInactivityDuration=-1");

View File

@ -60,6 +60,12 @@ public class MQTTSSLTest extends MQTTTest {
super.testReceiveMessageSentWhileOffline(); super.testReceiveMessageSentWhileOffline();
} }
@Ignore("See AMQ-4712")
@Override
@Test
public void testResendMessageId() throws Exception {
super.testResendMessageId();
}
protected MQTT createMQTTConnection() throws Exception { protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT(); MQTT mqtt = new MQTT();
@ -73,6 +79,15 @@ public class MQTTSSLTest extends MQTTTest {
return mqtt; return mqtt;
} }
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = createMQTTConnection();
if (clientId != null) {
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
return mqtt;
}
protected void initializeConnection(MQTTClientProvider provider) throws Exception { protected void initializeConnection(MQTTClientProvider provider) throws Exception {
SSLContext ctx = SSLContext.getInstance("TLS"); SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.mqtt;
import java.net.ProtocolException; import java.net.ProtocolException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -115,7 +116,7 @@ public class MQTTTest extends AbstractMQTTTest {
String topic = "foo/bah"; String topic = "foo/bah";
subscriptionProvider.subscribe(topic,AT_MOST_ONCE); subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(numberOfMessages/2); final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
@ -578,7 +579,7 @@ public class MQTTTest extends AbstractMQTTTest {
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo"); mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2); mqtt.setKeepAlive((short) 2);
mqtt.setCleanSession(true); mqtt.setCleanSession(true);
final List<PUBLISH> publishList = new ArrayList<PUBLISH>(); final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
@ -673,10 +674,7 @@ public class MQTTTest extends AbstractMQTTTest {
addMQTTConnector(); addMQTTConnector();
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection(); final MQTT mqtt = createMQTTConnection("resend", false);
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2);
mqtt.setCleanSession(true);
final List<PUBLISH> publishList = new ArrayList<PUBLISH>(); final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
mqtt.setTracer(new Tracer() { mqtt.setTracer(new Tracer() {
@ -729,8 +727,6 @@ public class MQTTTest extends AbstractMQTTTest {
// drop subs without acknowledging messages, then subscribe and receive again // drop subs without acknowledging messages, then subscribe and receive again
connection.unsubscribe(subs); connection.unsubscribe(subs);
connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)});
// wait for all acks to be processed
Thread.sleep(1000);
msg = connection.receive(5000, TimeUnit.MILLISECONDS); msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg); assertNotNull(msg);
@ -742,24 +738,98 @@ public class MQTTTest extends AbstractMQTTTest {
msg.ack(); msg.ack();
// make sure we received duplicate message ids // make sure we received duplicate message ids
for (int i = 0; i < publishList.size(); i++) { List<Integer> dups = new ArrayList<Integer>();
for (int i = 0; i < publishList.size() - 1; i++) {
if (!dups.contains(i)) {
boolean found = false; boolean found = false;
for (int j = 0; j < publishList.size(); j++) { for (int j = i + 1; j < publishList.size(); j++) {
if (i != j) {
if (publishList.get(i).messageId() == publishList.get(j).messageId()) { if (publishList.get(i).messageId() == publishList.get(j).messageId()) {
// one of them is a duplicate // one of them is a duplicate
assertTrue(publishList.get(i).dup() || publishList.get(j).dup()); assertTrue(publishList.get(i).dup() || publishList.get(j).dup());
found = true; found = true;
} dups.add(j);
break;
} }
} }
assertTrue("Dup Not found " + publishList.get(i), found); assertTrue("Dup Not found " + publishList.get(i), found);
} }
}
connection.unsubscribe(subs); connection.unsubscribe(subs);
connection.disconnect(); connection.disconnect();
} }
@Test(timeout = 60 * 1000)
public void testClientConnectionFailure() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection("reconnect", false);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
assertTrue(connection.isConnected());
final String TOPIC = "TopicA";
final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
// kill transport
connection.kill();
connection = mqtt.blockingConnection();
connection.connect();
assertTrue(connection.isConnected());
assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack();
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testCleanSession() throws Exception {
addMQTTConnector();
brokerService.start();
final String CLIENTID = "cleansession";
final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
BlockingConnection notClean = mqttNotClean.blockingConnection();
final String TOPIC = "TopicA";
notClean.connect();
notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
// MUST receive message from previous not clean session
notClean = mqttNotClean.blockingConnection();
notClean.connect();
Message msg = notClean.receive(10000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack();
notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
notClean.disconnect();
// MUST NOT receive message from previous not clean session
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
final BlockingConnection clean = mqttClean.blockingConnection();
clean.connect();
msg = clean.receive(10000, TimeUnit.MILLISECONDS);
assertNull(msg);
clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
clean.disconnect();
// MUST NOT receive message from previous clean session
notClean = mqttNotClean.blockingConnection();
notClean.connect();
msg = notClean.receive(1000, TimeUnit.MILLISECONDS);
assertNull(msg);
notClean.disconnect();
}
@Test(timeout=60 * 1000) @Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception { public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector(); addMQTTConnector();
@ -948,7 +1018,7 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout=60 * 1000) @Test(timeout=60 * 1000)
public void testReceiveMessageSentWhileOffline() throws Exception { public void testReceiveMessageSentWhileOffline() throws Exception {
byte[] payload = new byte[1024 * 32]; final byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){ for (int i = 0; i < payload.length; i++){
payload[i] = '2'; payload[i] = '2';
} }
@ -958,12 +1028,9 @@ public class MQTTTest extends AbstractMQTTTest {
addMQTTConnector("trace=true"); addMQTTConnector("trace=true");
brokerService.start(); brokerService.start();
MQTT mqttPub = createMQTTConnection(); final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
mqttPub.setClientId("MQTT-Pub-Client");
MQTT mqttSub = createMQTTConnection(); final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
mqttSub.setClientId("MQTT-Sub-Client");
mqttSub.setCleanSession(false);
final BlockingConnection connectionPub = mqttPub.blockingConnection(); final BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect(); connectionPub.connect();
@ -983,10 +1050,7 @@ public class MQTTTest extends AbstractMQTTTest {
Message message = connectionSub.receive(5, TimeUnit.SECONDS); Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message); assertNotNull(message);
received++; received++;
payload = message.getPayload(); assertTrue(Arrays.equals(payload, message.getPayload()));
String messageContent = new String(payload);
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
message.ack(); message.ack();
} }
connectionSub.disconnect(); connectionSub.disconnect();
@ -997,10 +1061,6 @@ public class MQTTTest extends AbstractMQTTTest {
connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
} }
mqttSub = createMQTTConnection();
mqttSub.setClientId("MQTT-Sub-Client");
mqttSub.setCleanSession(false);
connectionSub = mqttSub.blockingConnection(); connectionSub = mqttSub.blockingConnection();
connectionSub.connect(); connectionSub.connect();
connectionSub.subscribe(topics); connectionSub.subscribe(topics);
@ -1009,10 +1069,7 @@ public class MQTTTest extends AbstractMQTTTest {
Message message = connectionSub.receive(5, TimeUnit.SECONDS); Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message); assertNotNull(message);
received++; received++;
payload = message.getPayload(); assertTrue(Arrays.equals(payload, message.getPayload()));
String messageContent = new String(payload);
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
message.ack(); message.ack();
} }
connectionSub.disconnect(); connectionSub.disconnect();