mirror of https://github.com/apache/activemq.git
make the test use the Wait condition instead of hard sleeps git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401666 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6b44c3c9d1
commit
d3571a6154
|
@ -16,6 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.mqtt;
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -39,6 +45,7 @@ import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportListener;
|
import org.apache.activemq.transport.TransportListener;
|
||||||
import org.apache.activemq.transport.tcp.TcpTransport;
|
import org.apache.activemq.transport.tcp.TcpTransport;
|
||||||
import org.apache.activemq.util.ByteSequence;
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
import org.fusesource.hawtbuf.UTF8Buffer;
|
import org.fusesource.hawtbuf.UTF8Buffer;
|
||||||
import org.fusesource.mqtt.client.BlockingConnection;
|
import org.fusesource.mqtt.client.BlockingConnection;
|
||||||
import org.fusesource.mqtt.client.MQTT;
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
|
@ -51,8 +58,6 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
|
|
||||||
public class MQTTTest {
|
public class MQTTTest {
|
||||||
|
@ -296,7 +301,7 @@ public class MQTTTest {
|
||||||
|
|
||||||
addMQTTConnector(brokerService);
|
addMQTTConnector(brokerService);
|
||||||
brokerService.start();
|
brokerService.start();
|
||||||
TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
|
final TransportConnector mqttConnector = brokerService.getTransportConnectorByScheme("mqtt");
|
||||||
|
|
||||||
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
|
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
|
||||||
// from timing out
|
// from timing out
|
||||||
|
@ -306,19 +311,28 @@ public class MQTTTest {
|
||||||
clientTransport.oneway(connectFrame.encode());
|
clientTransport.oneway(connectFrame.encode());
|
||||||
|
|
||||||
// wait for broker to register the MQTT connection
|
// wait for broker to register the MQTT connection
|
||||||
TimeUnit.SECONDS.sleep(1);
|
assertTrue("MQTT Connection should be registered.", Wait.waitFor(new Wait.Condition() {
|
||||||
assertTrue(mqttConnector.getConnections().size() > 0);
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return mqttConnector.getConnections().size() > 0;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
// wait for broker to time out the MQTT connection due to inactivity
|
||||||
|
assertTrue("MQTT Connection should be timed out.", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return mqttConnector.getConnections().size() == 0;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
// wait for the inactivity monitor to remove the connection due to inactivity
|
|
||||||
TimeUnit.SECONDS.sleep(10);
|
|
||||||
assertTrue(mqttConnector.getConnections().size() == 0);
|
|
||||||
assertTrue("Should have seen client transport exception", exceptions.size() > 0);
|
assertTrue("Should have seen client transport exception", exceptions.size() > 0);
|
||||||
|
|
||||||
clientTransport.stop();
|
clientTransport.stop();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Transport createManualMQTTClient() throws IOException, URISyntaxException {
|
private Transport createManualMQTTClient() throws IOException, URISyntaxException {
|
||||||
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
|
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
|
||||||
new URI("tcp://localhost:1883"), null);
|
new URI("tcp://localhost:1883"), null);
|
||||||
|
@ -352,9 +366,13 @@ public class MQTTTest {
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection.connect();
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(10);
|
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
assertTrue("KeepAlive didn't work properly", connection.isConnected());
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return connection.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -368,10 +386,13 @@ public class MQTTTest {
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection.connect();
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(10);
|
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
assertTrue("KeepAlive didn't work properly", connection.isConnected());
|
public boolean isSatisified() throws Exception {
|
||||||
|
return connection.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -385,9 +406,13 @@ public class MQTTTest {
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection.connect();
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(10);
|
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
assertTrue("KeepAlive didn't work properly", connection.isConnected());
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return connection.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -402,12 +427,14 @@ public class MQTTTest {
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
final BlockingConnection connection = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection.connect();
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(10);
|
assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
assertFalse("KeepAlive didn't work properly", connection.isConnected());
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return connection.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
|
protected void addMQTTConnector(BrokerService brokerService) throws Exception {
|
||||||
brokerService.addConnector("mqtt://localhost:1883");
|
brokerService.addConnector("mqtt://localhost:1883");
|
||||||
|
@ -425,6 +452,4 @@ public class MQTTTest {
|
||||||
mqtt.setReconnectAttemptsMax(0);
|
mqtt.setReconnectAttemptsMax(0);
|
||||||
return mqtt;
|
return mqtt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue