diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 791e7981ba..ce585a5430 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -1671,24 +1671,29 @@ public class MQTTTest extends MQTTTestSupport { } connectionSub.disconnect(); - for (int j = 0; j < numberOfRuns; j++) { + try { + for (int j = 0; j < numberOfRuns; j++) { - for (int i = 0; i < messagesPerRun; ++i) { - connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); + for (int i = 0; i < messagesPerRun; ++i) { + connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); + } + + connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + connectionSub.subscribe(topics); + + for (int i = 0; i < messagesPerRun; ++i) { + Message message = connectionSub.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + received++; + assertTrue(Arrays.equals(payload, message.getPayload())); + message.ack(); + } + connectionSub.disconnect(); } - - connectionSub = mqttSub.blockingConnection(); - connectionSub.connect(); - connectionSub.subscribe(topics); - - for (int i = 0; i < messagesPerRun; ++i) { - Message message = connectionSub.receive(5, TimeUnit.SECONDS); - assertNotNull(message); - received++; - assertTrue(Arrays.equals(payload, message.getPayload())); - message.ack(); - } - connectionSub.disconnect(); + } catch (Exception exception) { + LOG.error("unexpected exception", exception); + exception.printStackTrace(); } assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received); }