From 8c259116a8f825f5b21176445a04718a6afe98fb Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Thu, 6 Feb 2020 11:30:40 -0600 Subject: [PATCH] NO-JIRA clarify & verify web socket support for MQTT --- .../websocket/WebSocketServerHandler.java | 4 +--- .../spi/core/protocol/ProtocolManager.java | 4 ++-- docs/user-manual/en/amqp.md | 18 +++++++++++++++++- docs/user-manual/en/configuring-transports.md | 5 +++-- docs/user-manual/en/mqtt.md | 15 +++++++++++++++ docs/user-manual/en/stomp.md | 2 +- tests/integration-tests/pom.xml | 2 +- .../amqp/AmqpFlowControlFailTest.java | 1 - .../mqtt/imported/PahoMQTTTest.java | 18 +++++++++++++++++- 9 files changed, 57 insertions(+), 12 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java index 83a8d0e431..f2b003cabd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java @@ -47,8 +47,6 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; public class WebSocketServerHandler extends SimpleChannelInboundHandler { - private static final String WEBSOCKET_PATH = "/stomp"; - private HttpRequest httpRequest; private WebSocketServerHandshaker handshaker; private List supportedProtocols; @@ -142,7 +140,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler } private String getWebSocketLocation(HttpRequest req) { - return "ws://" + req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH; + return "ws://" + req.headers().get(HttpHeaderNames.HOST); } public HttpRequest getHttpRequest() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index 1770a5cbf5..dc6ba99b62 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -62,8 +62,8 @@ public interface ProtocolManager

{ void handshake(NettyServerConnection connection, ActiveMQBuffer buffer); /** - * A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used - * during the websocket subprotocol handshake. + * A list of the IANA websocket subprotocol identifiers (https://www.iana.org/assignments/websocket/websocket.xhtml) + * supported by this protocol manager. These are used during the websocket subprotocol handshake. * * @return A list of subprotocol ids */ diff --git a/docs/user-manual/en/amqp.md b/docs/user-manual/en/amqp.md index 30cb2aa081..c4ca17ee4b 100644 --- a/docs/user-manual/en/amqp.md +++ b/docs/user-manual/en/amqp.md @@ -155,4 +155,20 @@ This contains a real example for configuring amqpIdleTimeout: ```xml tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10 -``` \ No newline at end of file +``` + +## Web Sockets + +Apache ActiveMQ Artemis also supports AMQP over [Web +Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web +browsers which support Web Sockets can send and receive AMQP messages. + +AMQP over Web Sockets is supported via a normal AMQP acceptor: + +```xml +tcp://localhost:5672?protocols=AMQP +``` + +With this configuration, Apache ActiveMQ Artemis will accept AMQP connections +over Web Sockets on the port `5672`. Web browsers can then connect to +`ws://:5672` using a Web Socket to send and receive AMQP messages. \ No newline at end of file diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md index c941735f80..ce13166dce 100644 --- a/docs/user-manual/en/configuring-transports.md +++ b/docs/user-manual/en/configuring-transports.md @@ -108,9 +108,10 @@ We believe this caters for the vast majority of transport requirements. Apache ActiveMQ Artemis supports using a single port for all protocols, Apache ActiveMQ Artemis will automatically detect which protocol is being used CORE, -AMQP, STOMP or OPENWIRE and use the appropriate Apache ActiveMQ Artemis +AMQP, STOMP, MQTT or OPENWIRE and use the appropriate Apache ActiveMQ Artemis handler. It will also detect whether protocols such as HTTP or Web Sockets are -being used and also use the appropriate decoders +being used and also use the appropriate decoders. Web Sockets are supported for +AMQP, STOMP, and MQTT. It is possible to limit which protocols are supported by using the `protocols` parameter on the Acceptor like so: diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md index 73b1de423f..fcad038359 100644 --- a/docs/user-manual/en/mqtt.md +++ b/docs/user-manual/en/mqtt.md @@ -135,3 +135,18 @@ There are 2 types of wild cards in MQTT: Matches a single level in the address hierarchy. For example `/uk/+/stores` would match `/uk/newcastle/stores` but not `/uk/cities/newcastle/stores`. +## Web Sockets + +Apache ActiveMQ Artemis also supports MQTT over [Web +Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web +browsers which support Web Sockets can send and receive MQTT messages. + +MQTT over Web Sockets is supported via a normal MQTT acceptor: + +```xml +tcp://localhost:1883?protocols=MQTT +``` + +With this configuration, Apache ActiveMQ Artemis will accept MQTT connections +over Web Sockets on the port `1883`. Web browsers can then connect to +`ws://:1883` using a Web Socket to send and receive MQTT messages. diff --git a/docs/user-manual/en/stomp.md b/docs/user-manual/en/stomp.md index 4451166098..79f413d6e3 100644 --- a/docs/user-manual/en/stomp.md +++ b/docs/user-manual/en/stomp.md @@ -285,7 +285,7 @@ the same as the default value of ## Web Sockets -Apache ActiveMQ Artemis also support STOMP over [Web +Apache ActiveMQ Artemis also supports STOMP over [Web Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web browsers which support Web Sockets can send and receive STOMP messages. diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 37f4e668b3..4e7fdf7383 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -29,7 +29,7 @@ ${project.basedir}/../.. - 1.1.0 + 1.2.2 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java index 42fb5f3081..a3f9c500e5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java @@ -56,7 +56,6 @@ public class AmqpFlowControlFailTest { @Parameterized.Parameter(2) public String expectedMessage; - @Parameterized.Parameters(name = "useModified={0}") public static Collection parameters() { return Arrays.asList(new Object[][] { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java index a5d39b392d..7ac99b90c2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -30,11 +32,25 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class PahoMQTTTest extends MQTTTestSupport { private static MQTTLogger LOG = MQTTLogger.LOGGER; + @Parameterized.Parameters(name = "protocol={0}") + public static Collection getParams() { + return Arrays.asList(new Object[][] {{"tcp"}, {"ws"}}); + } + + public String protocol; + + public PahoMQTTTest(String protocol) { + this.protocol = protocol; + } + @Test(timeout = 300000) public void testLotsOfClients() throws Exception { @@ -146,7 +162,7 @@ public class PahoMQTTTest extends MQTTTestSupport { } private MqttClient createPahoClient(String clientId) throws MqttException { - return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); + return new MqttClient(protocol + "://localhost:" + getPort(), clientId, new MemoryPersistence()); } }