Refactor test which has some flawed assumptions about the incoming

messages based on subscriptions that it makes.  Adds better thread
safety as well.
This commit is contained in:
Timothy Bish 2016-05-13 18:30:44 -04:00
parent 15405af2e6
commit ab434ee776
1 changed files with 99 additions and 124 deletions

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -16,13 +16,14 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -41,6 +42,7 @@ import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,7 +55,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
return s.createConsumer(s.createTopic(topic)); return s.createConsumer(s.createTopic(topic));
} }
@Test(timeout = 300000) @Test(timeout = 90000)
public void testLotsOfClients() throws Exception { public void testLotsOfClients() throws Exception {
final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100); final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
@ -88,7 +90,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
sendBarrier.await(); sendBarrier.await();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Thread.sleep(1000); Thread.sleep(1000);
client.publish("test", "hello".getBytes(), 1, false); client.publish("test", "hello".getBytes(StandardCharsets.UTF_8), 1, false);
} }
client.disconnect(); client.disconnect();
client.close(); client.close();
@ -122,7 +124,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertNull("Async error: " + asyncError.get(), asyncError.get()); assertNull("Async error: " + asyncError.get(), asyncError.get());
} }
@Test(timeout = 300000) @Test(timeout = 90000)
public void testSendAndReceiveMQTT() throws Exception { public void testSendAndReceiveMQTT() throws Exception {
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
@ -132,15 +134,16 @@ public class PahoMQTTTest extends MQTTTestSupport {
MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence()); MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence());
client.connect(); client.connect();
client.publish("test", "hello".getBytes(), 1, false); client.publish("test", "hello".getBytes(StandardCharsets.UTF_8), 1, false);
Message msg = consumer.receive(100 * 5); Message msg = consumer.receive(100 * 5);
assertNotNull(msg); assertNotNull(msg);
client.disconnect(); client.disconnect();
client.close();
} }
@Test(timeout = 300000) @Test(timeout = 90000)
public void testSubs() throws Exception { public void testSubs() throws Exception {
final DefaultListener listener = new DefaultListener(); final DefaultListener listener = new DefaultListener();
@ -155,29 +158,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "should get everything"; String expectedResult = "should get everything";
client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false); client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
// One delivery for topic ACCOUNT_PREFIX + "#"
String result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, listener.result); assertEquals(expectedResult, result);
expectedResult = "should get everything"; expectedResult = "should get everything";
listener.result = null; client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() { // One delivery for topic ACCOUNT_PREFIX + "a/1/2"
@Override result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
public boolean isSatisified() throws Exception { assertEquals(expectedResult, result);
return listener.result != null; // One delivery for topic ACCOUNT_PREFIX + "#"
} result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); assertEquals(expectedResult, result);
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
client.unsubscribe(ACCOUNT_PREFIX + "a/+/#"); client.unsubscribe(ACCOUNT_PREFIX + "a/+/#");
@ -185,19 +181,27 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "should still get 1/2/3"; expectedResult = "should still get 1/2/3";
listener.result = null; client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() { // One delivery for topic ACCOUNT_PREFIX + "1/2/3"
@Override result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
public boolean isSatisified() throws Exception { assertEquals(expectedResult, result);
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
client.disconnect();
client.close();
} }
@Ignore
@Test(timeout = 300000) @Test(timeout = 300000)
public void testOverlappingTopicsLooped() throws Exception {
for (int i = 0; i < 100; ++i) {
LOG.info("Running test iteration: {}", i);
testOverlappingTopics();
}
}
@Test(timeout = 90000)
public void testOverlappingTopics() throws Exception { public void testOverlappingTopics() throws Exception {
final DefaultListener listener = new DefaultListener(); final DefaultListener listener = new DefaultListener();
@ -212,26 +216,16 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.subscribe(ACCOUNT_PREFIX + "#"); client.subscribe(ACCOUNT_PREFIX + "#");
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "hello mqtt broker on hash"; String expectedResult = "hello mqtt broker on hash";
client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false); client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override String result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
public boolean isSatisified() throws Exception { assertEquals(expectedResult, result);
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on a different topic"; expectedResult = "hello mqtt broker on a different topic";
listener.result = null; client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(), 0, false); result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
assertTrue(Wait.waitFor(new Wait.Condition() { assertEquals(expectedResult, result);
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
// ***************************************** // *****************************************
@ -242,27 +236,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on explicit topic"; expectedResult = "hello mqtt broker on explicit topic";
listener.result = null; client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
assertTrue(Wait.waitFor(new Wait.Condition() { // One message from topic subscription on ACCOUNT_PREFIX + "#"
@Override result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
public boolean isSatisified() throws Exception { assertEquals(expectedResult, result);
return listener.result != null;
} // One message from topic subscription on ACCOUNT_PREFIX + "1/2/3"
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
assertEquals(expectedResult, listener.result); assertEquals(expectedResult, result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on some other topic"; expectedResult = "hello mqtt broker on some other topic";
listener.result = null; client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(), 0, false); result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
assertTrue(Wait.waitFor(new Wait.Condition() { assertEquals(expectedResult, result);
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
// ***************************************** // *****************************************
@ -272,31 +261,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back..."; expectedResult = "this should not come back...";
listener.result = null; client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false); result = listener.messageQ.poll(3, TimeUnit.SECONDS);
assertFalse(Wait.waitFor(new Wait.Condition() { assertNull(result);
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(5)));
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back either..."; expectedResult = "this should not come back either...";
listener.result = null; client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false); result = listener.messageQ.poll(3, TimeUnit.SECONDS);
assertFalse(Wait.waitFor(new Wait.Condition() { assertNull(result);
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(5)));
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
client.disconnect();
client.close();
} }
@Test(timeout = 300000) @Test(timeout = 90000)
public void testCleanSession() throws Exception { public void testCleanSession() throws Exception {
String topic = "test"; String topic = "test";
final DefaultListener listener = new DefaultListener(); final DefaultListener listener = new DefaultListener();
@ -316,13 +296,13 @@ public class PahoMQTTTest extends MQTTTestSupport {
LOG.info("Publish message with QoS 1..."); LOG.info("Publish message with QoS 1...");
String expectedResult = "QOS 1 message"; String expectedResult = "QOS 1 message";
client2.publish(topic, expectedResult.getBytes(), 1, false); client2.publish(topic, expectedResult.getBytes(StandardCharsets.UTF_8), 1, false);
waitForDelivery(client2); waitForDelivery(client2);
// Publish message with QoS 0 // Publish message with QoS 0
LOG.info("Publish message with QoS 0..."); LOG.info("Publish message with QoS 0...");
expectedResult = "QOS 0 message"; expectedResult = "QOS 0 message";
client2.publish(topic, expectedResult.getBytes(), 0, false); client2.publish(topic, expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
waitForDelivery(client2); waitForDelivery(client2);
// subscriber reconnects // subscriber reconnects
@ -335,26 +315,31 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertTrue(Wait.waitFor(new Wait.Condition() { assertTrue(Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return listener.received == 2; return listener.received.get() == 2;
} }
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100))); }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100)));
assertEquals(2, listener.received); assertEquals(2, listener.received.get());
disconnect(client3); disconnect(client3);
LOG.info("Disconnected durable subscriber."); LOG.info("Disconnected durable subscriber.");
// make sure we consumed everything // make sure we consumed everything
listener.received = 0; assertTrue(listener.received.compareAndSet(2, 0));
LOG.info("Reconnecting durable subscriber..."); LOG.info("Reconnecting durable subscriber...");
MqttClient client4 = createClient(false, "receive", listener); MqttClient client4 = createClient(false, "receive", listener);
LOG.info("Subscribing durable subscriber..."); LOG.info("Subscribing durable subscriber...");
client4.subscribe(topic, 1); client4.subscribe(topic, 1);
Thread.sleep(3 * 1000); TimeUnit.SECONDS.sleep(3);
assertEquals(0, listener.received); assertEquals(0, listener.received.get());
client2.disconnect();
client2.close();
client4.disconnect();
client4.close();
} }
@Test(timeout = 300000) @Test(timeout = 90000)
public void testClientIdSpecialChars() throws Exception { public void testClientIdSpecialChars() throws Exception {
testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1); testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1);
testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1); testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1);
@ -374,28 +359,18 @@ public class PahoMQTTTest extends MQTTTestSupport {
client1.subscribe(topic, 1); client1.subscribe(topic, 1);
String message = "Message from client: " + clientId; String message = "Message from client: " + clientId;
client1.publish(topic, message.getBytes(), 1, false); client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, false);
String result = client1MqttCallback.messageQ.poll(45, TimeUnit.MILLISECONDS);
assertEquals(message, result);
assertEquals(1, client1MqttCallback.received.get());
assertTrue(Wait.waitFor(new Wait.Condition() { result = clientAdminMqttCallback.messageQ.poll(45, TimeUnit.MILLISECONDS);
@Override assertEquals(message, result);
public boolean isSatisified() throws Exception {
return client1MqttCallback.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(message, client1MqttCallback.result);
assertEquals(1, client1MqttCallback.received);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return clientAdminMqttCallback.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(message, clientAdminMqttCallback.result);
assertTrue(client1.isConnected()); assertTrue(client1.isConnected());
client1.disconnect(); client1.disconnect();
client1.close();
} }
protected void testClientIdSpecialChars(int mqttVersion) throws Exception { protected void testClientIdSpecialChars(int mqttVersion) throws Exception {
@ -414,8 +389,10 @@ public class PahoMQTTTest extends MQTTTestSupport {
testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback); testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback);
testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, clientAdminMqttCallback); testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, clientAdminMqttCallback);
testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback); testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback);
}
clientAdmin.disconnect();
clientAdmin.close();
}
protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
@ -453,30 +430,28 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return client.getPendingDeliveryTokens().length == 0; return client.getPendingDeliveryTokens().length == 0;
} }
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(200)); }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100));
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
} }
static class DefaultListener implements MqttCallback { static class DefaultListener implements MqttCallback {
int received = 0; final AtomicInteger received = new AtomicInteger();
volatile String result; final BlockingQueue<String> messageQ = new ArrayBlockingQueue<String>(10);
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
} }
@Override @Override
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(String topic, MqttMessage message) throws Exception {
LOG.debug("Received: " + message); LOG.info("Received: {}", message);
received++; received.incrementAndGet();
result = new String(message.getPayload()); messageQ.put(new String(message.getPayload(), StandardCharsets.UTF_8));
} }
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(IMqttDeliveryToken token) {
} }
} }
} }