diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index edd8dd0877..163fc6b826 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.proton; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; @@ -49,6 +50,8 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME */ public class ProtonProtocolManager implements ProtocolManager, NotificationListener { + private static final List websocketRegistryNames = Arrays.asList("amqp"); + private final ActiveMQServer server; private MessageConverter protonConverter; @@ -147,6 +150,11 @@ public class ProtonProtocolManager implements ProtocolManager, Noti public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { } + @Override + public List websocketSubprotocolIdentifiers() { + return websocketRegistryNames; + } + public String getPubSubPrefix() { return pubSubPrefix; } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 1d38fcf2b7..17f7e33ec7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; @@ -34,15 +38,14 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import java.util.ArrayList; -import java.util.List; - /** * MQTTProtocolManager */ class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { + private static final List websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1"); + private ActiveMQServer server; private MQTTLogger log = MQTTLogger.LOGGER; @@ -138,6 +141,11 @@ class MQTTProtocolManager extends AbstractProtocolManager websocketSubprotocolIdentifiers() { + return websocketRegistryNames; + } + public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) { super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 50b047a288..d8dd639986 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.InvalidClientIDException; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -74,6 +75,8 @@ import org.apache.activemq.util.LongSequenceGenerator; public class OpenWireProtocolManager implements ProtocolManager, ClusterTopologyListener { + private static final List websocketRegistryNames = Collections.EMPTY_LIST; + private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator(); @@ -269,6 +272,11 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { } + @Override + public List websocketSubprotocolIdentifiers() { + return websocketRegistryNames; + } + public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception { String username = info.getUserName(); String password = info.getPassword(); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 9c92fd1fe6..5de63d3816 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp; import javax.security.cert.X509Certificate; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -56,9 +57,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto * StompProtocolManager */ public class StompProtocolManager extends AbstractProtocolManager { - // Constants ----------------------------------------------------- - // Attributes ---------------------------------------------------- + private static final List websocketRegistryNames = Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp"); private final ActiveMQServer server; @@ -192,6 +192,11 @@ public class StompProtocolManager extends AbstractProtocolManager websocketSubprotocolIdentifiers() { + return websocketRegistryNames; + } + // Public -------------------------------------------------------- public boolean send(final StompConnection connection, final StompFrame frame) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index f4fef21376..b4d8de5562 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.protocol.stomp.WebSocketServerHandler; +import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.utils.ConfigurationHelper; @@ -64,6 +65,8 @@ public class ProtocolHandler { private HttpKeepAliveRunnable httpKeepAliveRunnable; + private final List websocketSubprotocolIds; + public ProtocolHandler(Map protocolMap, NettyAcceptor nettyAcceptor, final Map configuration, @@ -72,6 +75,13 @@ public class ProtocolHandler { this.nettyAcceptor = nettyAcceptor; this.configuration = configuration; this.scheduledThreadPool = scheduledThreadPool; + + websocketSubprotocolIds = new ArrayList<>(); + for (ProtocolManager pm : protocolMap.values()) { + if (pm.websocketSubprotocolIdentifiers() != null) { + websocketSubprotocolIds.addAll(pm.websocketSubprotocolIdentifiers()); + } + } } public ChannelHandler getProtocolDecoder() { @@ -106,7 +116,7 @@ public class ProtocolHandler { HttpHeaders headers = request.headers(); String upgrade = headers.get("upgrade"); if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) { - ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler()); + ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds)); ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().remove(this); ctx.pipeline().remove("http-handler"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index 410b0c09e3..69db679bbd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -63,6 +64,8 @@ public class CoreProtocolManager implements ProtocolManager { private static final Logger logger = Logger.getLogger(CoreProtocolManager.class); + private static final List websocketRegistryNames = Collections.EMPTY_LIST; + private final ActiveMQServer server; private final List incomingInterceptors; @@ -181,6 +184,11 @@ public class CoreProtocolManager implements ProtocolManager { } } + @Override + public List websocketSubprotocolIdentifiers() { + return websocketRegistryNames; + } + private boolean isArtemis(ActiveMQBuffer buffer) { return buffer.getByte(0) == 'A' && buffer.getByte(1) == 'R' && diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java similarity index 93% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java index e276047bb4..671cb76b73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.server.protocol.stomp; +package org.apache.activemq.artemis.core.server.protocol.websocket; import java.nio.charset.StandardCharsets; +import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -38,6 +39,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import org.apache.activemq.artemis.utils.StringUtil; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaders.setContentLength; @@ -51,8 +53,13 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler private HttpRequest httpRequest; private WebSocketServerHandshaker handshaker; + private List supportedProtocols; private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder(); + public WebSocketServerHandler(List supportedProtocols) { + this.supportedProtocols = supportedProtocols; + } + @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { @@ -75,7 +82,8 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler } // Handshake - WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), "v10.stomp,v11.stomp", false); + String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ","); + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV,false); this.httpRequest = req; this.handshaker = wsFactory.newHandshaker(req); if (this.handshaker == null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index 3de5d5da5e..62befaf8fc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -61,4 +61,12 @@ public interface ProtocolManager

{ boolean acceptsNoHandshake(); void handshake(NettyServerConnection connection, ActiveMQBuffer buffer); + + /** + * A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used + * during the websocket subprotocol handshake. + * + * @return A list of subprotocol ids + */ + List websocketSubprotocolIdentifiers(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java index 46d7aa3549..1e6a21b8d3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java @@ -72,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase { }; - Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null); + Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), new HashMap()); Assert.assertTrue(acceptor instanceof NettyAcceptor); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java index 1379628c6c..7f410ef0c2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java @@ -95,7 +95,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase { } }; pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); - NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null); + NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap()); addActiveMQComponent(acceptor); acceptor.start();