git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1494283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-18 20:22:51 +00:00
parent 2484c67cb1
commit e2b932ef03
1 changed files with 64 additions and 49 deletions

View File

@ -114,61 +114,76 @@ public class MQTTTest extends AbstractMQTTTest {
@Test(timeout=300000)
public void testReceiveMessageSentWhileOffline() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("MQTT-Client");
mqtt.setCleanSession(false);
{
final BlockingConnection subscriber = mqtt.blockingConnection();
subscriber.connect();
Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
subscriber.subscribe(topic);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
}
for (int i = 0; i < numberOfMessages / 2; i++) {
Message message = subscriber.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
byte[] payload = message.getPayload();
String messageContent = new String(payload);
if (i % 100 == 0) {
LOG.debug("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
}
message.ack();
}
subscriber.disconnect();
byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
publisher.disconnect();
int numberOfRuns = 100;
int messagesPerRun = 2;
final BlockingConnection subscriber = mqtt.blockingConnection();
subscriber.connect();
Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
subscriber.subscribe(topic);
addMQTTConnector("trace=true");
brokerService.start();
MQTT mqttPub = createMQTTConnection();
mqttPub.setClientId("MQTT-Pub-Client");
for (int i = 0; i < numberOfMessages / 2; i++) {
Message message = subscriber.receive(5, TimeUnit.SECONDS);
MQTT mqttSub = createMQTTConnection();
mqttSub.setClientId("MQTT-Sub-Client");
mqttSub.setCleanSession(false);
final BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect();
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
connectionSub.subscribe(topics);
for (int i = 0; i < messagesPerRun; ++i) {
connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
}
int received = 0;
for (int i = 0; i < messagesPerRun; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
byte[] payload = message.getPayload();
received++;
payload = message.getPayload();
String messageContent = new String(payload);
if (i % 100 == 0) {
LOG.debug("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
}
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
message.ack();
}
connectionSub.disconnect();
subscriber.disconnect();
for(int j = 0; j < numberOfRuns; j++) {
for (int i = 0; i < messagesPerRun; ++i) {
connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
}
mqttSub = createMQTTConnection();
mqttSub.setClientId("MQTT-Sub-Client");
mqttSub.setCleanSession(false);
connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
connectionSub.subscribe(topics);
for (int i = 0; i < messagesPerRun; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
received++;
payload = message.getPayload();
String messageContent = new String(payload);
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
message.ack();
}
connectionSub.disconnect();
}
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}
@Test(timeout=30000)
@ -219,7 +234,7 @@ public class MQTTTest extends AbstractMQTTTest {
@Override
protected void addMQTTConnector() throws Exception {
addMQTTConnector("");
addMQTTConnector();
}
@Override
@ -249,12 +264,12 @@ public class MQTTTest extends AbstractMQTTTest {
return new Tracer(){
@Override
public void onReceive(MQTTFrame frame) {
// LOG.info("Client Received:\n"+frame);
LOG.info("Client Received:\n"+frame);
}
@Override
public void onSend(MQTTFrame frame) {
// LOG.info("Client Sent:\n" + frame);
LOG.info("Client Sent:\n" + frame);
}
@Override