Add test case to show this is now fixed in 5.10
This commit is contained in:
Timothy Bish 2014-03-31 12:42:44 -04:00
parent f5ddcf06d7
commit f4a8b117ce
1 changed files with 155 additions and 116 deletions

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
@ -26,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -34,8 +38,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
@ -72,7 +74,6 @@ public class MQTTTest extends AbstractMQTTTest {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
@ -161,8 +162,8 @@ public class MQTTTest extends AbstractMQTTTest {
public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
/**
* Although subscribing with EXACTLY ONCE, the message gets published
* with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
* as published - not the wish of the subscriber
* with AT_MOST_ONCE - in MQTT the QoS is always determined by the
* message as published - not the wish of the subscriber
*/
addMQTTConnector();
brokerService.start();
@ -339,7 +340,7 @@ public class MQTTTest extends AbstractMQTTTest {
publisher.disconnect();
}
@Test(timeout=30000)
@Test(timeout = 30 * 1000)
public void testValidZeroLengthClientId() throws Exception {
addMQTTConnector();
brokerService.start();
@ -402,8 +403,7 @@ public class MQTTTest extends AbstractMQTTTest {
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(),
pattern.matcher(msg.getTopic()).matches());
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
} while (msg != null);
@ -421,8 +421,7 @@ public class MQTTTest extends AbstractMQTTTest {
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
do {
assertNotNull("Non-retained Null " + wildcard, msg);
assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(),
pattern.matcher(msg.getTopic()).matches());
assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
} while (msg != null);
@ -550,8 +549,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.connect();
final String NAMED = "named";
byte[] qos = connection.subscribe(new Topic[] {
new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE)});
byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
assertEquals((byte) 0x80, qos[0]);
assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);
@ -639,7 +637,8 @@ public class MQTTTest extends AbstractMQTTTest {
} while (msg != null && received++ < subs.length * 2);
assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
// make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for AT_MOST_ONCE
// make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for
// AT_MOST_ONCE
for (int i = 0; i < publishList.size(); i++) {
for (int j = i + 1; j < publishList.size(); j++) {
final PUBLISH publish1 = publishList.get(i);
@ -724,10 +723,8 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals(4, publishList.size());
// make sure we received duplicate message ids
assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() ||
publishList.get(0).messageId() == publishList.get(3).messageId());
assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() ||
publishList.get(1).messageId() == publishList.get(2).messageId());
assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || publishList.get(0).messageId() == publishList.get(3).messageId());
assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || publishList.get(1).messageId() == publishList.get(2).messageId());
assertTrue(publishList.get(2).dup() && publishList.get(3).dup());
connection.unsubscribe(topics);
@ -1052,7 +1049,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect();
}
@Test(timeout = 300000)
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
addMQTTConnector();
addOpenwireConnector();
@ -1099,7 +1096,7 @@ public class MQTTTest extends AbstractMQTTTest {
}
@Test(timeout = 300000)
@Test(timeout = 30 * 10000)
public void testSubscribeMultipleTopics() throws Exception {
byte[] payload = new byte[1024 * 32];
@ -1131,8 +1128,7 @@ public class MQTTTest extends AbstractMQTTTest {
received++;
payload = message.getPayload();
String messageContent = new String(payload);
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
LOG.info("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
message.ack();
}
@ -1200,7 +1196,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}
@Test(timeout=30000)
@Test(timeout = 30 * 1000)
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds
addMQTTConnector("transport.defaultKeepAlive=2000");
@ -1242,6 +1238,62 @@ public class MQTTTest extends AbstractMQTTTest {
}
}
@Test(timeout = 60 * 1000)
public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception {
addMQTTConnector();
brokerService.setPersistent(true);
brokerService.start();
Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true);
// mqttPub.setVersion("3.1.1");
MQTT mqttSub = createMQTTConnection("MQTTSub-Client", false);
// mqttSub.setVersion("3.1.1");
BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect();
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
connectionSub.subscribe(topics);
connectionSub.unsubscribe(new String[] { "TopicA" });
connectionSub.disconnect();
for (int i = 0; i < 10; i++) {
String payload = "Message " + i;
connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
}
connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
int received = 0;
for (int i = 0; i < 10; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
LOG.info("Message is " + new String(message.getPayload()));
received++;
message.ack();
}
assertEquals(10, received);
connectionSub.disconnect();
connectionPub.disconnect();
}
@Test(timeout = 60 * 1000)
public void testMQTT311Connection() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setVersion("3.1.1");
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
}
@Override
protected String getProtocolScheme() {
return "mqtt";
@ -1287,17 +1339,4 @@ public class MQTTTest extends AbstractMQTTTest {
}
};
}
@Test(timeout=60 * 1000)
public void testMQTT311Connection()throws Exception{
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setVersion("3.1.1");
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
}
}