ARTEMIS-2476: implemented MQTTRetainMessageManagerTest

This commit is contained in:
Assen Sharlandjiev 2019-09-16 15:02:04 +03:00 committed by Michael Pearce
parent e608c9af2c
commit 2ce8f01911
5 changed files with 527 additions and 0 deletions

View File

@ -95,6 +95,47 @@
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-junit</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>0.15</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>bintray</id>
<url>https://jcenter.bintray.com</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import static java.util.Objects.nonNull;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_HUNDRED_MILLISECONDS;
import static org.awaitility.Durations.TEN_SECONDS;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.junit.EmbeddedJMSResource;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import com.github.javafaker.ChuckNorris;
import com.github.javafaker.Faker;
@SuppressWarnings("deprecation")
public class MQTTRetainMessageManagerTest {
private Logger log = Logger.getLogger(MQTTRetainMessageManagerTest.class);
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-server-mqtt.xml");
private MqttClientService mqttPublisher;
private MqttClientService mqttConsumerCount;
private MqttClientService mqttConsumerBeforePublish;
private MqttClientService mqttConsumerAfterPublish;
private MqttClientService mqttConsumerAfterPublish2;
private final AtomicInteger publishCount = new AtomicInteger(0);
private final AtomicInteger arrivedCountBeforePublish = new AtomicInteger();
private final AtomicInteger arrivedCountAferPublish = new AtomicInteger();
private final AtomicInteger arrivedCountAferPublish2 = new AtomicInteger();
private final AtomicReference<MqttMessage> lastMessagePublished = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>();
private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>();
private final String topic = "fact";
private final ChuckNorris chuckNorris = (new Faker()).chuckNorris();
private final int numberOfMessages = 1000;
private final int numberOfTests = 10;
@Rule
public RuleChain rulechain = RuleChain.outerRule(jmsServer);
@Before
public void beforeEach() throws MqttException {
publishCount.set(0);
mqttPublisher = new MqttClientService("publisher", null);
mqttPublisher.init();
final MqttMessage clearRetainedMessage = new MqttMessage(new byte[] {});
clearRetainedMessage.setRetained(true);
clearRetainedMessage.setQos(1);
mqttPublisher.publish(topic, clearRetainedMessage);
mqttConsumerCount = new MqttClientService("consumer-count", null);
mqttConsumerCount.init();
mqttConsumerCount.setMessageConsumer(message -> publishCount.incrementAndGet());
arrivedCountBeforePublish.set(0);
mqttConsumerBeforePublish = new MqttClientService("consumer-before",
message -> {
final String payload = new String(message.getPayload());
lastMessageArrivedOnConsumerBeforePublish.set(message);
arrivedCountBeforePublish.incrementAndGet();
log.debugf("[MQTT][before ][retained: %s][duplicate: %s][qos: %s] %s",
message.isRetained(), message.isDuplicate(), message.getQos(), payload);
});
mqttConsumerBeforePublish.init();
arrivedCountAferPublish.set(0);
arrivedCountAferPublish2.set(0);
mqttConsumerAfterPublish = new MqttClientService("consumer-after",
message -> {
final String payload = new String(message.getPayload());
lastMessageArrivedOnConsumerAfterPublish.set(message);
arrivedCountAferPublish.incrementAndGet();
log.infof("[MQTT][after ][retained: %s][duplicate: %s][qos: %s] %s",
message.isRetained(), message.isDuplicate(), message.getQos(), payload);
});
mqttConsumerAfterPublish2 = new MqttClientService("consumer-after2",
message -> {
final String payload = new String(message.getPayload());
lastMessageArrivedOnConsumerAfterPublish2.set(message);
arrivedCountAferPublish2.incrementAndGet();
log.infof("[MQTT][after2 ][retained: %s][duplicate: %s][qos: %s] %s",
message.isRetained(), message.isDuplicate(), message.getQos(), payload);
});
mqttConsumerAfterPublish.init();
mqttConsumerAfterPublish2.init();
}
@After
public void afterEach() throws MqttException {
mqttPublisher.destroy();
mqttConsumerCount.unsubsribe(topic);
mqttConsumerCount.destroy();
mqttConsumerBeforePublish.unsubsribe(topic);
mqttConsumerBeforePublish.destroy();
mqttConsumerAfterPublish.unsubsribe(topic);
mqttConsumerAfterPublish.destroy();
mqttConsumerAfterPublish2.unsubsribe(topic);
mqttConsumerAfterPublish2.destroy();
}
@Test
public void testAtMostOnce() throws MqttException {
IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 0));
}
@Test
public void testAtLeastOnce() throws MqttException {
IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 1));
}
@Test
public void testExactlyOnce() throws MqttException {
IntStream.of(numberOfTests).forEach(i -> actAndAssert(i, 2));
}
private void actAndAssert(int i, int qos) {
try {
// Act
mqttConsumerBeforePublish.subscribe(topic, qos);
publish(qos);
logAftePublish(i, qos);
logRetainedMessagesQueue();
mqttConsumerAfterPublish.subscribe(topic, qos);
mqttConsumerAfterPublish2.subscribe(topic, qos);
awaitUntilLastMessageArrivedOnConsumerAfterPublish();
awaitUntilLastMessageArrivedOnConsumerAfterPublish2();
// Assert
assertEquals(1, arrivedCountAferPublish.get());
assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished();
assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished();
} catch (MqttException e) {
fail(e.getMessage());
}
}
protected void publish(final int qos) throws MqttException {
mqttConsumerCount.subscribe(topic, qos);
IntStream.range(0, numberOfMessages).forEach(i -> {
final String fact = String.format("[%s] %s", i, chuckNorris.fact());
final MqttMessage message = message(fact, qos, true);
mqttPublisher.publish(topic, message);
lastMessagePublished.set(message);
});
awaitUntilPiblishCount();
}
protected MqttMessage message(final String payload, final int qos, final boolean retained) {
final MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(payload.getBytes());
return message;
}
private void awaitUntilPiblishCount() {
await()
.with()
.pollDelay(FIVE_HUNDRED_MILLISECONDS)
.atMost(TEN_SECONDS)
.until(() -> publishCount.get() >= numberOfMessages);
}
private void awaitUntilLastMessageArrivedOnConsumerAfterPublish() {
await()
.pollDelay(FIVE_HUNDRED_MILLISECONDS)
.atMost(TEN_SECONDS)
.until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish.get()));
}
private void awaitUntilLastMessageArrivedOnConsumerAfterPublish2() {
await()
.pollDelay(FIVE_HUNDRED_MILLISECONDS)
.atMost(TEN_SECONDS)
.until(() -> nonNull(lastMessageArrivedOnConsumerAfterPublish2.get()));
}
private void assertLastMessageOnConsumerBeforePublishArrivedEqualsLastMessagePublished() {
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerBeforePublish.get().getPayload());
}
private void assertLastMessageOnConsumerAfterPublishArrivedEqualsLastMessagePublished() {
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish.get().getPayload());
assertArrayEquals(String.format(
"\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived : %s\n",
new String(lastMessagePublished.get().getPayload()), new String(lastMessageArrivedOnConsumerAfterPublish.get().getPayload())),
lastMessagePublished.get().getPayload(), lastMessageArrivedOnConsumerAfterPublish2.get().getPayload());
}
private void logAftePublish(int i, int qos) {
log.infof("--- QoS: %s --- %s/%s---", qos, i, numberOfTests);
log.infof("[MQTT][publish][retained: %s][duplicate: %s][qos: %s] %s",
lastMessagePublished.get().isRetained(), lastMessagePublished.get().isDuplicate(), lastMessagePublished.get().getQos(), lastMessagePublished.get());
log.infof("[MQTT][before ][retained: %s][duplicate: %s][qos: %s] %s",
lastMessageArrivedOnConsumerBeforePublish.get().isRetained(),
lastMessageArrivedOnConsumerBeforePublish.get().isDuplicate(),
lastMessageArrivedOnConsumerBeforePublish.get().getQos(),
new String(lastMessageArrivedOnConsumerBeforePublish.get().getPayload()));
}
private void logRetainedMessagesQueue() {
final WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
final String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(topic, wildcardConfiguration);
final Queue queue = jmsServer.getDestinationQueue(retainAddress);
final LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
browserIterator.forEachRemaining(messageReference -> {
final Message message = messageReference.getMessage();
final String body = message.getBuffer().toString(StandardCharsets.UTF_8);
log.infof("[MQTT][%s][%s][%s]", retainAddress, message, body);
});
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import static java.util.Objects.nonNull;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jboss.logging.Logger;
import io.netty.util.concurrent.DefaultThreadFactory;
public class MqttClientService implements MqttCallback {
private final String clientId;
private Consumer<MqttMessage> messageConsumer;
private MqttClient mqttClient;
private MemoryPersistence persistence = new MemoryPersistence();
private ScheduledExecutorService executorService;
private int corePoolSize = 5;
private Logger log = Logger.getLogger(MqttClientService.class);
public MqttClientService() {
this("producer", null);
}
public MqttClientService(final String clientId, Consumer<MqttMessage> messageConsumer) {
this.clientId = clientId;
this.messageConsumer = messageConsumer;
}
@PostConstruct
public void init() throws MqttException {
final String serverURI = "tcp://localhost:1883";
final MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setMaxInflight(10);
options.setServerURIs(new String[] {serverURI});
options.setMqttVersion(MQTT_VERSION_3_1_1);
final ThreadFactory threadFactory = new DefaultThreadFactory("mqtt-client-exec");
executorService = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
mqttClient = new MqttClient(serverURI, clientId, persistence, executorService);
mqttClient.setTimeToWait(-1);
mqttClient.connect(options);
mqttClient.setCallback(this);
log.debugf("[MQTT][Connected][client: %s]", clientId);
}
@PreDestroy
public void destroy() throws MqttException {
mqttClient.disconnect();
executorService.shutdownNow();
log.debugf("[MQTT][Disconnected][client: %s]", clientId);
}
@Override
public void connectionLost(Throwable cause) {
log.errorf("[MQTT][connectionLost][%s]", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
log.debugf("[MQTT][messageArrived][client: %s][topic: %s][message: %s]", clientId, topic, message);
if (nonNull(messageConsumer)) {
messageConsumer.accept(message);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.tracef("[MQTT][deliveryComplete][token: %s]", token);
}
public void publish(final String topic, final MqttMessage message) {
try {
mqttClient.publish(topic, message);
} catch (final MqttException e) {
log.error(e.getMessage(), e);
}
}
public void subscribe(final String topicFilter, int qos) throws MqttException {
mqttClient.subscribe(topicFilter, qos);
}
public void unsubsribe(final String topicFilter) throws MqttException {
mqttClient.unsubscribe(topicFilter);
}
public void setMessageConsumer(Consumer<MqttMessage> messageConsumer) {
this.messageConsumer = messageConsumer;
}
}

View File

@ -0,0 +1,34 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
urn:activemq /schema/artemis-server.xsd
urn:activemq:core /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- Acceptors -->
<acceptors>
<!-- In VM acceptor -->
<acceptor name="in-vm">vm://0</acceptor>
<acceptor name="mqtt">tcp://127.0.0.1:1883?protocols=MQTT&amp;useEpoll=false&amp;useKQueue=false</acceptor>
</acceptors>
</core>
</configuration>

View File

@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests
# Root logger level
logger.level=INFO
# ActiveMQ Artemis logger levels
logger.org.apache.activemq.artemis.core.server.level=INFO
logger.org.apache.activemq.artemis.journal.level=INFO
logger.org.apache.activemq.artemis.utils.level=INFO
logger.org.apache.activemq.artemis.jms.level=INFO
logger.org.apache.activemq.artemis.ra.level=INFO
logger.org.apache.activemq.artemis.tests.unit.level=INFO
logger.org.apache.activemq.artemis.tests.integration.level=DEBUG
logger.org.apache.activemq.artemis.jms.tests.level=INFO
# Root logger handlers
logger.handlers=CONSOLE,TEST
#logger.handlers=CONSOLE,FILE
# Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=FINE
handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN
# File handler configuration
handler.FILE=org.jboss.logmanager.handlers.FileHandler
handler.FILE.level=FINE
handler.FILE.properties=autoFlush,fileName
handler.FILE.autoFlush=true
handler.FILE.fileName=target/activemq.log
handler.FILE.formatter=PATTERN
# Console handler configuration
handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler
handler.TEST.level=TRACE
handler.TEST.formatter=PATTERN
# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n