diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 31198bae4c..17ad1e4cf9 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.transport.mqtt; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotEquals; + import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; @@ -26,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -34,8 +38,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotEquals; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; @@ -65,22 +67,21 @@ public class MQTTTest extends AbstractMQTTTest { private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAndReceiveMQTT() throws Exception { addMQTTConnector(); brokerService.start(); final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); initializeConnection(subscriptionProvider); - - subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE); + subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); final CountDownLatch latch = new CountDownLatch(numberOfMessages); Thread thread = new Thread(new Runnable() { @Override public void run() { - for (int i = 0; i < numberOfMessages; i++){ + for (int i = 0; i < numberOfMessages; i++) { try { byte[] payload = subscriptionProvider.receive(10000); assertNotNull("Should get a message", payload); @@ -98,9 +99,9 @@ public class MQTTTest extends AbstractMQTTTest { final MQTTClientProvider publishProvider = getMQTTClientProvider(); initializeConnection(publishProvider); - for (int i = 0; i < numberOfMessages; i++){ + for (int i = 0; i < numberOfMessages; i++) { String payload = "Message " + i; - publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE); + publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); } latch.await(10, TimeUnit.SECONDS); @@ -109,7 +110,7 @@ public class MQTTTest extends AbstractMQTTTest { publishProvider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testUnsubscribeMQTT() throws Exception { addMQTTConnector(); brokerService.start(); @@ -120,12 +121,12 @@ public class MQTTTest extends AbstractMQTTTest { subscriptionProvider.subscribe(topic, AT_MOST_ONCE); - final CountDownLatch latch = new CountDownLatch(numberOfMessages/2); + final CountDownLatch latch = new CountDownLatch(numberOfMessages / 2); Thread thread = new Thread(new Runnable() { @Override public void run() { - for (int i = 0; i < numberOfMessages; i++){ + for (int i = 0; i < numberOfMessages; i++) { try { byte[] payload = subscriptionProvider.receive(10000); assertNotNull("Should get a message", payload); @@ -143,12 +144,12 @@ public class MQTTTest extends AbstractMQTTTest { final MQTTClientProvider publishProvider = getMQTTClientProvider(); initializeConnection(publishProvider); - for (int i = 0; i < numberOfMessages; i++){ + for (int i = 0; i < numberOfMessages; i++) { String payload = "Message " + i; - if (i == numberOfMessages/2){ + if (i == numberOfMessages / 2) { subscriptionProvider.unsubscribe(topic); } - publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE); + publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE); } latch.await(10, TimeUnit.SECONDS); @@ -157,19 +158,19 @@ public class MQTTTest extends AbstractMQTTTest { publishProvider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAtMostOnceReceiveExactlyOnce() throws Exception { /** * Although subscribing with EXACTLY ONCE, the message gets published - * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message - * as published - not the wish of the subscriber + * with AT_MOST_ONCE - in MQTT the QoS is always determined by the + * message as published - not the wish of the subscriber */ addMQTTConnector(); brokerService.start(); final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); - provider.subscribe("foo",EXACTLY_ONCE); + provider.subscribe("foo", EXACTLY_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); @@ -180,14 +181,14 @@ public class MQTTTest extends AbstractMQTTTest { provider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception { addMQTTConnector(); brokerService.start(); final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); - provider.subscribe("foo",EXACTLY_ONCE); + provider.subscribe("foo", EXACTLY_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); @@ -198,14 +199,14 @@ public class MQTTTest extends AbstractMQTTTest { provider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception { addMQTTConnector(); brokerService.start(); final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); - provider.subscribe("foo",AT_MOST_ONCE); + provider.subscribe("foo", AT_MOST_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); @@ -216,14 +217,14 @@ public class MQTTTest extends AbstractMQTTTest { provider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAndReceiveAtMostOnce() throws Exception { addMQTTConnector(); brokerService.start(); final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); - provider.subscribe("foo",AT_MOST_ONCE); + provider.subscribe("foo", AT_MOST_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); @@ -234,14 +235,14 @@ public class MQTTTest extends AbstractMQTTTest { provider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAndReceiveAtLeastOnce() throws Exception { addMQTTConnector(); brokerService.start(); final MQTTClientProvider provider = getMQTTClientProvider(); initializeConnection(provider); - provider.subscribe("foo",AT_LEAST_ONCE); + provider.subscribe("foo", AT_LEAST_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); @@ -252,7 +253,7 @@ public class MQTTTest extends AbstractMQTTTest { provider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAndReceiveExactlyOnce() throws Exception { addMQTTConnector(); brokerService.start(); @@ -262,22 +263,22 @@ public class MQTTTest extends AbstractMQTTTest { final MQTTClientProvider subscriber = getMQTTClientProvider(); initializeConnection(subscriber); - subscriber.subscribe("foo",EXACTLY_ONCE); + subscriber.subscribe("foo", EXACTLY_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "Test Message: " + i; publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE); byte[] message = subscriber.receive(5000); - assertNotNull("Should get a message + ["+ i + "]", message); + assertNotNull("Should get a message + [" + i + "]", message); assertEquals(payload, new String(message)); } subscriber.disconnect(); publisher.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAndReceiveLargeMessages() throws Exception { byte[] payload = new byte[1024 * 32]; - for (int i = 0; i < payload.length; i++){ + for (int i = 0; i < payload.length; i++) { payload[i] = '2'; } addMQTTConnector(); @@ -289,7 +290,7 @@ public class MQTTTest extends AbstractMQTTTest { final MQTTClientProvider subscriber = getMQTTClientProvider(); initializeConnection(subscriber); - subscriber.subscribe("foo",AT_LEAST_ONCE); + subscriber.subscribe("foo", AT_LEAST_ONCE); for (int i = 0; i < 10; i++) { publisher.publish("foo", payload, AT_LEAST_ONCE); byte[] message = subscriber.receive(5000); @@ -301,7 +302,7 @@ public class MQTTTest extends AbstractMQTTTest { publisher.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendAndReceiveRetainedMessages() throws Exception { addMQTTConnector(); @@ -317,20 +318,20 @@ public class MQTTTest extends AbstractMQTTTest { publisher.publish("foo", RETAINED.getBytes(), AT_LEAST_ONCE, true); List messages = new ArrayList(); - for (int i = 0; i < 10; i++){ + for (int i = 0; i < 10; i++) { messages.add("TEST MESSAGE:" + i); } - subscriber.subscribe("foo",AT_LEAST_ONCE); + subscriber.subscribe("foo", AT_LEAST_ONCE); for (int i = 0; i < 10; i++) { publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE); } byte[] msg = subscriber.receive(5000); assertNotNull(msg); - assertEquals(RETAINED,new String(msg)); + assertEquals(RETAINED, new String(msg)); - for (int i =0; i < 10; i++){ + for (int i = 0; i < 10; i++) { msg = subscriber.receive(5000); assertNotNull(msg); assertEquals(messages.get(i), new String(msg)); @@ -339,7 +340,7 @@ public class MQTTTest extends AbstractMQTTTest { publisher.disconnect(); } - @Test(timeout=30000) + @Test(timeout = 30 * 1000) public void testValidZeroLengthClientId() throws Exception { addMQTTConnector(); brokerService.start(); @@ -366,12 +367,12 @@ public class MQTTTest extends AbstractMQTTTest { connection.connect(); final String RETAINED = "RETAINED"; - String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"}; + String[] topics = { "TopicA", "/TopicA", "/", "TopicA/", "//" }; for (String topic : topics) { // test retained message connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true); - connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)}); + connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) }); Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); assertNotNull(msg); assertEquals(RETAINED + topic, new String(msg.getPayload())); @@ -384,26 +385,25 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(topic, new String(msg.getPayload())); msg.ack(); - connection.unsubscribe(new String[] {topic}); + connection.unsubscribe(new String[] { topic }); } connection.disconnect(); // test wildcard patterns with above topics - String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"}; + String[] wildcards = { "#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+" }; for (String wildcard : wildcards) { final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*")); connection = mqtt.blockingConnection(); connection.connect(); - connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)}); + connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) }); // test retained messages Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); do { assertNotNull("RETAINED null " + wildcard, msg); assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); - assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), - pattern.matcher(msg.getTopic()).matches()); + assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches()); msg.ack(); msg = connection.receive(1000, TimeUnit.MILLISECONDS); } while (msg != null); @@ -412,7 +412,7 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); connection = mqtt.blockingConnection(); connection.connect(); - connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)}); + connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) }); // test non-retained message for (String topic : topics) { @@ -421,8 +421,7 @@ public class MQTTTest extends AbstractMQTTTest { msg = connection.receive(1000, TimeUnit.MILLISECONDS); do { assertNotNull("Non-retained Null " + wildcard, msg); - assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), - pattern.matcher(msg.getTopic()).matches()); + assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches()); msg.ack(); msg = connection.receive(1000, TimeUnit.MILLISECONDS); } while (msg != null); @@ -443,9 +442,9 @@ public class MQTTTest extends AbstractMQTTTest { MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); - mqtt.setKeepAlive((short)2); + mqtt.setKeepAlive((short) 2); - final int[] actualQoS = {-1}; + final int[] actualQoS = { -1 }; mqtt.setTracer(new Tracer() { @Override public void onReceive(MQTTFrame frame) { @@ -459,7 +458,7 @@ public class MQTTTest extends AbstractMQTTTest { final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[]{ new Topic(topic, QoS.valueOf(topic)) }); + connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) }); final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); @@ -472,7 +471,7 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(i, actualQoS[0]); msg.ack(); - connection.unsubscribe(new String[]{topic}); + connection.unsubscribe(new String[] { topic }); connection.disconnect(); } @@ -485,9 +484,9 @@ public class MQTTTest extends AbstractMQTTTest { MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); - mqtt.setKeepAlive((short)2); + mqtt.setKeepAlive((short) 2); - final int[] actualQoS = {-1}; + final int[] actualQoS = { -1 }; mqtt.setTracer(new Tracer() { @Override public void onReceive(MQTTFrame frame) { @@ -506,7 +505,7 @@ public class MQTTTest extends AbstractMQTTTest { QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE }; for (QoS qos : qoss) { - connection.subscribe(new Topic[]{ new Topic("TopicA", qos) }); + connection.subscribe(new Topic[] { new Topic("TopicA", qos) }); final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); @@ -520,7 +519,7 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(qos.ordinal(), actualQoS[0]); } - connection.unsubscribe(new String[]{"TopicA"}); + connection.unsubscribe(new String[] { "TopicA" }); connection.disconnect(); } @@ -539,7 +538,7 @@ public class MQTTTest extends AbstractMQTTTest { map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS)); final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map, map, map)); - brokerService.setPlugins(new BrokerPlugin[] {authorizationPlugin, authenticationPlugin}); + brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin }); brokerService.start(); MQTT mqtt = createMQTTConnection(); @@ -550,10 +549,9 @@ public class MQTTTest extends AbstractMQTTTest { connection.connect(); final String NAMED = "named"; - byte[] qos = connection.subscribe(new Topic[] { - new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE)}); - assertEquals((byte)0x80, qos[0]); - assertEquals((byte)QoS.EXACTLY_ONCE.ordinal(), qos[1]); + byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) }); + assertEquals((byte) 0x80, qos[0]); + assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]); // validate the subscription by sending a retained message connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true); @@ -562,9 +560,9 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(ANONYMOUS, new String(msg.getPayload())); msg.ack(); - connection.unsubscribe(new String[]{ANONYMOUS}); - qos = connection.subscribe(new Topic[]{new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE)}); - assertEquals((byte)QoS.AT_LEAST_ONCE.ordinal(), qos[0]); + connection.unsubscribe(new String[] { ANONYMOUS }); + qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) }); + assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]); msg = connection.receive(1000, TimeUnit.MILLISECONDS); assertNotNull(msg); @@ -616,9 +614,9 @@ public class MQTTTest extends AbstractMQTTTest { // publish retained message connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true); - String[] subs = {TOPIC, "TopicA/#", "TopicA/+"}; + String[] subs = { TOPIC, "TopicA/#", "TopicA/+" }; for (int i = 0; i < qoss.length; i++) { - connection.subscribe(new Topic[]{ new Topic(subs[i], qoss[i]) }); + connection.subscribe(new Topic[] { new Topic(subs[i], qoss[i]) }); } // publish non-retained message @@ -639,7 +637,8 @@ public class MQTTTest extends AbstractMQTTTest { } while (msg != null && received++ < subs.length * 2); assertEquals("Unexpected number of messages", subs.length * 2, received + 1); - // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for AT_MOST_ONCE + // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for + // AT_MOST_ONCE for (int i = 0; i < publishList.size(); i++) { for (int j = i + 1; j < publishList.size(); j++) { final PUBLISH publish1 = publishList.get(i); @@ -696,8 +695,8 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); final String TOPIC = "TopicA/"; - final String[] topics = new String[] {TOPIC, "TopicA/+"}; - connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)}); + final String[] topics = new String[] { TOPIC, "TopicA/+" }; + connection.subscribe(new Topic[] { new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE) }); // publish non-retained message connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); @@ -724,10 +723,8 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(4, publishList.size()); // make sure we received duplicate message ids - assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || - publishList.get(0).messageId() == publishList.get(3).messageId()); - assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || - publishList.get(1).messageId() == publishList.get(2).messageId()); + assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || publishList.get(0).messageId() == publishList.get(3).messageId()); + assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || publishList.get(1).messageId() == publishList.get(2).messageId()); assertTrue(publishList.get(2).dup() && publishList.get(3).dup()); connection.unsubscribe(topics); @@ -771,7 +768,7 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); final String TOPIC = "TopicA/"; - connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + connection.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) }); // publish non-retained messages final int TOTAL_MESSAGES = 10; @@ -815,7 +812,7 @@ public class MQTTTest extends AbstractMQTTTest { addMQTTConnector("trace=true"); brokerService.start(); - final String[] cleanClientIds = new String[] { "", "clean-packetid", null}; + final String[] cleanClientIds = new String[] { "", "clean-packetid", null }; final Map publishMap = new ConcurrentHashMap(); MQTT[] mqtts = new MQTT[cleanClientIds.length]; for (int i = 0; i < cleanClientIds.length; i++) { @@ -853,7 +850,7 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection(); connection.connect(); final String TOPIC = "TopicA/"; - connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + connection.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) }); // publish non-retained message connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); @@ -888,7 +885,7 @@ public class MQTTTest extends AbstractMQTTTest { }); final String TOPIC = "TopicA"; - final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + final byte[] qos = connection.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) }); assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]); connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); // kill transport @@ -921,7 +918,7 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection notClean = mqttNotClean.blockingConnection(); final String TOPIC = "TopicA"; notClean.connect(); - notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + notClean.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) }); notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); @@ -941,7 +938,7 @@ public class MQTTTest extends AbstractMQTTTest { clean.connect(); msg = clean.receive(10000, TimeUnit.MILLISECONDS); assertNull(msg); - clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)}); + clean.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) }); clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); @@ -953,7 +950,7 @@ public class MQTTTest extends AbstractMQTTTest { notClean.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { addMQTTConnector(); TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); @@ -982,7 +979,7 @@ public class MQTTTest extends AbstractMQTTTest { provider.disconnect(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testSendJMSReceiveMQTT() throws Exception { addMQTTConnector(); TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); @@ -996,7 +993,7 @@ public class MQTTTest extends AbstractMQTTTest { javax.jms.Topic jmsTopic = s.createTopic("foo.far"); MessageProducer producer = s.createProducer(jmsTopic); - provider.subscribe("foo/+",AT_MOST_ONCE); + provider.subscribe("foo/+", AT_MOST_ONCE); for (int i = 0; i < numberOfMessages; i++) { String payload = "This is Test Message: " + i; TextMessage sendMessage = s.createTextMessage(payload); @@ -1010,13 +1007,13 @@ public class MQTTTest extends AbstractMQTTTest { activeMQConnection.close(); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testPingKeepsInactivityMonitorAlive() throws Exception { addMQTTConnector(); brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); - mqtt.setKeepAlive((short)2); + mqtt.setKeepAlive((short) 2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -1031,13 +1028,13 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } - @Test(timeout=60 * 1000) - public void testTurnOffInactivityMonitor()throws Exception{ + @Test(timeout = 60 * 1000) + public void testTurnOffInactivityMonitor() throws Exception { addMQTTConnector("transport.useInactivityMonitor=false"); brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo3"); - mqtt.setKeepAlive((short)2); + mqtt.setKeepAlive((short) 2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -1052,7 +1049,7 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } - @Test(timeout = 300000) + @Test(timeout = 30 * 10000) public void testJmsMapping() throws Exception { addMQTTConnector(); addOpenwireConnector(); @@ -1069,7 +1066,7 @@ public class MQTTTest extends AbstractMQTTTest { // set up mqtt producer MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo3"); - mqtt.setKeepAlive((short)2); + mqtt.setKeepAlive((short) 2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -1099,7 +1096,7 @@ public class MQTTTest extends AbstractMQTTTest { } - @Test(timeout = 300000) + @Test(timeout = 30 * 10000) public void testSubscribeMultipleTopics() throws Exception { byte[] payload = new byte[1024 * 32]; @@ -1116,8 +1113,8 @@ public class MQTTTest extends AbstractMQTTTest { final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); - Topic[] topics = {new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE)}; - Topic[] wildcardTopic = {new Topic("Topic/#", QoS.AT_LEAST_ONCE)}; + Topic[] topics = { new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE) }; + Topic[] wildcardTopic = { new Topic("Topic/#", QoS.AT_LEAST_ONCE) }; connection.subscribe(wildcardTopic); for (Topic topic : topics) { @@ -1131,18 +1128,17 @@ public class MQTTTest extends AbstractMQTTTest { received++; payload = message.getPayload(); String messageContent = new String(payload); - LOG.info("Received message from topic: " + message.getTopic() + - " Message content: " + messageContent); + LOG.info("Received message from topic: " + message.getTopic() + " Message content: " + messageContent); message.ack(); } assertEquals("Should have received " + topics.length + " messages", topics.length, received); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testReceiveMessageSentWhileOffline() throws Exception { final byte[] payload = new byte[1024 * 32]; - for (int i = 0; i < payload.length; i++){ + for (int i = 0; i < payload.length; i++) { payload[i] = '2'; } @@ -1161,7 +1157,7 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection connectionSub = mqttSub.blockingConnection(); connectionSub.connect(); - Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)}; + Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) }; connectionSub.subscribe(topics); for (int i = 0; i < messagesPerRun; ++i) { @@ -1178,7 +1174,7 @@ public class MQTTTest extends AbstractMQTTTest { } connectionSub.disconnect(); - for(int j = 0; j < numberOfRuns; j++) { + for (int j = 0; j < numberOfRuns; j++) { for (int i = 0; i < messagesPerRun; ++i) { connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); @@ -1200,14 +1196,14 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received); } - @Test(timeout=30000) + @Test(timeout = 30 * 1000) public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception { // default keep alive in milliseconds addMQTTConnector("transport.defaultKeepAlive=2000"); brokerService.start(); MQTT mqtt = createMQTTConnection(); mqtt.setClientId("foo"); - mqtt.setKeepAlive((short)0); + mqtt.setKeepAlive((short) 0); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -1220,7 +1216,7 @@ public class MQTTTest extends AbstractMQTTTest { })); } - @Test(timeout=60 * 1000) + @Test(timeout = 60 * 1000) public void testReuseConnection() throws Exception { addMQTTConnector(); brokerService.start(); @@ -1242,6 +1238,62 @@ public class MQTTTest extends AbstractMQTTTest { } } + @Test(timeout = 60 * 1000) + public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception { + addMQTTConnector(); + brokerService.setPersistent(true); + brokerService.start(); + Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) }; + + MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true); + // mqttPub.setVersion("3.1.1"); + + MQTT mqttSub = createMQTTConnection("MQTTSub-Client", false); + // mqttSub.setVersion("3.1.1"); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + connectionPub.connect(); + + BlockingConnection connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + connectionSub.subscribe(topics); + connectionSub.unsubscribe(new String[] { "TopicA" }); + connectionSub.disconnect(); + + for (int i = 0; i < 10; i++) { + String payload = "Message " + i; + connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false); + } + + connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + + int received = 0; + for (int i = 0; i < 10; ++i) { + Message message = connectionSub.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + LOG.info("Message is " + new String(message.getPayload())); + received++; + message.ack(); + } + assertEquals(10, received); + + connectionSub.disconnect(); + connectionPub.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testMQTT311Connection() throws Exception { + addMQTTConnector(); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo"); + mqtt.setVersion("3.1.1"); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.disconnect(); + } + @Override protected String getProtocolScheme() { return "mqtt"; @@ -1270,10 +1322,10 @@ public class MQTTTest extends AbstractMQTTTest { } protected Tracer createTracer() { - return new Tracer(){ + return new Tracer() { @Override public void onReceive(MQTTFrame frame) { - LOG.info("Client Received:\n"+frame); + LOG.info("Client Received:\n" + frame); } @Override @@ -1287,17 +1339,4 @@ public class MQTTTest extends AbstractMQTTTest { } }; } - - @Test(timeout=60 * 1000) - public void testMQTT311Connection()throws Exception{ - addMQTTConnector(); - brokerService.start(); - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("foo"); - mqtt.setVersion("3.1.1"); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - connection.disconnect(); - } - }