Some cleanups of the tests to speed things up.
This commit is contained in:
Timothy Bish 2015-02-20 19:24:46 -05:00
parent a2b78fdeb0
commit 101b7123fa
7 changed files with 154 additions and 146 deletions

View File

@ -52,7 +52,7 @@ public class MQTTCodecTest {
private MQTTCodec codec;
private final int MESSAGE_SIZE = 5 * 1024 * 1024;
private final int ITERATIONS = 1000;
private final int ITERATIONS = 500;
@Before
public void setUp() throws Exception {

View File

@ -61,6 +61,7 @@ public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport {
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(isSchedulerSupportEnabled());
brokerService.setPopulateJMSXUserID(true);
brokerService.setUseJmx(false);
final CompositeTopic compositeTopic = new CompositeTopic();
compositeTopic.setName(COMPOSITE_TOPIC);

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSocketFactory;
@ -73,7 +74,7 @@ public class MQTTConnectTest extends MQTTTestSupport {
@Override
public String getProtocolConfig() {
return "transport.connectAttemptTimeout=2000";
return "transport.connectAttemptTimeout=1000";
}
@Test(timeout = 60 * 1000)
@ -101,7 +102,7 @@ public class MQTTConnectTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
}
}));
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
// and it should be closed due to inactivity
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@ -109,7 +110,7 @@ public class MQTTConnectTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
}
}));
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
assertTrue("no exceptions", exceptions.isEmpty());
}

View File

