From bf8eb0e6ca595308d5a20b4301d985e68903a3dd Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 19 Mar 2014 14:24:51 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5108 - MQTT subscriptions for cleansession=true MUST be non-durable - patch applied with thanks to Dhiraj Bokde --- .../transport/mqtt/MQTTProtocolConverter.java | 45 ++++--- .../activemq/transport/mqtt/MQTTNioTest.java | 19 ++- .../activemq/transport/mqtt/MQTTSSLTest.java | 15 +++ .../activemq/transport/mqtt/MQTTTest.java | 119 +++++++++++++----- 4 files changed, 142 insertions(+), 56 deletions(-) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 91f81ab03e..614b1339cb 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -398,7 +398,8 @@ public class MQTTProtocolConverter { consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch()); consumerInfo.setDispatchAsync(true); - if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { + // create durable subscriptions only when cleansession is false + if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString()); } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); @@ -410,7 +411,7 @@ public class MQTTProtocolConverter { // validate subscription request if (response.isException()) { final Throwable throwable = ((ExceptionResponse) response).getException(); - LOG.debug("Error subscribing to " + topic.name(), throwable); + LOG.warn("Error subscribing to " + topic.name(), throwable); qos[0] = SUBSCRIBE_ERROR; } else { qos[0] = (byte) topic.qos().ordinal(); @@ -654,22 +655,26 @@ public class MQTTProtocolConverter { boolean willSent = false; public void onTransportError() { if (connect != null) { - if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) { - willSent = true; - try { - PUBLISH publish = new PUBLISH(); - publish.topicName(connect.willTopic()); - publish.qos(connect.willQos()); - publish.messageId(getNextSequenceId()); - publish.payload(connect.willMessage()); - ActiveMQMessage message = convertMessage(publish); - message.setProducerId(producerId); - message.onSend(); + if (connected.get()) { + if (connect.willTopic() != null && connect.willMessage() != null && !willSent) { + willSent = true; + try { + PUBLISH publish = new PUBLISH(); + publish.topicName(connect.willTopic()); + publish.qos(connect.willQos()); + publish.messageId(getNextSequenceId()); + publish.payload(connect.willMessage()); + ActiveMQMessage message = convertMessage(publish); + message.setProducerId(producerId); + message.onSend(); - sendToActiveMQ(message, null); - } catch (Exception e) { - LOG.warn("Failed to publish Will Message " + connect.willMessage()); + sendToActiveMQ(message, null); + } catch (Exception e) { + LOG.warn("Failed to publish Will Message " + connect.willMessage()); + } } + // remove connection info + sendToActiveMQ(connectionInfo.createRemoveCommand(), null); } } } @@ -721,11 +726,11 @@ public class MQTTProtocolConverter { LOG.debug("Exception detail", exception); } - try { - getMQTTTransport().stop(); - } catch (Throwable e) { - LOG.error("Failed to stop MQTTT Transport ", e); + if (connected.get() && connectionInfo != null) { + connected.set(false); + sendToActiveMQ(connectionInfo.createRemoveCommand(), null); } + stopTransport(); } void checkConnected() throws MQTTProtocolException { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java index c89c84e53a..2433d2e357 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.transport.mqtt; +import java.util.LinkedList; + import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.filter.DestinationMapEntry; -import org.apache.activemq.security.*; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -29,10 +35,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.BlockJUnit4ClassRunner; -import java.util.LinkedList; - -import static org.junit.Assert.assertTrue; - @RunWith(BlockJUnit4ClassRunner.class) public class MQTTNioTest extends MQTTTest { @@ -48,6 +50,13 @@ public class MQTTNioTest extends MQTTTest { super.testReceiveMessageSentWhileOffline(); } + @Ignore("See AMQ-4712") + @Override + @Test + public void testResendMessageId() throws Exception { + super.testResendMessageId(); + } + @Test public void testPingOnMQTTNIO() throws Exception { addMQTTConnector("maxInactivityDuration=-1"); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java index c98f921806..6b44ae2dd4 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java @@ -60,6 +60,12 @@ public class MQTTSSLTest extends MQTTTest { super.testReceiveMessageSentWhileOffline(); } + @Ignore("See AMQ-4712") + @Override + @Test + public void testResendMessageId() throws Exception { + super.testResendMessageId(); + } protected MQTT createMQTTConnection() throws Exception { MQTT mqtt = new MQTT(); @@ -73,6 +79,15 @@ public class MQTTSSLTest extends MQTTTest { return mqtt; } + protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception { + MQTT mqtt = createMQTTConnection(); + if (clientId != null) { + mqtt.setClientId(clientId); + } + mqtt.setCleanSession(clean); + return mqtt; + } + protected void initializeConnection(MQTTClientProvider provider) throws Exception { SSLContext ctx = SSLContext.getInstance("TLS"); ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index f44dfd4a26..d5d3983026 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.mqtt; import java.net.ProtocolException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -115,7 +116,7 @@ public class MQTTTest extends AbstractMQTTTest { String topic = "foo/bah"; - subscriptionProvider.subscribe(topic,AT_MOST_ONCE); + subscriptionProvider.subscribe(topic, AT_MOST_ONCE); final CountDownLatch latch = new CountDownLatch(numberOfMessages/2); @@ -578,7 +579,7 @@ public class MQTTTest extends AbstractMQTTTest { MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); - mqtt.setKeepAlive((short)2); + mqtt.setKeepAlive((short) 2); mqtt.setCleanSession(true); final List publishList = new ArrayList(); @@ -673,10 +674,7 @@ public class MQTTTest extends AbstractMQTTTest { addMQTTConnector(); brokerService.start(); - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("foo"); - mqtt.setKeepAlive((short)2); - mqtt.setCleanSession(true); + final MQTT mqtt = createMQTTConnection("resend", false); final List publishList = new ArrayList(); mqtt.setTracer(new Tracer() { @@ -729,8 +727,6 @@ public class MQTTTest extends AbstractMQTTTest { // drop subs without acknowledging messages, then subscribe and receive again connection.unsubscribe(subs); connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)}); - // wait for all acks to be processed - Thread.sleep(1000); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); @@ -742,24 +738,98 @@ public class MQTTTest extends AbstractMQTTTest { msg.ack(); // make sure we received duplicate message ids - for (int i = 0; i < publishList.size(); i++) { - boolean found = false; - for (int j = 0; j < publishList.size(); j++) { - if (i != j) { + List dups = new ArrayList(); + for (int i = 0; i < publishList.size() - 1; i++) { + if (!dups.contains(i)) { + boolean found = false; + for (int j = i + 1; j < publishList.size(); j++) { if (publishList.get(i).messageId() == publishList.get(j).messageId()) { // one of them is a duplicate assertTrue(publishList.get(i).dup() || publishList.get(j).dup()); found = true; + dups.add(j); + break; } } + assertTrue("Dup Not found " + publishList.get(i), found); } - assertTrue("Dup Not found " + publishList.get(i), found); } connection.unsubscribe(subs); connection.disconnect(); } + @Test(timeout = 60 * 1000) + public void testClientConnectionFailure() throws Exception { + addMQTTConnector(); + brokerService.start(); + + MQTT mqtt = createMQTTConnection("reconnect", false); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + assertTrue(connection.isConnected()); + + final String TOPIC = "TopicA"; + final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]); + connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + // kill transport + connection.kill(); + + connection = mqtt.blockingConnection(); + connection.connect(); + assertTrue(connection.isConnected()); + + assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]); + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testCleanSession() throws Exception { + addMQTTConnector(); + brokerService.start(); + + final String CLIENTID = "cleansession"; + final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false); + BlockingConnection notClean = mqttNotClean.blockingConnection(); + final String TOPIC = "TopicA"; + notClean.connect(); + notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + notClean.disconnect(); + + // MUST receive message from previous not clean session + notClean = mqttNotClean.blockingConnection(); + notClean.connect(); + Message msg = notClean.receive(10000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(TOPIC, new String(msg.getPayload())); + msg.ack(); + notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + notClean.disconnect(); + + // MUST NOT receive message from previous not clean session + final MQTT mqttClean = createMQTTConnection(CLIENTID, true); + final BlockingConnection clean = mqttClean.blockingConnection(); + clean.connect(); + msg = clean.receive(10000, TimeUnit.MILLISECONDS); + assertNull(msg); + clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); + clean.disconnect(); + + // MUST NOT receive message from previous clean session + notClean = mqttNotClean.blockingConnection(); + notClean.connect(); + msg = notClean.receive(1000, TimeUnit.MILLISECONDS); + assertNull(msg); + notClean.disconnect(); + } + @Test(timeout=60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { addMQTTConnector(); @@ -948,7 +1018,7 @@ public class MQTTTest extends AbstractMQTTTest { @Test(timeout=60 * 1000) public void testReceiveMessageSentWhileOffline() throws Exception { - byte[] payload = new byte[1024 * 32]; + final byte[] payload = new byte[1024 * 32]; for (int i = 0; i < payload.length; i++){ payload[i] = '2'; } @@ -958,12 +1028,9 @@ public class MQTTTest extends AbstractMQTTTest { addMQTTConnector("trace=true"); brokerService.start(); - MQTT mqttPub = createMQTTConnection(); - mqttPub.setClientId("MQTT-Pub-Client"); + final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true); - MQTT mqttSub = createMQTTConnection(); - mqttSub.setClientId("MQTT-Sub-Client"); - mqttSub.setCleanSession(false); + final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false); final BlockingConnection connectionPub = mqttPub.blockingConnection(); connectionPub.connect(); @@ -983,10 +1050,7 @@ public class MQTTTest extends AbstractMQTTTest { Message message = connectionSub.receive(5, TimeUnit.SECONDS); assertNotNull(message); received++; - payload = message.getPayload(); - String messageContent = new String(payload); - LOG.info("Received message from topic: " + message.getTopic() + - " Message content: " + messageContent); + assertTrue(Arrays.equals(payload, message.getPayload())); message.ack(); } connectionSub.disconnect(); @@ -997,10 +1061,6 @@ public class MQTTTest extends AbstractMQTTTest { connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); } - mqttSub = createMQTTConnection(); - mqttSub.setClientId("MQTT-Sub-Client"); - mqttSub.setCleanSession(false); - connectionSub = mqttSub.blockingConnection(); connectionSub.connect(); connectionSub.subscribe(topics); @@ -1009,10 +1069,7 @@ public class MQTTTest extends AbstractMQTTTest { Message message = connectionSub.receive(5, TimeUnit.SECONDS); assertNotNull(message); received++; - payload = message.getPayload(); - String messageContent = new String(payload); - LOG.info("Received message from topic: " + message.getTopic() + - " Message content: " + messageContent); + assertTrue(Arrays.equals(payload, message.getPayload())); message.ack(); } connectionSub.disconnect();