ARTEMIS-4548 refactor MQTT tests

Many MQTT tests are run twice - once using TCP and once using
WebSockets. This is essentially a big waste of time since once the
connection is established to the broker the tests are identical. The
tests should be refactored to run just once and then there can be a
small number of tests specifically for WebSockets.

This should knock several minutes off the test-suite.
This commit is contained in:
Justin Bertram 2023-12-22 13:33:55 -06:00 committed by Robbie Gemmell
parent 5269b1a89d
commit 99d43dab01
38 changed files with 135 additions and 188 deletions

View File

@ -61,10 +61,6 @@ public class MQTT5Test extends MQTT5TestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public MQTT5Test(String protocol) {
super(protocol);
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testSimpleSendReceive() throws Exception {
String topic = RandomUtil.randomString();

View File

@ -21,8 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.security.ProtectionDomain;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -71,15 +69,12 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME;
@RunWith(Parameterized.class)
public class MQTT5TestSupport extends ActiveMQTestBase {
protected static final String TCP = "tcp";
protected static final String WS = "ws";
@ -88,34 +83,28 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
protected static final SimpleString DEAD_LETTER_ADDRESS = new SimpleString("DLA");
protected static final SimpleString EXPIRY_ADDRESS = new SimpleString("EXPIRY");
@Parameterized.Parameters(name = "protocol={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][] {
{TCP},
{WS}
});
}
protected String protocol;
public MQTT5TestSupport(String protocol) {
this.protocol = protocol;
}
protected MqttClient createPahoClient(String clientId) throws MqttException {
return new MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence());
return createPahoClient(TCP, clientId);
}
protected MqttClient createPahoClient(String protocol, String clientId) throws MqttException {
return createPahoClient(protocol, clientId, (isUseSsl() ? getSslPort() : getPort()));
}
protected MqttClient createPahoClient(String clientId, int port) throws MqttException {
return createPahoClient(TCP, clientId, port);
}
protected MqttClient createPahoClient(String protocol, String clientId, int port) throws MqttException {
return new MqttClient(protocol + "://localhost:" + port, clientId, new MemoryPersistence());
}
protected org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String clientId) throws org.eclipse.paho.client.mqttv3.MqttException {
return new org.eclipse.paho.client.mqttv3.MqttClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence());
return new org.eclipse.paho.client.mqttv3.MqttClient(TCP + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence());
}
protected MqttAsyncClient createAsyncPahoClient(String clientId) throws MqttException {
return new MqttAsyncClient(protocol + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence());
return new MqttAsyncClient(TCP + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence());
}
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

View File

