NO-JIRA clarify & verify web socket support for MQTT
This commit is contained in:
parent
ead80eae5c
commit
8c259116a8
|
@ -47,8 +47,6 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
|||
|
||||
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
|
||||
|
||||
private static final String WEBSOCKET_PATH = "/stomp";
|
||||
|
||||
private HttpRequest httpRequest;
|
||||
private WebSocketServerHandshaker handshaker;
|
||||
private List<String> supportedProtocols;
|
||||
|
@ -142,7 +140,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
|
|||
}
|
||||
|
||||
private String getWebSocketLocation(HttpRequest req) {
|
||||
return "ws://" + req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
|
||||
return "ws://" + req.headers().get(HttpHeaderNames.HOST);
|
||||
}
|
||||
|
||||
public HttpRequest getHttpRequest() {
|
||||
|
|
|
@ -62,8 +62,8 @@ public interface ProtocolManager<P extends BaseInterceptor> {
|
|||
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
|
||||
|
||||
/**
|
||||
* A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used
|
||||
* during the websocket subprotocol handshake.
|
||||
* A list of the IANA websocket subprotocol identifiers (https://www.iana.org/assignments/websocket/websocket.xhtml)
|
||||
* supported by this protocol manager. These are used during the websocket subprotocol handshake.
|
||||
*
|
||||
* @return A list of subprotocol ids
|
||||
*/
|
||||
|
|
|
@ -156,3 +156,19 @@ This contains a real example for configuring amqpIdleTimeout:
|
|||
```xml
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10</acceptor>
|
||||
```
|
||||
|
||||
## Web Sockets
|
||||
|
||||
Apache ActiveMQ Artemis also supports AMQP over [Web
|
||||
Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web
|
||||
browsers which support Web Sockets can send and receive AMQP messages.
|
||||
|
||||
AMQP over Web Sockets is supported via a normal AMQP acceptor:
|
||||
|
||||
```xml
|
||||
<acceptor name="amqp-ws-acceptor">tcp://localhost:5672?protocols=AMQP</acceptor>
|
||||
```
|
||||
|
||||
With this configuration, Apache ActiveMQ Artemis will accept AMQP connections
|
||||
over Web Sockets on the port `5672`. Web browsers can then connect to
|
||||
`ws://<server>:5672` using a Web Socket to send and receive AMQP messages.
|
|
@ -108,9 +108,10 @@ We believe this caters for the vast majority of transport requirements.
|
|||
|
||||
Apache ActiveMQ Artemis supports using a single port for all protocols, Apache
|
||||
ActiveMQ Artemis will automatically detect which protocol is being used CORE,
|
||||
AMQP, STOMP or OPENWIRE and use the appropriate Apache ActiveMQ Artemis
|
||||
AMQP, STOMP, MQTT or OPENWIRE and use the appropriate Apache ActiveMQ Artemis
|
||||
handler. It will also detect whether protocols such as HTTP or Web Sockets are
|
||||
being used and also use the appropriate decoders
|
||||
being used and also use the appropriate decoders. Web Sockets are supported for
|
||||
AMQP, STOMP, and MQTT.
|
||||
|
||||
It is possible to limit which protocols are supported by using the `protocols`
|
||||
parameter on the Acceptor like so:
|
||||
|
|
|
@ -135,3 +135,18 @@ There are 2 types of wild cards in MQTT:
|
|||
Matches a single level in the address hierarchy. For example `/uk/+/stores`
|
||||
would match `/uk/newcastle/stores` but not `/uk/cities/newcastle/stores`.
|
||||
|
||||
## Web Sockets
|
||||
|
||||
Apache ActiveMQ Artemis also supports MQTT over [Web
|
||||
Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web
|
||||
browsers which support Web Sockets can send and receive MQTT messages.
|
||||
|
||||
MQTT over Web Sockets is supported via a normal MQTT acceptor:
|
||||
|
||||
```xml
|
||||
<acceptor name="mqtt-ws-acceptor">tcp://localhost:1883?protocols=MQTT</acceptor>
|
||||
```
|
||||
|
||||
With this configuration, Apache ActiveMQ Artemis will accept MQTT connections
|
||||
over Web Sockets on the port `1883`. Web browsers can then connect to
|
||||
`ws://<server>:1883` using a Web Socket to send and receive MQTT messages.
|
||||
|
|
|
@ -285,7 +285,7 @@ the same as the default value of
|
|||
|
||||
## Web Sockets
|
||||
|
||||
Apache ActiveMQ Artemis also support STOMP over [Web
|
||||
Apache ActiveMQ Artemis also supports STOMP over [Web
|
||||
Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web
|
||||
browsers which support Web Sockets can send and receive STOMP messages.
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
||||
<paho.client.mqttv3.version>1.1.0</paho.client.mqttv3.version>
|
||||
<paho.client.mqttv3.version>1.2.2</paho.client.mqttv3.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -56,7 +56,6 @@ public class AmqpFlowControlFailTest {
|
|||
@Parameterized.Parameter(2)
|
||||
public String expectedMessage;
|
||||
|
||||
|
||||
@Parameterized.Parameters(name = "useModified={0}")
|
||||
public static Collection<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -30,11 +32,25 @@ import org.eclipse.paho.client.mqttv3.MqttException;
|
|||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class PahoMQTTTest extends MQTTTestSupport {
|
||||
|
||||
private static MQTTLogger LOG = MQTTLogger.LOGGER;
|
||||
|
||||
@Parameterized.Parameters(name = "protocol={0}")
|
||||
public static Collection<Object[]> getParams() {
|
||||
return Arrays.asList(new Object[][] {{"tcp"}, {"ws"}});
|
||||
}
|
||||
|
||||
public String protocol;
|
||||
|
||||
public PahoMQTTTest(String protocol) {
|
||||
this.protocol = protocol;
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testLotsOfClients() throws Exception {
|
||||
|
||||
|
@ -146,7 +162,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
|
|||
}
|
||||
|
||||
private MqttClient createPahoClient(String clientId) throws MqttException {
|
||||
return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
|
||||
return new MqttClient(protocol + "://localhost:" + getPort(), clientId, new MemoryPersistence());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue