ARTEMIS-4037 refactor MQTTRetainMessageManagerTest

Commit 5a42de5fa6 called my attention to
this test. It really needs to be refactored because:

 - It belongs in the integration-tests module rather than the MQTT
   protocol module.
 - It is using a lot of non-standard components (e.g.
   EmbeddedJMSResource, Awaitility, etc.).
 - It is overly complicated (e.g. using its own MqttClientService).

This commit resolves all those problems. The new implementation is quite
a bit different but still equivalent. I reverted the original fix from
ARTEMIS-2476 and the test still fails.
This commit is contained in:
Justin Bertram 2022-10-12 11:03:09 -05:00 committed by Robbie Gemmell
parent d968f0c901
commit ea04426bcd
5 changed files with 168 additions and 463 deletions

View File

@ -45,11 +45,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
@ -94,44 +89,11 @@
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-junit</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<scope>test</scope>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-unit-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,263 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import static java.util.Objects.nonNull;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_HUNDRED_MILLISECONDS;
import static org.awaitility.Durations.TEN_SECONDS;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.junit.EmbeddedJMSResource;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.commons.lang3.RandomStringUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@SuppressWarnings("deprecation")
public class MQTTRetainMessageManagerTest {
private Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-server-mqtt.xml");
private MqttClientService mqttPublisher;
private MqttClientService mqttConsumerCount;
private MqttClientService mqttConsumerBeforePublish;
private MqttClientService mqttConsumerAfterPublish;
private MqttClientService mqttConsumerAfterPublish2;
private final AtomicInteger publishCount = new AtomicInteger(0);
private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger();
private final AtomicInteger arrivedCountAferPublish = new AtomicInteger();
private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger();
private final AtomicReference<MqttMessage> lastMessagePublished = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>();
private final String topic = "fact";
private final int numberOfMessages = 1000;
private final int numberOfTests = 10;
@Rule
public RuleChain rulechain = RuleChain.outerRule(jmsServer);
@Before
public void beforeEach() throws MqttException {
publishCount.set(0);
mqttPublisher = new MqttClientService("publisher", null);
mqttPublisher.init();
final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {});
clearRetainedMessage.setRetained(true);
clearRetainedMessage.setQos(1);
mqttPublisher.publish(topic, clearRetainedMessage);
mqttConsumerCount = new MqttClientService("consumer-count", null);
mqttConsumerCount.init();
mqttConsumerCount.setMessageConsumer(message -> publishCount.incrementAndGet());
arrivedCountBeforePublish.set(0);
mqttConsumerBeforePublish = new MqttClientService("consumer-before",
message -> {
final String payload = new String(message.getPayload());
lastMessageArrivedOnConsumerBeforePublish.set(message);
arrivedCountBeforePublish.incrementAndGet();
log.debug("[MQTT][before ][retained: {}][duplicate: {}][qos: {}] {}",
message.isRetained(), message.isDuplicate(), message.getQos(), payload);
});
mqttConsumerBeforePublish.init();
arrivedCountAferPublish.set(0);
arrivedCountAferPublish2.set(0);
mqttConsumerAfterPublish = new MqttClientService("consumer-after",
message -> {
final String payload = new String(message.getPayload());
lastMessageArrivedOnConsumerAfterPublish.set(message);
arrivedCountAferPublish.incrementAndGet();
log.info("[MQTT][after ][retained: {}][duplicate: {}][qos: {}] {}",
message.isRetained(), message.isDuplicate(), message.getQos(), payload);
});
mqttConsumerAfterPublish2 = new MqttClientService("consumer-after2",
message -> {
final String payload = new String(message.getPayload());
lastMessageArrivedOnConsumerAfterPublish2.set(message);
arrivedCountAferPublish2.incrementAndGet();
log.info("[MQTT][after2 ][retained: {}][duplicate: {}][qos: {}] {}",
message.isRetained(), message.isDuplicate(), message.getQos(), payload);
});
mqttConsumerAfterPublish.init();
mqttConsumerAfterPublish2.init();
}
@After
public void afterEach() throws MqttException {
mqttPublisher.destroy();
mqttConsumerCount.unsubsribe(topic);
mqttConsumerCount.destroy();
mqttConsumerBeforePublish.unsubsribe(topic);
mqttConsumerBeforePublish.destroy();
mqttConsumerAfterPublish.unsubsribe(topic);
mqttConsumerAfterPublish.destroy();
mqttConsumerAfterPublish2.unsubsribe(topic);
mqttConsumerAfterPublish2.destroy();
}
@Test
public void testAtMostOnce() throws MqttException {
IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 0));
}
@Test
public void testAtLeastOnce() throws MqttException {
IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 1));
}
@Test
public void testExactlyOnce() throws MqttException {
IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 2));
}
private void actAndAssert(int i, int qos) {
try {
// Act
mqttConsumerBeforePublish.subscribe(topic, qos);
publish(qos);
logAftePublish(i, qos);
logRetainedMessagesQueue();
mqttConsumerAfterPublish.subscribe(topic, qos);
mqttConsumerAfterPublish2.subscribe(topic, qos);
awaitUntilLastMessageArrivedOnConsumerAfterPublish();
awaitUntilLastMessageArrivedOnConsumerAfterPublish2();
// Assert
assertEquals(1, arrivedCountAferPublish.get());
assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished();
assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished();
} catch (MqttException e) {
fail(e.getMessage());
}
}
protected void publish(final int qos) throws MqttException {
mqttConsumerCount.subscribe(topic, qos);
IntStream.range(0, numberOfMessages).forEach(i -> {
final String fact = String.format("[%s] %s", i, RandomStringUtils.randomAlphanumeric(128));
final MqttMessage message = message(fact, qos, true);
mqttPublisher.publish(topic, message);
lastMessagePublished.set(message);
});
awaitUntilPiblishCount();
}
protected MqttMessage message(final String payload, final int qos, final boolean retained) {
final MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(payload.getBytes());
return message;
}
private void awaitUntilPiblishCount() {
await()
.with()
.pollDelay(FIVE_HUNDRED_MILLISECONDS)
.atMost(TEN_SECONDS)
.until(() -> publishCount.get() >= numberOfMessages);
}
private void awaitUntilLastMessageArrivedOnConsumerAfterPublish() {
await()
.pollDelay(FIVE_HUNDRED_MILLISECONDS)
.atMost(TEN_SECONDS)
.until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish.get()));
}
private void awaitUntilLastMessageArrivedOnConsumerAfterPublish2() {
await()
.pollDelay(FIVE_HUNDRED_MILLISECONDS)
.atMost(TEN_SECONDS)
.until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish2.get()));
}
private void assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished() {
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload());
}
private void assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished() {
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload());
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload());
}
private void logAftePublish(int i, int qos) {
log.info("--- QoS: {} --- {}/{}---", qos, i, numberOfTests);
log.info("[MQTT][publish][retained: {}][duplicate: {}][qos: {}] {}",
lastMessagePublished.get().isRetained(), lastMessagePublished.get().isDuplicate(), lastMessagePublished.get().getQos(), lastMessagePublished.get());
log.info("[MQTT][before ][retained: {}][duplicate: {}][qos: {}] {}",
lastMessageArrivedOnConsumerBeforePublish.get().isRetained(),
lastMessageArrivedOnConsumerBeforePublish.get().isDuplicate(),
lastMessageArrivedOnConsumerBeforePublish.get().getQos(),
new String(lastMessageArrivedOnConsumerBeforePublish.get().getPayload()));
}
private void logRetainedMessagesQueue() {
final WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
final String retainAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, topic, wildcardConfiguration);
final Queue queue = jmsServer.getDestinationQueue(retainAddress);
final LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
browserIterator.forEachRemaining(messageReference -> {
final Message message = messageReference.getMessage();
final String body = message.toCore().getBuffer().toString(StandardCharsets.UTF_8);
log.info("[MQTT][{}][{}][{}]", retainAddress, message, body);
});
}
}