@ -73,7 +73,7 @@ public class MQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
private static final int NUM_MESSAGES = 250;
private static final int NUM_MESSAGES = 200;
@Test(timeout = 60 * 1000)
public void testSendAndReceiveMQTT() throws Exception {
@ -191,7 +191,7 @@ public class MQTTTest extends MQTTTestSupport {
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
byte[] message = provider.receive(2000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
@ -394,7 +394,7 @@ public class MQTTTest extends MQTTTestSupport {
assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
// test retained messages
Message msg = connection.receive(5, TimeUnit.SECONDS);
Message msg = connection.receive(2, TimeUnit.SECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
@ -657,7 +657,7 @@ public class MQTTTest extends MQTTTestSupport {
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received:\n" + frame);
LOG.debug("Client received:\n" + frame);
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
@ -671,7 +671,7 @@ public class MQTTTest extends MQTTTestSupport {
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent:\n" + frame);
LOG.debug("Client sent:\n" + frame);
}
});
@ -694,7 +694,7 @@ public class MQTTTest extends MQTTTestSupport {
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
int received = 0;
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
Message msg = connection.receive(2000, TimeUnit.MILLISECONDS);
do {
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
@ -704,7 +704,7 @@ public class MQTTTest extends MQTTTestSupport {
Thread.sleep(1000);
waitCount++;
}
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
msg = connection.receive(2000, TimeUnit.MILLISECONDS);
} while (msg != null && received++ < subs.length * 2);
assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
@ -742,7 +742,7 @@ public class MQTTTest extends MQTTTestSupport {
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received:\n" + frame);
LOG.debug("Client received:\n" + frame);
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
@ -756,7 +756,7 @@ public class MQTTTest extends MQTTTestSupport {
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent:\n" + frame);
LOG.debug("Client sent:\n" + frame);
}
});
@ -774,7 +774,7 @@ public class MQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return publishList.size() == 2;
}
}, 5000);
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
assertEquals(2, publishList.size());
connection.disconnect();
@ -787,7 +787,7 @@ public class MQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return publishList.size() == 4;
}
}, 5000);
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
assertEquals(4, publishList.size());
// make sure we received duplicate message ids
@ -808,12 +808,12 @@ public class MQTTTest extends MQTTTestSupport {
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received:\n" + frame);
LOG.debug("Client received:\n" + frame);
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
publish.decode(frame);
LOG.info("PUBLISH " + publish);
LOG.debug("PUBLISH " + publish);
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
@ -826,7 +826,7 @@ public class MQTTTest extends MQTTTestSupport {
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent:\n" + frame);
LOG.debug("Client sent:\n" + frame);
}
});
@ -884,7 +884,7 @@ public class MQTTTest extends MQTTTestSupport {
mqtts[i].setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received:\n" + frame);
LOG.debug("Client received:\n" + frame);
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
@ -902,7 +902,7 @@ public class MQTTTest extends MQTTTestSupport {
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent:\n" + frame);
LOG.debug("Client sent:\n" + frame);
}
});
}
@ -928,7 +928,6 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect();
}
}
@Test(timeout = 60 * 1000)
@ -952,12 +951,12 @@ public class MQTTTest extends MQTTTestSupport {
final BlockingConnection newConnection = mqtt.blockingConnection();
newConnection.connect();
Wait.waitFor(new Wait.Condition() {
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return newConnection.isConnected();
}
});
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
Message msg = newConnection.receive(1000, TimeUnit.MILLISECONDS);
@ -972,20 +971,20 @@ public class MQTTTest extends MQTTTestSupport {
final MQTT mqtt = createMQTTConnection("", true);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Wait.waitFor(new Wait.Condition() {
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
});
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
connection.subscribe(new Topic[]{new Topic("TopicA", QoS.AT_LEAST_ONCE)});
connection.publish("TopicA", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
Message message = connection.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
Thread.sleep(2000);
connection.subscribe(new Topic[]{new Topic("TopicA", QoS.AT_LEAST_ONCE)});
//TODO fix audit problem for retained messages
//Thread.sleep(2000);
//connection.subscribe(new Topic[]{new Topic("TopicA", QoS.AT_LEAST_ONCE)});
//message = connection.receive(3, TimeUnit.SECONDS);
//assertNotNull(message);
}
@ -1015,7 +1014,7 @@ public class MQTTTest extends MQTTTestSupport {
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
final BlockingConnection clean = mqttClean.blockingConnection();
clean.connect();
msg = clean.receive(10000, TimeUnit.MILLISECONDS);
msg = clean.receive(2000, TimeUnit.MILLISECONDS);
assertNull(msg);
clean.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) });
clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
@ -1168,8 +1167,9 @@ public class MQTTTest extends MQTTTestSupport {
connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
Message message = connection.receive(10, TimeUnit.SECONDS);
Message message = connection.receive(3, TimeUnit.SECONDS);
assertNull("Publish enabled for $ Topics by default", message);
connection.disconnect();
stopBroker();
@ -1214,14 +1214,14 @@ public class MQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return connection1.isConnected();
}
}));
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !connection.isConnected();
}
}));
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
connection1.disconnect();
@ -1273,16 +1273,15 @@ public class MQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return connection.isConnected();
}
}));
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(200)));
if (oldConnection.get() != null) {
assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !oldConnection.get().isConnected();
}
}));
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(200)));
}
oldConnection.set(connection);
@ -1513,13 +1512,13 @@ public class MQTTTest extends MQTTTestSupport {
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
Thread.sleep(1000);
Thread.sleep(500);
}
{
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
Thread.sleep(1000);
Thread.sleep(500);
}
}
@ -1569,7 +1568,7 @@ public class MQTTTest extends MQTTTestSupport {
}
// these should not be received
assertNull(connectionSub.receive(5, TimeUnit.SECONDS));
assertNull(connectionSub.receive(2, TimeUnit.SECONDS));
connectionSub.disconnect();
connectionPub.disconnect();

View File

@ -168,6 +168,8 @@ public class MQTTTestSupport {
brokerService = new BrokerService();
brokerService.setPersistent(isPersistent());
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.setSchedulerSupport(isSchedulerSupportEnabled());
brokerService.setPopulateJMSXUserID(true);
}
@ -397,17 +399,17 @@ public class MQTTTestSupport {
return new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client Received:\n" + frame);
LOG.debug("Client Received:\n" + frame);
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client Sent:\n" + frame);
LOG.debug("Client Sent:\n" + frame);
}
@Override
public void debug(String message, Object... args) {
LOG.info(String.format(message, args));
LOG.debug(String.format(message, args));
}
};
}

View File

