mirror of https://github.com/apache/activemq.git
Adds a test case to show that things work as expected.
This commit is contained in:
parent
d9d9d5b666
commit
74d2c2425f
|
@ -1408,6 +1408,57 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
|
||||
stopBroker();
|
||||
this.persistent = true;
|
||||
startBroker();
|
||||
|
||||
final byte[] payload = new byte[1024 * 32];
|
||||
for (int i = 0; i < payload.length; i++) {
|
||||
payload[i] = '2';
|
||||
}
|
||||
|
||||
int messagesPerRun = 10;
|
||||
|
||||
Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
|
||||
|
||||
{
|
||||
// Establish a durable subscription.
|
||||
MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
|
||||
BlockingConnection connectionSub = mqttSub.blockingConnection();
|
||||
connectionSub.connect();
|
||||
connectionSub.subscribe(topics);
|
||||
connectionSub.disconnect();
|
||||
}
|
||||
|
||||
MQTT mqttPubLoop = createMQTTConnection("MQTT-Pub-Client", true);
|
||||
BlockingConnection connectionPub = mqttPubLoop.blockingConnection();
|
||||
connectionPub.connect();
|
||||
|
||||
for (int i = 0; i < messagesPerRun; ++i) {
|
||||
connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
|
||||
}
|
||||
|
||||
connectionPub.disconnect();
|
||||
|
||||
stopBroker();
|
||||
startBroker();
|
||||
|
||||
MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
|
||||
BlockingConnection connectionSub = mqttSub.blockingConnection();
|
||||
connectionSub.connect();
|
||||
connectionSub.subscribe(topics);
|
||||
|
||||
for (int i = 0; i < messagesPerRun; ++i) {
|
||||
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
assertTrue(Arrays.equals(payload, message.getPayload()));
|
||||
message.ack();
|
||||
}
|
||||
connectionSub.disconnect();
|
||||
}
|
||||
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
|
||||
stopBroker();
|
||||
|
|
Loading…
Reference in New Issue