View File

@ -1,128 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import static java.util.Objects.nonNull;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import io.netty.util.concurrent.DefaultThreadFactory;
public class MqttClientService implements MqttCallback {
private final String clientId;
private Consumer<MqttMessage> messageConsumer;
private MqttClient mqttClient;
private MemoryPersistence persistence = new MemoryPersistence();
private ScheduledExecutorService executorService;
private int corePoolSize = 5;
private Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public MqttClientService() {
this("producer", null);
}
public MqttClientService(final String clientId, Consumer<MqttMessage> messageConsumer) {
this.clientId = clientId;
this.messageConsumer = messageConsumer;
}
@PostConstruct
public void init() throws MqttException {
final String serverURI = "tcp://localhost:1883";
final MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setMaxInflight(1000);
options.setServerURIs(new String[] {serverURI});
options.setMqttVersion(MQTT_VERSION_3_1_1);
final ThreadFactory threadFactory = new DefaultThreadFactory("mqtt-client-exec");
executorService = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
mqttClient = new MqttClient(serverURI, clientId, persistence, executorService);
mqttClient.setTimeToWait(-1);
mqttClient.connect(options);
mqttClient.setCallback(this);
log.debug("[MQTT][Connected][client: {}]", clientId);
}
@PreDestroy
public void destroy() throws MqttException {
mqttClient.disconnect();
executorService.shutdownNow();
log.debug("[MQTT][Disconnected][client: {}]", clientId);
}
@Override
public void connectionLost(Throwable cause) {
log.error("[MQTT][connectionLost][{}]", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
log.debug("[MQTT][messageArrived][client: {}][topic: {}][message: {}]", clientId, topic, message);
if (nonNull(messageConsumer)) {
messageConsumer.accept(message);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.trace("[MQTT][deliveryComplete][token: {}]", token);
}
public void publish(final String topic, final MqttMessage message) {
try {
mqttClient.publish(topic, message);
} catch (final MqttException e) {
log.error(e.getMessage(), e);
}
}
public void subscribe(final String topicFilter, int qos) throws MqttException {
mqttClient.subscribe(topicFilter, qos);
}
public void unsubsribe(final String topicFilter) throws MqttException {
mqttClient.unsubscribe(topicFilter);
}
public void setMessageConsumer(Consumer<MqttMessage> messageConsumer) {
this.messageConsumer = messageConsumer;
}
}

View File

@ -1,34 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
urn:activemq /schema/artemis-server.xsd
urn:activemq:core /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- Acceptors -->
<acceptors>
<!-- In VM acceptor -->
<acceptor name="in-vm">vm://0</acceptor>
<acceptor name="mqtt">tcp://127.0.0.1:1883?protocols=MQTT&amp;useEpoll=false&amp;useKQueue=false</acceptor>
</acceptors>
</core>
</configuration>

View File

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt5;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class MQTTRetainMessageManagerTest extends MQTT5TestSupport {
private MqttClient mqttPublisher;
private MqttClient mqttConsumerBeforePublish;
private MqttClient mqttConsumerAfterPublish;
private MqttClient mqttConsumerAfterPublish2;
private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger();
private final AtomicInteger arrivedCountAferPublish = new AtomicInteger();
private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger();
private final AtomicReference<MqttMessage> lastMessagePublished = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>();
private final String topic = "fact";
private final int numberOfMessages = 1000;
private final int numberOfTests = 10;
public MQTTRetainMessageManagerTest(String protocol) {
super(protocol);
}
@Before
public void beforeEach() throws MqttException {
mqttPublisher = createPahoClient("publisher");
mqttPublisher.connect();
final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {});
clearRetainedMessage.setRetained(true);
clearRetainedMessage.setQos(1);
mqttPublisher.publish(topic, clearRetainedMessage);
arrivedCountBeforePublish.set(0);
mqttConsumerBeforePublish = createPahoClient("consumer-before");
mqttConsumerBeforePublish.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
lastMessageArrivedOnConsumerBeforePublish.set(message);
arrivedCountBeforePublish.incrementAndGet();
}
});
mqttConsumerBeforePublish.connect();
arrivedCountAferPublish.set(0);
mqttConsumerAfterPublish = createPahoClient("consumer-after");
mqttConsumerAfterPublish.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
lastMessageArrivedOnConsumerAfterPublish.set(message);
arrivedCountAferPublish.incrementAndGet();
}
});
mqttConsumerAfterPublish.connect();
arrivedCountAferPublish2.set(0);
mqttConsumerAfterPublish2 = createPahoClient("consumer-after2");
mqttConsumerAfterPublish2.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
lastMessageArrivedOnConsumerAfterPublish2.set(message);
arrivedCountAferPublish2.incrementAndGet();
}
});
mqttConsumerAfterPublish2.connect();
}
@After
public void afterEach() throws MqttException {
mqttPublisher.disconnect();
mqttPublisher.close();
mqttConsumerBeforePublish.unsubscribe(topic);
mqttConsumerBeforePublish.disconnect();
mqttConsumerBeforePublish.close();
mqttConsumerAfterPublish.unsubscribe(topic);
mqttConsumerAfterPublish.disconnect();
mqttConsumerAfterPublish.close();
mqttConsumerAfterPublish2.unsubscribe(topic);
mqttConsumerAfterPublish2.disconnect();
mqttConsumerAfterPublish2.close();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testAtMostOnce() {
IntStream.of(numberOfTests).forEach(i -> test(0));
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testAtLeastOnce() {
IntStream.of(numberOfTests).forEach(i -> test(1));
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testExactlyOnce() {
IntStream.of(numberOfTests).forEach(i -> test(2));
}
private void test(int qos) {
try {
mqttConsumerBeforePublish.subscribe(topic, qos);
for (int i = 0; i < numberOfMessages; i++) {
final MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(true);
message.setPayload(RandomUtil.randomBytes(128));
mqttPublisher.publish(topic, message);
lastMessagePublished.set(message);
}
Wait.waitFor(() -> server.getAddressInfo(SimpleString.toSimpleString(topic)).getRoutedMessageCount() >= numberOfMessages, 5000, 100);
mqttConsumerAfterPublish.subscribe(topic, qos);
mqttConsumerAfterPublish2.subscribe(topic, qos);
Wait.waitFor(() -> lastMessageArrivedOnConsumerAfterPublish.get() != null, 5000, 100);
Wait.waitFor(() -> lastMessageArrivedOnConsumerAfterPublish2.get() != null, 5000, 100);
assertEquals(1, arrivedCountAferPublish.get());
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload());
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload());
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload());
} catch (MqttException e) {
fail(e.getMessage());
}
}
}