@ -16,24 +16,33 @@
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PahoMQTTTest extends MQTTTestSupport {
@ -68,9 +77,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
@Override
public void run() {
try {
MqttClient client = new MqttClient("tcp://localhost:" + getPort(),
Thread.currentThread().getName(),
new MemoryPersistence());
MqttClient client = new MqttClient("tcp://localhost:" + getPort(), Thread.currentThread().getName(), new MemoryPersistence());
client.connect();
connectedDoneLatch.countDown();
sendBarrier.await();
@ -110,7 +117,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertNull("Async error: " + asyncError.get(), asyncError.get());
}
@Test(timeout=300000)
@Test(timeout = 300000)
public void testSendAndReceiveMQTT() throws Exception {
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
@ -130,7 +137,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
}
@Test(timeout = 300000)
public void testSubs() throws Exception {
public void testSubs() throws Exception {
stopBroker();
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
@ -140,17 +147,15 @@ public class PahoMQTTTest extends MQTTTestSupport {
// subscriber connects and creates durable sub
MqttClient client = createClient(false, "receive", listener);
final String ACCOUNT_PREFIX = "test/";
final String ACCOUNT_PREFIX = "test/";
client.subscribe(ACCOUNT_PREFIX+"1/2/3");
client.subscribe(ACCOUNT_PREFIX+"a/+/#");
client.subscribe(ACCOUNT_PREFIX+"#");
client.subscribe(ACCOUNT_PREFIX + "1/2/3");
client.subscribe(ACCOUNT_PREFIX + "a/+/#");
client.subscribe(ACCOUNT_PREFIX + "#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "should get everything";
client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false);
String expectedResult = "should get everything";
client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
@ -159,11 +164,11 @@ public class PahoMQTTTest extends MQTTTestSupport {
}
});
assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, listener.result);
}
@Test(timeout=300000)
@Test(timeout = 300000)
public void testOverlappingTopics() throws Exception {
stopBroker();
@ -174,95 +179,96 @@ public class PahoMQTTTest extends MQTTTestSupport {
// subscriber connects and creates durable sub
MqttClient client = createClient(false, "receive", listener);
final String ACCOUNT_PREFIX = "test/";
final String ACCOUNT_PREFIX = "test/";
// *****************************************
// check a simple # subscribe works
// *****************************************
client.subscribe(ACCOUNT_PREFIX+"#");
// *****************************************
// check a simple # subscribe works
// *****************************************
client.subscribe(ACCOUNT_PREFIX + "#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "hello mqtt broker on hash";
client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
String expectedResult = "hello mqtt broker on hash";
client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on a different topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"1/2/3/4/5/6", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
// *****************************************
// now subscribe on a topic that overlaps the root # wildcard - we should still get everything
// *****************************************
client.subscribe(ACCOUNT_PREFIX+"1/2/3");
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on explicit topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"1/2/3", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on some other topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"a/b/c/d/e", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
// *****************************************
// now unsub hash - we should only get called back on 1/2/3
// *****************************************
client.unsubscribe(ACCOUNT_PREFIX+"#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back...";
expectedResult = "hello mqtt broker on a different topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(), 0, false);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back either...";
// *****************************************
// now subscribe on a topic that overlaps the root # wildcard - we
// should still get everything
// *****************************************
client.subscribe(ACCOUNT_PREFIX + "1/2/3");
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on explicit topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on some other topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(), 0, false);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
// *****************************************
// now unsub hash - we should only get called back on 1/2/3
// *****************************************
client.unsubscribe(ACCOUNT_PREFIX + "#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back...";
listener.result = null;
client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false);
assertFalse(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(20)));
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back either...";
listener.result = null;
client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false);
assertFalse(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(20)));
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
}
@Test(timeout = 300000)
@ -301,12 +307,12 @@ public class PahoMQTTTest extends MQTTTestSupport {
LOG.info("Subscribing durable subscriber...");
client3.subscribe(topic, 1);
Wait.waitFor(new Wait.Condition() {
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.received == 2;
}
});
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100)));
assertEquals(2, listener.received);
disconnect(client3);
LOG.info("Disconnected durable subscriber.");
@ -334,7 +340,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return client.isConnected();
}
});
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100));
return client;
}
@ -346,7 +352,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return !client.isConnected();
}
});
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100));
}
protected void waitForDelivery(final MqttClient client) throws Exception {
@ -355,7 +361,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception {
return client.getPendingDeliveryTokens().length == 0;
}
});
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(200));
assertTrue(client.getPendingDeliveryTokens().length == 0);
}
@ -371,7 +377,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOG.info("Received: " + message);
LOG.debug("Received: " + message);
received++;
result = new String(message.getPayload());
}
@ -381,5 +387,4 @@ public class PahoMQTTTest extends MQTTTestSupport {
}
}
}

View File

@ -20,7 +20,7 @@
#
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.transport.mqtt=TRACE
log4j.logger.org.apache.activemq.transport.mqtt=INFO
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender