diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml
index 50e08ec8da..4197967846 100644
--- a/artemis-protocols/artemis-mqtt-protocol/pom.xml
+++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml
@@ -45,11 +45,6 @@
org.slf4j
slf4j-api
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- test
-
org.apache.activemq
artemis-server
@@ -94,44 +89,11 @@
org.osgi
osgi.cmpn
-
- org.apache.activemq
- artemis-junit
- ${project.version}
- test
-
-
- junit
- junit
- test
-
-
- org.eclipse.paho
- org.eclipse.paho.client.mqttv3
- test
-
-
- org.eclipse.paho
- org.eclipse.paho.mqttv5.client
- test
-
-
- org.awaitility
- awaitility
- 4.0.1
- test
-
jakarta.annotation
jakarta.annotation-api
test
jar
-
- org.apache.activemq
- artemis-unit-test-support
- ${project.version}
- test
-
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java
deleted file mode 100644
index f54f66aeba..0000000000
--- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.mqtt;
-
-import static java.util.Objects.nonNull;
-import static org.awaitility.Awaitility.await;
-import static org.awaitility.Durations.FIVE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Durations.TEN_SECONDS;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.IntStream;
-
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.junit.EmbeddedJMSResource;
-import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.RuleChain;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-
-@SuppressWarnings("deprecation")
-public class MQTTRetainMessageManagerTest {
-
- private Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-server-mqtt.xml");
-
- private MqttClientService mqttPublisher;
-
- private MqttClientService mqttConsumerCount;
- private MqttClientService mqttConsumerBeforePublish;
- private MqttClientService mqttConsumerAfterPublish;
- private MqttClientService mqttConsumerAfterPublish2;
-
- private final AtomicInteger publishCount = new AtomicInteger(0);
-
- private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger();
- private final AtomicInteger arrivedCountAferPublish = new AtomicInteger();
- private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger();
-
- private final AtomicReference lastMessagePublished = new AtomicReference<>();
- private final AtomicReference lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>();
- private final AtomicReference lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>();
- private final AtomicReference lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>();
-
- private final String topic = "fact";
-
- private final int numberOfMessages = 1000;
- private final int numberOfTests = 10;
-
- @Rule
- public RuleChain rulechain = RuleChain.outerRule(jmsServer);
-
- @Before
- public void beforeEach() throws MqttException {
- publishCount.set(0);
- mqttPublisher = new MqttClientService("publisher", null);
- mqttPublisher.init();
-
- final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {});
- clearRetainedMessage.setRetained(true);
- clearRetainedMessage.setQos(1);
- mqttPublisher.publish(topic, clearRetainedMessage);
-
- mqttConsumerCount = new MqttClientService("consumer-count", null);
- mqttConsumerCount.init();
- mqttConsumerCount.setMessageConsumer(message -> publishCount.incrementAndGet());
-
- arrivedCountBeforePublish.set(0);
- mqttConsumerBeforePublish = new MqttClientService("consumer-before",
- message -> {
- final String payload = new String(message.getPayload());
- lastMessageArrivedOnConsumerBeforePublish.set(message);
- arrivedCountBeforePublish.incrementAndGet();
- log.debug("[MQTT][before ][retained: {}][duplicate: {}][qos: {}] {}",
- message.isRetained(), message.isDuplicate(), message.getQos(), payload);
- });
- mqttConsumerBeforePublish.init();
-
- arrivedCountAferPublish.set(0);
- arrivedCountAferPublish2.set(0);
- mqttConsumerAfterPublish = new MqttClientService("consumer-after",
- message -> {
- final String payload = new String(message.getPayload());
- lastMessageArrivedOnConsumerAfterPublish.set(message);
- arrivedCountAferPublish.incrementAndGet();
- log.info("[MQTT][after ][retained: {}][duplicate: {}][qos: {}] {}",
- message.isRetained(), message.isDuplicate(), message.getQos(), payload);
- });
- mqttConsumerAfterPublish2 = new MqttClientService("consumer-after2",
- message -> {
- final String payload = new String(message.getPayload());
- lastMessageArrivedOnConsumerAfterPublish2.set(message);
- arrivedCountAferPublish2.incrementAndGet();
- log.info("[MQTT][after2 ][retained: {}][duplicate: {}][qos: {}] {}",
- message.isRetained(), message.isDuplicate(), message.getQos(), payload);
- });
- mqttConsumerAfterPublish.init();
- mqttConsumerAfterPublish2.init();
- }
-
- @After
- public void afterEach() throws MqttException {
- mqttPublisher.destroy();
-
- mqttConsumerCount.unsubsribe(topic);
- mqttConsumerCount.destroy();
-
- mqttConsumerBeforePublish.unsubsribe(topic);
- mqttConsumerBeforePublish.destroy();
-
- mqttConsumerAfterPublish.unsubsribe(topic);
- mqttConsumerAfterPublish.destroy();
-
- mqttConsumerAfterPublish2.unsubsribe(topic);
- mqttConsumerAfterPublish2.destroy();
- }
-
- @Test
- public void testAtMostOnce() throws MqttException {
- IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 0));
- }
-
- @Test
- public void testAtLeastOnce() throws MqttException {
- IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 1));
- }
-
- @Test
- public void testExactlyOnce() throws MqttException {
- IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 2));
- }
-
- private void actAndAssert(int i, int qos) {
- try {
- // Act
- mqttConsumerBeforePublish.subscribe(topic, qos);
- publish(qos);
- logAftePublish(i, qos);
- logRetainedMessagesQueue();
- mqttConsumerAfterPublish.subscribe(topic, qos);
- mqttConsumerAfterPublish2.subscribe(topic, qos);
- awaitUntilLastMessageArrivedOnConsumerAfterPublish();
- awaitUntilLastMessageArrivedOnConsumerAfterPublish2();
-
- // Assert
- assertEquals(1, arrivedCountAferPublish.get());
- assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished();
- assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished();
- } catch (MqttException e) {
- fail(e.getMessage());
- }
- }
-
- protected void publish(final int qos) throws MqttException {
- mqttConsumerCount.subscribe(topic, qos);
- IntStream.range(0, numberOfMessages).forEach(i -> {
- final String fact = String.format("[%s] %s", i, RandomStringUtils.randomAlphanumeric(128));
- final MqttMessage message = message(fact, qos, true);
- mqttPublisher.publish(topic, message);
- lastMessagePublished.set(message);
- });
- awaitUntilPiblishCount();
- }
-
- protected MqttMessage message(final String payload, final int qos, final boolean retained) {
- final MqttMessage message = new MqttMessage();
- message.setQos(qos);
- message.setRetained(retained);
- message.setPayload(payload.getBytes());
- return message;
- }
-
- private void awaitUntilPiblishCount() {
- await()
- .with()
- .pollDelay(FIVE_HUNDRED_MILLISECONDS)
- .atMost(TEN_SECONDS)
- .until(() -> publishCount.get() >= numberOfMessages);
- }
-
- private void awaitUntilLastMessageArrivedOnConsumerAfterPublish() {
- await()
- .pollDelay(FIVE_HUNDRED_MILLISECONDS)
- .atMost(TEN_SECONDS)
- .until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish.get()));
- }
-
- private void awaitUntilLastMessageArrivedOnConsumerAfterPublish2() {
- await()
- .pollDelay(FIVE_HUNDRED_MILLISECONDS)
- .atMost(TEN_SECONDS)
- .until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish2.get()));
- }
-
- private void assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished() {
- assertArrayEquals(String.format(
- "\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
- new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
- lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload());
- }
-
- private void assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished() {
- assertArrayEquals(String.format(
- "\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
- new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
- lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload());
- assertArrayEquals(String.format(
- "\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n",
- new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
- lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload());
- }
-
- private void logAftePublish(int i, int qos) {
- log.info("--- QoS: {} --- {}/{}---", qos, i, numberOfTests);
- log.info("[MQTT][publish][retained: {}][duplicate: {}][qos: {}] {}",
- lastMessagePublished.get().isRetained(), lastMessagePublished.get().isDuplicate(), lastMessagePublished.get().getQos(), lastMessagePublished.get());
- log.info("[MQTT][before ][retained: {}][duplicate: {}][qos: {}] {}",
- lastMessageArrivedOnConsumerBeforePublish.get().isRetained(),
- lastMessageArrivedOnConsumerBeforePublish.get().isDuplicate(),
- lastMessageArrivedOnConsumerBeforePublish.get().getQos(),
- new String(lastMessageArrivedOnConsumerBeforePublish.get().getPayload()));
- }
-
- private void logRetainedMessagesQueue() {
- final WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
- final String retainAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, topic, wildcardConfiguration);
- final Queue queue = jmsServer.getDestinationQueue(retainAddress);
- final LinkedListIterator browserIterator = queue.browserIterator();
- browserIterator.forEachRemaining(messageReference -> {
- final Message message = messageReference.getMessage();
- final String body = message.toCore().getBuffer().toString(StandardCharsets.UTF_8);
- log.info("[MQTT][{}][{}][{}]", retainAddress, message, body);
- });
- }
-}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java
deleted file mode 100644
index 4e315cc919..0000000000
--- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MqttClientService.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.mqtt;
-
-import static java.util.Objects.nonNull;
-import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.function.Consumer;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class MqttClientService implements MqttCallback {
-
- private final String clientId;
-
- private Consumer messageConsumer;
-
- private MqttClient mqttClient;
-
- private MemoryPersistence persistence = new MemoryPersistence();
- private ScheduledExecutorService executorService;
- private int corePoolSize = 5;
-
- private Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public MqttClientService() {
- this("producer", null);
- }
-
- public MqttClientService(final String clientId, Consumer messageConsumer) {
- this.clientId = clientId;
- this.messageConsumer = messageConsumer;
- }
-
- @PostConstruct
- public void init() throws MqttException {
- final String serverURI = "tcp://localhost:1883";
- final MqttConnectOptions options = new MqttConnectOptions();
- options.setAutomaticReconnect(true);
- options.setCleanSession(false);
- options.setMaxInflight(1000);
- options.setServerURIs(new String[] {serverURI});
- options.setMqttVersion(MQTT_VERSION_3_1_1);
-
- final ThreadFactory threadFactory = new DefaultThreadFactory("mqtt-client-exec");
- executorService = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
- mqttClient = new MqttClient(serverURI, clientId, persistence, executorService);
- mqttClient.setTimeToWait(-1);
- mqttClient.connect(options);
- mqttClient.setCallback(this);
- log.debug("[MQTT][Connected][client: {}]", clientId);
- }
-
- @PreDestroy
- public void destroy() throws MqttException {
- mqttClient.disconnect();
- executorService.shutdownNow();
- log.debug("[MQTT][Disconnected][client: {}]", clientId);
- }
-
- @Override
- public void connectionLost(Throwable cause) {
- log.error("[MQTT][connectionLost][{}]", cause.getMessage());
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage message) {
- log.debug("[MQTT][messageArrived][client: {}][topic: {}][message: {}]", clientId, topic, message);
- if (nonNull(messageConsumer)) {
- messageConsumer.accept(message);
- }
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- log.trace("[MQTT][deliveryComplete][token: {}]", token);
- }
-
- public void publish(final String topic, final MqttMessage message) {
- try {
- mqttClient.publish(topic, message);
- } catch (final MqttException e) {
- log.error(e.getMessage(), e);
- }
- }
-
- public void subscribe(final String topicFilter, int qos) throws MqttException {
- mqttClient.subscribe(topicFilter, qos);
- }
-
- public void unsubsribe(final String topicFilter) throws MqttException {
- mqttClient.unsubscribe(topicFilter);
- }
-
- public void setMessageConsumer(Consumer messageConsumer) {
- this.messageConsumer = messageConsumer;
- }
-}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml b/artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml
deleted file mode 100644
index b0b65e0fe5..0000000000
--- a/artemis-protocols/artemis-mqtt-protocol/src/test/resources/embedded-artemis-server-mqtt.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-
-
-
- false
- false
-
-
-
-
- vm://0
- tcp://127.0.0.1:1883?protocols=MQTT&useEpoll=false&useKQueue=false
-
-
-
-
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java
new file mode 100644
index 0000000000..8ae10aaad0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt5;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.eclipse.paho.mqttv5.client.MqttClient;
+import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MQTTRetainMessageManagerTest extends MQTT5TestSupport {
+
+ private MqttClient mqttPublisher;
+
+ private MqttClient mqttConsumerBeforePublish;
+ private MqttClient mqttConsumerAfterPublish;
+ private MqttClient mqttConsumerAfterPublish2;
+
+ private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger();
+ private final AtomicInteger arrivedCountAferPublish = new AtomicInteger();
+ private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger();
+
+ private final AtomicReference lastMessagePublished = new AtomicReference<>();
+ private final AtomicReference lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>();
+ private final AtomicReference lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>();
+ private final AtomicReference lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>();
+
+ private final String topic = "fact";
+
+ private final int numberOfMessages = 1000;
+ private final int numberOfTests = 10;
+
+ public MQTTRetainMessageManagerTest(String protocol) {
+ super(protocol);
+ }
+
+ @Before
+ public void beforeEach() throws MqttException {
+ mqttPublisher = createPahoClient("publisher");
+ mqttPublisher.connect();
+
+ final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {});
+ clearRetainedMessage.setRetained(true);
+ clearRetainedMessage.setQos(1);
+ mqttPublisher.publish(topic, clearRetainedMessage);
+
+ arrivedCountBeforePublish.set(0);
+ mqttConsumerBeforePublish = createPahoClient("consumer-before");
+ mqttConsumerBeforePublish.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ lastMessageArrivedOnConsumerBeforePublish.set(message);
+ arrivedCountBeforePublish.incrementAndGet();
+ }
+ });
+ mqttConsumerBeforePublish.connect();
+
+ arrivedCountAferPublish.set(0);
+ mqttConsumerAfterPublish = createPahoClient("consumer-after");
+ mqttConsumerAfterPublish.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ lastMessageArrivedOnConsumerAfterPublish.set(message);
+ arrivedCountAferPublish.incrementAndGet();
+ }
+ });
+ mqttConsumerAfterPublish.connect();
+
+ arrivedCountAferPublish2.set(0);
+ mqttConsumerAfterPublish2 = createPahoClient("consumer-after2");
+ mqttConsumerAfterPublish2.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ lastMessageArrivedOnConsumerAfterPublish2.set(message);
+ arrivedCountAferPublish2.incrementAndGet();
+ }
+ });
+ mqttConsumerAfterPublish2.connect();
+ }
+
+ @After
+ public void afterEach() throws MqttException {
+ mqttPublisher.disconnect();
+ mqttPublisher.close();
+
+ mqttConsumerBeforePublish.unsubscribe(topic);
+ mqttConsumerBeforePublish.disconnect();
+ mqttConsumerBeforePublish.close();
+
+ mqttConsumerAfterPublish.unsubscribe(topic);
+ mqttConsumerAfterPublish.disconnect();
+ mqttConsumerAfterPublish.close();
+
+ mqttConsumerAfterPublish2.unsubscribe(topic);
+ mqttConsumerAfterPublish2.disconnect();
+ mqttConsumerAfterPublish2.close();
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testAtMostOnce() {
+ IntStream.of(numberOfTests).forEach(i -> test(0));
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testAtLeastOnce() {
+ IntStream.of(numberOfTests).forEach(i -> test(1));
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testExactlyOnce() {
+ IntStream.of(numberOfTests).forEach(i -> test(2));
+ }
+
+ private void test(int qos) {
+ try {
+ mqttConsumerBeforePublish.subscribe(topic, qos);
+ for (int i = 0; i < numberOfMessages; i++) {
+ final MqttMessage message = new MqttMessage();
+ message.setQos(qos);
+ message.setRetained(true);
+ message.setPayload(RandomUtil.randomBytes(128));
+ mqttPublisher.publish(topic, message);
+ lastMessagePublished.set(message);
+ }
+ Wait.waitFor(() -> server.getAddressInfo(SimpleString.toSimpleString(topic)).getRoutedMessageCount() >= numberOfMessages, 5000, 100);
+ mqttConsumerAfterPublish.subscribe(topic, qos);
+ mqttConsumerAfterPublish2.subscribe(topic, qos);
+ Wait.waitFor(() -> lastMessageArrivedOnConsumerAfterPublish.get() != null, 5000, 100);
+ Wait.waitFor(() -> lastMessageArrivedOnConsumerAfterPublish2.get() != null, 5000, 100);
+
+ assertEquals(1, arrivedCountAferPublish.get());
+ assertArrayEquals(String.format(
+ "\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
+ new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
+ lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload());
+ assertArrayEquals(String.format(
+ "\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
+ new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
+ lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload());
+ assertArrayEquals(String.format(
+ "\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n",
+ new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
+ lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload());
+ } catch (MqttException e) {
+ fail(e.getMessage());
+ }
+ }
+}