@ -50,10 +50,6 @@ public class MQTTRetainMessageManagerTest extends MQTT5TestSupport {
private final int numberOfMessages = 1000;
private final int numberOfTests = 10;
public MQTTRetainMessageManagerTest(String protocol) {
super(protocol);
}
@Before
public void beforeEach() throws MqttException {
mqttPublisher = createPahoClient("publisher");

View File

@ -49,10 +49,6 @@ import org.junit.Test;
public class ControlPacketFormatTests extends MQTT5TestSupport {
public ControlPacketFormatTests(String protocol) {
super(protocol);
}
/*
* [MQTT-2.2.1-2] A PUBLISH packet MUST NOT contain a Packet Identifier if its QoS value is set to 0.
*/

View File

@ -31,8 +31,4 @@ import org.junit.Ignore;
@Ignore
public class DataFormatTests extends MQTT5TestSupport {
public DataFormatTests(String protocol) {
super(protocol);
}
}

View File

@ -43,10 +43,6 @@ import org.junit.Test;
public class EnhancedAuthenticationTests extends MQTT5TestSupport {
public EnhancedAuthenticationTests(String protocol) {
super(protocol);
}
/*
* [MQTT-4.12.0-1] If the Server does not support the Authentication Method supplied by the Client, it MAY send a
* CONNACK with a Reason Code of 0x8C (Bad authentication method) or 0x87 (Not Authorized) as described in section

View File

@ -33,8 +33,4 @@ import org.junit.Ignore;
@Ignore
public class FlowControlTests extends MQTT5TestSupport {
public FlowControlTests(String protocol) {
super(protocol);
}
}

View File

@ -22,7 +22,6 @@ import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.junit.Assume;
import org.junit.Test;
/**
@ -33,10 +32,6 @@ import org.junit.Test;
public class HandlingErrorTests extends MQTT5TestSupport {
public HandlingErrorTests(String protocol) {
this.protocol = protocol;
}
/*
* [MQTT-4.13.2-1] The CONNACK and DISCONNECT packets allow a Reason Code of 0x80 or greater to indicate that the
* Network Connection will be closed. If a Reason Code of 0x80 or greater is specified, then the Network Connection
@ -46,9 +41,6 @@ public class HandlingErrorTests extends MQTT5TestSupport {
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testEmptyClientIDWithoutCleanStart() throws Exception {
// This is apparently broken with the Paho client + web socket. The broker never even receives a CONNECT packet.
Assume.assumeTrue(protocol.equals(TCP));
MqttClient client = createPahoClient("");
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.cleanStart(false)

View File

@ -29,10 +29,6 @@ import org.junit.Test;
public class MessageDeliveryRetryTests extends MQTT5TestSupport {
public MessageDeliveryRetryTests(String protocol) {
super(protocol);
}
/*
* [MQTT-4.4.0-1] When a Client reconnects with Clean Start set to 0 and a session is present, both the Client and
* Server MUST resend any unacknowledged PUBLISH packets (where QoS > 0) and PUBREL packets using their original

View File

@ -36,8 +36,4 @@ import org.junit.Ignore;
@Ignore
public class MessageOrderingTests extends MQTT5TestSupport {
public MessageOrderingTests(String protocol) {
super(protocol);
}
}

View File

@ -34,10 +34,6 @@ import org.junit.Test;
public class MessageReceiptTests extends MQTT5TestSupport {
public MessageReceiptTests(String protocol) {
super(protocol);
}
/*
* [MQTT-4.5.0-1] When a Server takes ownership of an incoming Application Message it MUST add it to the Session
* State for those Clients that have matching Subscriptions.

View File

@ -27,8 +27,4 @@ import org.junit.Ignore;
@Ignore
public class NetworkConnectionTests extends MQTT5TestSupport {
public NetworkConnectionTests(String protocol) {
super(protocol);
}
}

View File

@ -60,10 +60,6 @@ import org.junit.Test;
public class QoSTests extends MQTT5TestSupport {
public QoSTests(String protocol) {
super(protocol);
}
/*
* [MQTT-4.3.2-2] In the QoS 1 delivery protocol, the sender MUST send a PUBLISH packet containing this Packet
* Identifier with QoS 1 and DUP flag set to 0.

View File

@ -27,8 +27,4 @@ import org.junit.Ignore;
@Ignore
public class SessionStateTests extends MQTT5TestSupport {
public SessionStateTests(String protocol) {
super(protocol);
}
}

View File

@ -43,10 +43,6 @@ import org.junit.Test;
public class SubscriptionTests extends MQTT5TestSupport {
public SubscriptionTests(String protocol) {
super(protocol);
}
/*
* [MQTT-4.8.2-3] The Server MUST respect the granted QoS for the Client's subscription.
*/

View File

@ -49,10 +49,6 @@ import org.junit.Test;
public class TopicNameAndFilterTests extends MQTT5TestSupport {
public TopicNameAndFilterTests(String protocol) {
super(protocol);
}
/*
* [MQTT-4.7.2-1] The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic
* Names beginning with a $ character.

View File

@ -39,8 +39,4 @@ import org.junit.Ignore;
@Ignore
public class WebSocketTests extends MQTT5TestSupport {
public WebSocketTests(String protocol) {
super(protocol);
}
}

View File

@ -39,8 +39,4 @@ import org.junit.Ignore;
@Ignore
public class AuthTests extends MQTT5TestSupport {
public AuthTests(String protocol) {
this.protocol = protocol;
}
}

View File

@ -41,7 +41,6 @@ import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
@ -82,10 +81,6 @@ import org.junit.Test;
public class ConnAckTests extends MQTT5TestSupport {
public ConnAckTests(String protocol) {
this.protocol = protocol;
}
/*
* [MQTT-3.1.3-6] A Server MAY allow a Client to supply a ClientID that has a length of zero bytes, however if it
* does so the Server MUST treat this as a special case and assign a unique ClientID to that Client.
@ -99,8 +94,6 @@ public class ConnAckTests extends MQTT5TestSupport {
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testEmptyClientID() throws Exception {
// This is apparently broken with the Paho client + web socket. The broker never even receives a CONNECT packet.
Assume.assumeTrue(protocol.equals(TCP));
// no session should exist
assertEquals(0, getSessionStates().size());
@ -268,8 +261,6 @@ public class ConnAckTests extends MQTT5TestSupport {
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testSessionPresentWithNonZeroConnackReasonCode() throws Exception {
// This is apparently broken with the Paho client + web socket. The broker never even receives a CONNECT packet.
Assume.assumeTrue(protocol.equals(TCP));
CountDownLatch latch = new CountDownLatch(1);
MQTTInterceptor outgoingInterceptor = (packet, connection) -> {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -42,11 +43,9 @@ import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/**
* Fulfilled by client or Netty codec (i.e. not tested here):
@ -104,10 +103,6 @@ public class ConnectTests extends MQTT5TestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public ConnectTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.1.2-7] If the Will Flag is set to 1 this indicates that, a Will Message MUST be stored on the Server and
* associated with the Session.
@ -632,9 +627,6 @@ public class ConnectTests extends MQTT5TestSupport {
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testEmptyClientID() throws Exception {
// This is apparently broken with the Paho client + web socket. The broker never even receives a CONNECT packet.
Assume.assumeTrue(protocol.equals(TCP));
// no session should exist
assertEquals(0, getSessionStates().size());
@ -658,9 +650,6 @@ public class ConnectTests extends MQTT5TestSupport {
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testEmptyClientIDWithoutCleanStart() throws Exception {
// This is apparently broken with the Paho client + web socket. The broker never even receives a CONNECT packet.
Assume.assumeTrue(protocol.equals(TCP));
// no session should exist
assertEquals(0, getSessionStates().size());

View File

@ -29,10 +29,6 @@ import org.junit.Test;
public class ConnectTestsWithSecurity extends MQTT5TestSupport {
public ConnectTestsWithSecurity(String protocol) {
super(protocol);
}
@Override
public boolean isSecurityEnabled() {
return true;

View File

@ -53,10 +53,6 @@ import org.junit.Test;
public class DisconnectTests extends MQTT5TestSupport {
public DisconnectTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.14.2-1] The Client or Server sending the DISCONNECT packet MUST use one of the DISCONNECT Reason Codes.
*

View File

@ -29,10 +29,6 @@ import org.junit.Test;
public class PingReqTests extends MQTT5TestSupport {
public PingReqTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.12.4-1] The Server MUST send a PINGRESP packet in response to a PINGREQ packet.
*/

View File

@ -26,8 +26,4 @@ import org.junit.Ignore;
@Ignore
public class PingRespTests extends MQTT5TestSupport {
public PingRespTests(String protocol) {
super(protocol);
}
}

View File

@ -37,10 +37,6 @@ import org.junit.Test;
public class PubAckTests extends MQTT5TestSupport {
public PubAckTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.4.2-1] The Client or Server sending the PUBACK packet MUST use one of the PUBACK Reason Codes.
*/

View File

@ -37,10 +37,6 @@ import org.junit.Test;
public class PubCompTests extends MQTT5TestSupport {
public PubCompTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.7.2-1] The Client or Server sending the PUBCOMP packets MUST use one of the PUBCOMP Reason Codes.
*/

View File

@ -37,10 +37,6 @@ import org.junit.Test;
public class PubRecTests extends MQTT5TestSupport {
public PubRecTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.5.2-1] The Client or Server sending the PUBREC packet MUST use one of the PUBREC Reason Codes.
*/

View File

@ -42,10 +42,6 @@ import org.junit.Test;
public class PubRelTests extends MQTT5TestSupport {
public PubRelTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.6.2-1] The Client or Server sending the PUBREL packet MUST use one of the PUBREL Reason Codes.
*/

View File

@ -79,10 +79,6 @@ public class PublishTests extends MQTT5TestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public PublishTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.3.1-1] The DUP flag MUST be set to 1 by the Client or Server when it attempts to re-deliver a PUBLISH
* packet.

View File

@ -37,10 +37,6 @@ import org.junit.Test;
public class PublishTestsWithSecurity extends MQTT5TestSupport {
public PublishTestsWithSecurity(String protocol) {
super(protocol);
}
@Override
public boolean isSecurityEnabled() {
return true;

View File

@ -34,10 +34,6 @@ import org.junit.Test;
public class SubAckTests extends MQTT5TestSupport {
public SubAckTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.9.3-1] The order of Reason Codes in the SUBACK packet MUST match the order of Topic Filters in the
* SUBSCRIBE packet.

View File

@ -50,10 +50,6 @@ import org.junit.Test;
public class SubscribeTests extends MQTT5TestSupport {
public SubscribeTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.8.3-3] Bit 2 of the Subscription Options represents the No Local option. If the value is 1, Application
* Messages MUST NOT be forwarded to a connection with a ClientID equal to the ClientID of the publishing connection.

View File

@ -31,10 +31,6 @@ import org.junit.Test;
public class SubscribeTestsWithSecurity extends MQTT5TestSupport {
public SubscribeTestsWithSecurity(String protocol) {
super(protocol);
}
@Override
public boolean isSecurityEnabled() {
return true;

View File

@ -35,10 +35,6 @@ import org.junit.Test;
public class UnsubAckTests extends MQTT5TestSupport {
public UnsubAckTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.11.3-1] The order of Reason Codes in the UNSUBACK packet MUST match the order of Topic Filters in the
* UNSUBSCRIBE packet.

View File

@ -48,10 +48,6 @@ import org.junit.Test;
public class UnsubscribeTests extends MQTT5TestSupport {
public UnsubscribeTests(String protocol) {
super(protocol);
}
/*
* [MQTT-3.10.4-1] The Topic Filters (whether they contain wildcards or not) supplied in an UNSUBSCRIBE packet MUST
* be compared character-by-character with the current set of Topic Filters held by the Server for the Client. If any

View File

@ -14,19 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt5;
package org.apache.activemq.artemis.tests.integration.mqtt5.ssl;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class BasicSslTests extends MQTT5TestSupport {
protected String protocol;
public BasicSslTests(String protocol) {
super(protocol);
this.protocol = protocol;
}
@Parameterized.Parameters(name = "protocol={0}")
@ -42,12 +51,26 @@ public class BasicSslTests extends MQTT5TestSupport {
return true;
}
/*
* Basic SSL test. Just connect.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testSsl() throws Exception {
MqttClient client = createPahoClient("client");
client.connect(getSslMqttConnectOptions());
public void testSimpleSendReceive() throws Exception {
String topic = RandomUtil.randomString();
byte[] body = RandomUtil.randomBytes(32);
CountDownLatch latch = new CountDownLatch(1);
MqttClient subscriber = createPahoClient(protocol,"subscriber");
subscriber.connect(getSslMqttConnectOptions());
subscriber.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
assertEqualsByteArrays(body, message.getPayload());
latch.countDown();
}
});
subscriber.subscribe(topic, AT_LEAST_ONCE);
MqttClient producer = createPahoClient(protocol,"producer");
producer.connect(getSslMqttConnectOptions());
producer.publish(topic, body, 1, false);
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
}

View File

@ -14,18 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt5;
package org.apache.activemq.artemis.tests.integration.mqtt5.ssl;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class CertificateAuthenticationSslTests extends MQTT5TestSupport {
static {
@ -39,8 +49,10 @@ public class CertificateAuthenticationSslTests extends MQTT5TestSupport {
}
}
protected String protocol;
public CertificateAuthenticationSslTests(String protocol) {
super(protocol);
this.protocol = protocol;
}
@Parameterized.Parameters(name = "protocol={0}")
@ -70,14 +82,37 @@ public class CertificateAuthenticationSslTests extends MQTT5TestSupport {
protected void configureBrokerSecurity(ActiveMQServer server) {
server.setSecurityManager(new ActiveMQJAASSecurityManager("CertLogin"));
server.getConfiguration().setSecurityEnabled(true);
HashSet<Role> roles = new HashSet<>();
roles.add(new Role("programmers", true, true, true, false, false, false, false, false, true, true));
server.getConfiguration().putSecurityRoles("#", roles);
}
/*
* Basic mutual SSL test with certificate-based authentication
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testMutualSsl() throws Exception {
MqttClient client = createPahoClient("client");
client.connect(getSslMqttConnectOptions());
public void testSimpleSendReceive() throws Exception {
String topic = RandomUtil.randomString();
byte[] body = RandomUtil.randomBytes(32);
CountDownLatch latch = new CountDownLatch(1);
MqttClient subscriber = createPahoClient(protocol,"subscriber");
subscriber.connect(getSslMqttConnectOptions());
subscriber.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
assertEqualsByteArrays(body, message.getPayload());
latch.countDown();
}
});
subscriber.subscribe(topic, AT_LEAST_ONCE);
Wait.assertTrue(() -> getSubscriptionQueue(topic) != null, 2000, 100);
Wait.assertEquals(1, () -> getSubscriptionQueue(topic).getConsumerCount(), 2000, 100);
MqttClient producer = createPahoClient(protocol,"producer");
producer.connect(getSslMqttConnectOptions());
producer.publish(topic, body, 1, false);
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt5.websocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.Test;
public class BasicWebSocketTests extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testSimpleSendReceive() throws Exception {
String topic = RandomUtil.randomString();
byte[] body = RandomUtil.randomBytes(32);
CountDownLatch latch = new CountDownLatch(1);
MqttClient subscriber = createPahoClient(WS,"subscriber");
subscriber.connect();
subscriber.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
assertEqualsByteArrays(body, message.getPayload());
latch.countDown();
}
});
subscriber.subscribe(topic, AT_LEAST_ONCE);
MqttClient producer = createPahoClient(WS,"producer");
producer.connect();
producer.publish(topic, body, 1, false);
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
}