From ea04426bcd2b3dab266abd17c2568524a5e3b6b5 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 12 Oct 2022 11:03:09 -0500 Subject: [PATCH] ARTEMIS-4037 refactor MQTTRetainMessageManagerTest Commit 5a42de5fa6ee1b96f6f3e404f5a3d11a702e1776 called my attention to this test. It really needs to be refactored because: - It belongs in the integration-tests module rather than the MQTT protocol module. - It is using a lot of non-standard components (e.g. EmbeddedJMSResource, Awaitility, etc.). - It is overly complicated (e.g. using its own MqttClientService). This commit resolves all those problems. The new implementation is quite a bit different but still equivalent. I reverted the original fix from ARTEMIS-2476 and the test still fails. --- .../artemis-mqtt-protocol/pom.xml | 38 --- .../mqtt/MQTTRetainMessageManagerTest.java | 263 ------------------ .../core/protocol/mqtt/MqttClientService.java | 128 --------- .../embedded-artemis-server-mqtt.xml | 34 --- .../mqtt5/MQTTRetainMessageManagerTest.java | 168 +++++++++++ 5 files changed, 168 insertions(+), 463 deletions(-) delete mode 100644 artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java delete mode 100644 artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java delete mode 100644 artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml index 50e08ec8da..4197967846 100644 --- a/artemis-protocols/artemis-mqtt-protocol/pom.xml +++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml @@ -45,11 +45,6 @@ org.slf4j slf4j-api - - org.apache.logging.log4j - log4j-slf4j-impl - test - org.apache.activemq artemis-server @@ -94,44 +89,11 @@ org.osgi osgi.cmpn - - org.apache.activemq - artemis-junit - ${project.version} - test - - - junit - junit - test - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - test - - - org.eclipse.paho - org.eclipse.paho.mqttv5.client - test - - - org.awaitility - awaitility - 4.0.1 - test - jakarta.annotation jakarta.annotation-api test jar - - org.apache.activemq - artemis-unit-test-support - ${project.version} - test - diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java deleted file mode 100644 index f54f66aeba..0000000000 --- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.mqtt; - -import static java.util.Objects.nonNull; -import static org.awaitility.Awaitility.await; -import static org.awaitility.Durations.FIVE_HUNDRED_MILLISECONDS; -import static org.awaitility.Durations.TEN_SECONDS; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; - -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.config.WildcardConfiguration; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.junit.EmbeddedJMSResource; -import org.apache.activemq.artemis.utils.collections.LinkedListIterator; -import org.apache.commons.lang3.RandomStringUtils; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.RuleChain; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; - -@SuppressWarnings("deprecation") -public class MQTTRetainMessageManagerTest { - - private Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-server-mqtt.xml"); - - private MqttClientService mqttPublisher; - - private MqttClientService mqttConsumerCount; - private MqttClientService mqttConsumerBeforePublish; - private MqttClientService mqttConsumerAfterPublish; - private MqttClientService mqttConsumerAfterPublish2; - - private final AtomicInteger publishCount = new AtomicInteger(0); - - private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger(); - private final AtomicInteger arrivedCountAferPublish = new AtomicInteger(); - private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger(); - - private final AtomicReference lastMessagePublished = new AtomicReference<>(); - private final AtomicReference lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>(); - private final AtomicReference lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>(); - private final AtomicReference lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>(); - - private final String topic = "fact"; - - private final int numberOfMessages = 1000; - private final int numberOfTests = 10; - - @Rule - public RuleChain rulechain = RuleChain.outerRule(jmsServer); - - @Before - public void beforeEach() throws MqttException { - publishCount.set(0); - mqttPublisher = new MqttClientService("publisher", null); - mqttPublisher.init(); - - final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {}); - clearRetainedMessage.setRetained(true); - clearRetainedMessage.setQos(1); - mqttPublisher.publish(topic, clearRetainedMessage); - - mqttConsumerCount = new MqttClientService("consumer-count", null); - mqttConsumerCount.init(); - mqttConsumerCount.setMessageConsumer(message -> publishCount.incrementAndGet()); - - arrivedCountBeforePublish.set(0); - mqttConsumerBeforePublish = new MqttClientService("consumer-before", - message -> { - final String payload = new String(message.getPayload()); - lastMessageArrivedOnConsumerBeforePublish.set(message); - arrivedCountBeforePublish.incrementAndGet(); - log.debug("[MQTT][before ][retained: {}][duplicate: {}][qos: {}] {}", - message.isRetained(), message.isDuplicate(), message.getQos(), payload); - }); - mqttConsumerBeforePublish.init(); - - arrivedCountAferPublish.set(0); - arrivedCountAferPublish2.set(0); - mqttConsumerAfterPublish = new MqttClientService("consumer-after", - message -> { - final String payload = new String(message.getPayload()); - lastMessageArrivedOnConsumerAfterPublish.set(message); - arrivedCountAferPublish.incrementAndGet(); - log.info("[MQTT][after ][retained: {}][duplicate: {}][qos: {}] {}", - message.isRetained(), message.isDuplicate(), message.getQos(), payload); - }); - mqttConsumerAfterPublish2 = new MqttClientService("consumer-after2", - message -> { - final String payload = new String(message.getPayload()); - lastMessageArrivedOnConsumerAfterPublish2.set(message); - arrivedCountAferPublish2.incrementAndGet(); - log.info("[MQTT][after2 ][retained: {}][duplicate: {}][qos: {}] {}", - message.isRetained(), message.isDuplicate(), message.getQos(), payload); - }); - mqttConsumerAfterPublish.init(); - mqttConsumerAfterPublish2.init(); - } - - @After - public void afterEach() throws MqttException { - mqttPublisher.destroy(); - - mqttConsumerCount.unsubsribe(topic); - mqttConsumerCount.destroy(); - - mqttConsumerBeforePublish.unsubsribe(topic); - mqttConsumerBeforePublish.destroy(); - - mqttConsumerAfterPublish.unsubsribe(topic); - mqttConsumerAfterPublish.destroy(); - - mqttConsumerAfterPublish2.unsubsribe(topic); - mqttConsumerAfterPublish2.destroy(); - } - - @Test - public void testAtMostOnce() throws MqttException { - IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 0)); - } - - @Test - public void testAtLeastOnce() throws MqttException { - IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 1)); - } - - @Test - public void testExactlyOnce() throws MqttException { - IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 2)); - } - - private void actAndAssert(int i, int qos) { - try { - // Act - mqttConsumerBeforePublish.subscribe(topic, qos); - publish(qos); - logAftePublish(i, qos); - logRetainedMessagesQueue(); - mqttConsumerAfterPublish.subscribe(topic, qos); - mqttConsumerAfterPublish2.subscribe(topic, qos); - awaitUntilLastMessageArrivedOnConsumerAfterPublish(); - awaitUntilLastMessageArrivedOnConsumerAfterPublish2(); - - // Assert - assertEquals(1, arrivedCountAferPublish.get()); - assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished(); - assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished(); - } catch (MqttException e) { - fail(e.getMessage()); - } - } - - protected void publish(final int qos) throws MqttException { - mqttConsumerCount.subscribe(topic, qos); - IntStream.range(0, numberOfMessages).forEach(i -> { - final String fact = String.format("[%s] %s", i, RandomStringUtils.randomAlphanumeric(128)); - final MqttMessage message = message(fact, qos, true); - mqttPublisher.publish(topic, message); - lastMessagePublished.set(message); - }); - awaitUntilPiblishCount(); - } - - protected MqttMessage message(final String payload, final int qos, final boolean retained) { - final MqttMessage message = new MqttMessage(); - message.setQos(qos); - message.setRetained(retained); - message.setPayload(payload.getBytes()); - return message; - } - - private void awaitUntilPiblishCount() { - await() - .with() - .pollDelay(FIVE_HUNDRED_MILLISECONDS) - .atMost(TEN_SECONDS) - .until(() -> publishCount.get() >= numberOfMessages); - } - - private void awaitUntilLastMessageArrivedOnConsumerAfterPublish() { - await() - .pollDelay(FIVE_HUNDRED_MILLISECONDS) - .atMost(TEN_SECONDS) - .until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish.get())); - } - - private void awaitUntilLastMessageArrivedOnConsumerAfterPublish2() { - await() - .pollDelay(FIVE_HUNDRED_MILLISECONDS) - .atMost(TEN_SECONDS) - .until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish2.get())); - } - - private void assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished() { - assertArrayEquals(String.format( - "\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n", - new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), - lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload()); - } - - private void assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished() { - assertArrayEquals(String.format( - "\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n", - new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), - lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload()); - assertArrayEquals(String.format( - "\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n", - new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), - lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload()); - } - - private void logAftePublish(int i, int qos) { - log.info("--- QoS: {} --- {}/{}---", qos, i, numberOfTests); - log.info("[MQTT][publish][retained: {}][duplicate: {}][qos: {}] {}", - lastMessagePublished.get().isRetained(), lastMessagePublished.get().isDuplicate(), lastMessagePublished.get().getQos(), lastMessagePublished.get()); - log.info("[MQTT][before ][retained: {}][duplicate: {}][qos: {}] {}", - lastMessageArrivedOnConsumerBeforePublish.get().isRetained(), - lastMessageArrivedOnConsumerBeforePublish.get().isDuplicate(), - lastMessageArrivedOnConsumerBeforePublish.get().getQos(), - new String(lastMessageArrivedOnConsumerBeforePublish.get().getPayload())); - } - - private void logRetainedMessagesQueue() { - final WildcardConfiguration wildcardConfiguration = new WildcardConfiguration(); - final String retainAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, topic, wildcardConfiguration); - final Queue queue = jmsServer.getDestinationQueue(retainAddress); - final LinkedListIterator browserIterator = queue.browserIterator(); - browserIterator.forEachRemaining(messageReference -> { - final Message message = messageReference.getMessage(); - final String body = message.toCore().getBuffer().toString(StandardCharsets.UTF_8); - log.info("[MQTT][{}][{}][{}]", retainAddress, message, body); - }); - } -} diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java deleted file mode 100644 index 4e315cc919..0000000000 --- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.mqtt; - -import static java.util.Objects.nonNull; -import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.function.Consumer; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; - -import io.netty.util.concurrent.DefaultThreadFactory; - -public class MqttClientService implements MqttCallback { - - private final String clientId; - - private Consumer messageConsumer; - - private MqttClient mqttClient; - - private MemoryPersistence persistence = new MemoryPersistence(); - private ScheduledExecutorService executorService; - private int corePoolSize = 5; - - private Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public MqttClientService() { - this("producer", null); - } - - public MqttClientService(final String clientId, Consumer messageConsumer) { - this.clientId = clientId; - this.messageConsumer = messageConsumer; - } - - @PostConstruct - public void init() throws MqttException { - final String serverURI = "tcp://localhost:1883"; - final MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(false); - options.setMaxInflight(1000); - options.setServerURIs(new String[] {serverURI}); - options.setMqttVersion(MQTT_VERSION_3_1_1); - - final ThreadFactory threadFactory = new DefaultThreadFactory("mqtt-client-exec"); - executorService = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); - mqttClient = new MqttClient(serverURI, clientId, persistence, executorService); - mqttClient.setTimeToWait(-1); - mqttClient.connect(options); - mqttClient.setCallback(this); - log.debug("[MQTT][Connected][client: {}]", clientId); - } - - @PreDestroy - public void destroy() throws MqttException { - mqttClient.disconnect(); - executorService.shutdownNow(); - log.debug("[MQTT][Disconnected][client: {}]", clientId); - } - - @Override - public void connectionLost(Throwable cause) { - log.error("[MQTT][connectionLost][{}]", cause.getMessage()); - } - - @Override - public void messageArrived(String topic, MqttMessage message) { - log.debug("[MQTT][messageArrived][client: {}][topic: {}][message: {}]", clientId, topic, message); - if (nonNull(messageConsumer)) { - messageConsumer.accept(message); - } - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - log.trace("[MQTT][deliveryComplete][token: {}]", token); - } - - public void publish(final String topic, final MqttMessage message) { - try { - mqttClient.publish(topic, message); - } catch (final MqttException e) { - log.error(e.getMessage(), e); - } - } - - public void subscribe(final String topicFilter, int qos) throws MqttException { - mqttClient.subscribe(topicFilter, qos); - } - - public void unsubsribe(final String topicFilter) throws MqttException { - mqttClient.unsubscribe(topicFilter); - } - - public void setMessageConsumer(Consumer messageConsumer) { - this.messageConsumer = messageConsumer; - } -} diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml b/artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml deleted file mode 100644 index b0b65e0fe5..0000000000 --- a/artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml +++ /dev/null @@ -1,34 +0,0 @@ - - - - false - false - - - - - vm://0 - tcp://127.0.0.1:1883?protocols=MQTT&useEpoll=false&useKQueue=false - - - - diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java new file mode 100644 index 0000000000..8ae10aaad0 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt5; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class MQTTRetainMessageManagerTest extends MQTT5TestSupport { + + private MqttClient mqttPublisher; + + private MqttClient mqttConsumerBeforePublish; + private MqttClient mqttConsumerAfterPublish; + private MqttClient mqttConsumerAfterPublish2; + + private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger(); + private final AtomicInteger arrivedCountAferPublish = new AtomicInteger(); + private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger(); + + private final AtomicReference lastMessagePublished = new AtomicReference<>(); + private final AtomicReference lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>(); + private final AtomicReference lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>(); + private final AtomicReference lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>(); + + private final String topic = "fact"; + + private final int numberOfMessages = 1000; + private final int numberOfTests = 10; + + public MQTTRetainMessageManagerTest(String protocol) { + super(protocol); + } + + @Before + public void beforeEach() throws MqttException { + mqttPublisher = createPahoClient("publisher"); + mqttPublisher.connect(); + + final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {}); + clearRetainedMessage.setRetained(true); + clearRetainedMessage.setQos(1); + mqttPublisher.publish(topic, clearRetainedMessage); + + arrivedCountBeforePublish.set(0); + mqttConsumerBeforePublish = createPahoClient("consumer-before"); + mqttConsumerBeforePublish.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + lastMessageArrivedOnConsumerBeforePublish.set(message); + arrivedCountBeforePublish.incrementAndGet(); + } + }); + mqttConsumerBeforePublish.connect(); + + arrivedCountAferPublish.set(0); + mqttConsumerAfterPublish = createPahoClient("consumer-after"); + mqttConsumerAfterPublish.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + lastMessageArrivedOnConsumerAfterPublish.set(message); + arrivedCountAferPublish.incrementAndGet(); + } + }); + mqttConsumerAfterPublish.connect(); + + arrivedCountAferPublish2.set(0); + mqttConsumerAfterPublish2 = createPahoClient("consumer-after2"); + mqttConsumerAfterPublish2.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + lastMessageArrivedOnConsumerAfterPublish2.set(message); + arrivedCountAferPublish2.incrementAndGet(); + } + }); + mqttConsumerAfterPublish2.connect(); + } + + @After + public void afterEach() throws MqttException { + mqttPublisher.disconnect(); + mqttPublisher.close(); + + mqttConsumerBeforePublish.unsubscribe(topic); + mqttConsumerBeforePublish.disconnect(); + mqttConsumerBeforePublish.close(); + + mqttConsumerAfterPublish.unsubscribe(topic); + mqttConsumerAfterPublish.disconnect(); + mqttConsumerAfterPublish.close(); + + mqttConsumerAfterPublish2.unsubscribe(topic); + mqttConsumerAfterPublish2.disconnect(); + mqttConsumerAfterPublish2.close(); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testAtMostOnce() { + IntStream.of(numberOfTests).forEach(i -> test(0)); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testAtLeastOnce() { + IntStream.of(numberOfTests).forEach(i -> test(1)); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testExactlyOnce() { + IntStream.of(numberOfTests).forEach(i -> test(2)); + } + + private void test(int qos) { + try { + mqttConsumerBeforePublish.subscribe(topic, qos); + for (int i = 0; i < numberOfMessages; i++) { + final MqttMessage message = new MqttMessage(); + message.setQos(qos); + message.setRetained(true); + message.setPayload(RandomUtil.randomBytes(128)); + mqttPublisher.publish(topic, message); + lastMessagePublished.set(message); + } + Wait.waitFor(() -> server.getAddressInfo(SimpleString.toSimpleString(topic)).getRoutedMessageCount() >= numberOfMessages, 5000, 100); + mqttConsumerAfterPublish.subscribe(topic, qos); + mqttConsumerAfterPublish2.subscribe(topic, qos); + Wait.waitFor(() -> lastMessageArrivedOnConsumerAfterPublish.get() != null, 5000, 100); + Wait.waitFor(() -> lastMessageArrivedOnConsumerAfterPublish2.get() != null, 5000, 100); + + assertEquals(1, arrivedCountAferPublish.get()); + assertArrayEquals(String.format( + "\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n", + new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), + lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload()); + assertArrayEquals(String.format( + "\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n", + new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), + lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload()); + assertArrayEquals(String.format( + "\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n", + new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), + lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload()); + } catch (MqttException e) { + fail(e.getMessage()); + } + } +}