git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1494222 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-18 17:37:01 +00:00
parent 7ef79f2d2a
commit 2484c67cb1
1 changed files with 70 additions and 0 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport.mqtt;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
@ -72,6 +74,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect();
}
@Test(timeout=300000)
public void testSubscribeMultipleTopics() throws Exception {
byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){
@ -109,6 +112,65 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals("Should have received " + topics.length + " messages", topics.length, received);
}
@Test(timeout=300000)
public void testReceiveMessageSentWhileOffline() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("MQTT-Client");
mqtt.setCleanSession(false);
{
final BlockingConnection subscriber = mqtt.blockingConnection();
subscriber.connect();
Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
subscriber.subscribe(topic);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
}
for (int i = 0; i < numberOfMessages / 2; i++) {
Message message = subscriber.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
byte[] payload = message.getPayload();
String messageContent = new String(payload);
if (i % 100 == 0) {
LOG.debug("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
}
message.ack();
}
subscriber.disconnect();
}
publisher.disconnect();
final BlockingConnection subscriber = mqtt.blockingConnection();
subscriber.connect();
Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
subscriber.subscribe(topic);
for (int i = 0; i < numberOfMessages / 2; i++) {
Message message = subscriber.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
byte[] payload = message.getPayload();
String messageContent = new String(payload);
if (i % 100 == 0) {
LOG.debug("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
}
message.ack();
}
subscriber.disconnect();
}
@Test(timeout=30000)
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds
@ -166,10 +228,18 @@ public class MQTTTest extends AbstractMQTTTest {
}
protected MQTT createMQTTConnection() throws Exception {
return createMQTTConnection(null, false);
}
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
mqtt.setReconnectAttemptsMax(0);
mqtt.setTracer(createTracer());
if (clientId != null) {
mqtt.setClientId(clientId);
}
mqtt.setCleanSession(clean);
mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
// shut off connect retry
return mqtt;