AMQ-4112: Upgraded to MQTT 1.4 and polished tests.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1399151 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Claus Ibsen 2012-10-17 08:35:00 +00:00
parent 021e35ad3c
commit 5b63698924
2 changed files with 19 additions and 10 deletions

View File

@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8; import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class MQTTTest { public class MQTTTest {
@ -59,6 +60,7 @@ public class MQTTTest {
brokerService = new BrokerService(); brokerService = new BrokerService();
brokerService.setPersistent(false); brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false); brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
this.numberOfMessages = 2000; this.numberOfMessages = 2000;
} }
@ -86,7 +88,8 @@ public class MQTTTest {
public void run() { public void run() {
for (int i = 0; i < numberOfMessages; i++){ for (int i = 0; i < numberOfMessages; i++){
try { try {
Message message = subscribeConnection.receive(); Message message = subscribeConnection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack(); message.ack();
latch.countDown(); latch.countDown();
} catch (Exception e) { } catch (Exception e) {
@ -120,13 +123,13 @@ public class MQTTTest {
connection.connect(); connection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)}; Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics); connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i; String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false); connection.publish("foo2", payload.getBytes(), QoS.AT_MOST_ONCE, false);
Message message = connection.receive(); Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }
connection.disconnect(); connection.disconnect();
@ -147,7 +150,8 @@ public class MQTTTest {
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i; String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Message message = connection.receive(); Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack(); message.ack();
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }
@ -173,7 +177,9 @@ public class MQTTTest {
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i; String payload = "Test Message: " + i;
pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false); pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
Message message = subConnection.receive(); Message message = subConnection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
LOG.debug(payload);
message.ack(); message.ack();
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }
@ -204,7 +210,8 @@ public class MQTTTest {
subConnection.subscribe(topics); subConnection.subscribe(topics);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false); pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
Message message = subConnection.receive(); Message message = subConnection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack(); message.ack();
assertArrayEquals(payload, message.getPayload()); assertArrayEquals(payload, message.getPayload());
} }
@ -232,7 +239,8 @@ public class MQTTTest {
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i; String payload = "Test Message: " + i;
connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false); connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get a message", message);
ByteSequence bs = message.getContent(); ByteSequence bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length)); assertEquals(payload, new String(bs.data, bs.offset, bs.length));
} }
@ -264,7 +272,8 @@ public class MQTTTest {
String payload = "This is Test Message: " + i; String payload = "This is Test Message: " + i;
TextMessage sendMessage = s.createTextMessage(payload); TextMessage sendMessage = s.createTextMessage(payload);
producer.send(sendMessage); producer.send(sendMessage);
Message message = connection.receive(); Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack(); message.ack();
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }

View File

@ -86,7 +86,7 @@
<karaf-version>2.2.9</karaf-version> <karaf-version>2.2.9</karaf-version>
<leveldbjni-version>1.3</leveldbjni-version> <leveldbjni-version>1.3</leveldbjni-version>
<log4j-version>1.2.17</log4j-version> <log4j-version>1.2.17</log4j-version>
<mqtt-client-version>1.3</mqtt-client-version> <mqtt-client-version>1.4</mqtt-client-version>
<openjpa-version>1.2.0</openjpa-version> <openjpa-version>1.2.0</openjpa-version>
<opensymphony-version>2.4.2</opensymphony-version> <opensymphony-version>2.4.2</opensymphony-version>
<org-apache-derby-version>10.9.1.0</org-apache-derby-version> <org-apache-derby-version>10.9.1.0</org-apache-derby-version>