diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 3edf16f755..acaa72ad45 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -337,6 +337,10 @@ public class TransportConstants { public static final int DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = 65536; + public static final String WEB_SOCKET_ENCODER_TYPE = "webSocketEncoderType"; + + public static final String DEFAULT_WEB_SOCKET_ENCODER_TYPE = "binary"; + public static final String HANDSHAKE_TIMEOUT = "handshake-timeout"; public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 62b09c508b..8d489c0423 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettySNIHostnameHand import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketFrameEncoderType; import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.utils.ConfigurationHelper; @@ -158,7 +159,8 @@ public class ProtocolHandler { } stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH; int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration()); - ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength)); + String encoderConfigType = ConfigurationHelper.getStringProperty(TransportConstants.WEB_SOCKET_ENCODER_TYPE, TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, nettyAcceptor.getConfiguration()); + ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength, WebSocketFrameEncoderType.valueOfType(encoderConfigType))); ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().remove(this); ctx.pipeline().remove(HTTP_HANDLER); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 886595ff5e..2018d3f136 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -533,4 +533,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229247, value = "Invalid address configuration for '{}'. Address must support multicast and/or anycast.") IllegalArgumentException addressWithNoRoutingType(String address); + + @Message(id = 229248, value = "Invalid value for webSocketEncoderType: '{}'. Supported values: 'binary', 'text'.") + IllegalStateException invalidWebSocketEncoderType(String webSocketEncoderType); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoder.java index 813a2a082e..feed8fee25 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoder.java @@ -1,69 +1,76 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server.protocol.websocket; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; - -/** - * This class uses the maximum frame payload size to packetize/frame outbound websocket messages into - * continuation frames. - */ -public class WebSocketFrameEncoder extends ChannelOutboundHandlerAdapter { - - private int maxFramePayloadLength; - - /** - * @param maxFramePayloadLength - */ - public WebSocketFrameEncoder(int maxFramePayloadLength) { - this.maxFramePayloadLength = maxFramePayloadLength; - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ByteBuf) { - writeContinuationFrame(ctx, (ByteBuf) msg, promise); - } else { - super.write(ctx, msg, promise); - } - } - - private void writeContinuationFrame(ChannelHandlerContext ctx, ByteBuf byteBuf, ChannelPromise promise) { - int count = byteBuf.readableBytes(); - int length = Math.min(count, maxFramePayloadLength); - boolean finalFragment = length == count; - ByteBuf fragment = Unpooled.buffer(length); - byteBuf.readBytes(fragment, length); - ctx.writeAndFlush(new BinaryWebSocketFrame(finalFragment, 0, fragment), promise); - - while ((count = byteBuf.readableBytes()) > 0) { - length = Math.min(count, maxFramePayloadLength); - finalFragment = length == count; - fragment = Unpooled.buffer(length); - byteBuf.readBytes(fragment, length); - ctx.writeAndFlush(new ContinuationWebSocketFrame(finalFragment, 0, fragment), promise); - } - - byteBuf.release(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.protocol.websocket; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; + +/** + * This class uses the maximum frame payload size to packetize/frame outbound websocket messages into + * continuation frames. + */ +public class WebSocketFrameEncoder extends ChannelOutboundHandlerAdapter { + + private int maxFramePayloadLength; + private WebSocketFrameEncoderType type; + + /** + * @param maxFramePayloadLength + */ + public WebSocketFrameEncoder(int maxFramePayloadLength, WebSocketFrameEncoderType type) { + this.maxFramePayloadLength = maxFramePayloadLength; + this.type = type; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof ByteBuf) { + writeContinuationFrame(ctx, (ByteBuf) msg, promise); + } else { + super.write(ctx, msg, promise); + } + } + + private void writeContinuationFrame(ChannelHandlerContext ctx, ByteBuf byteBuf, ChannelPromise promise) { + int count = byteBuf.readableBytes(); + int length = Math.min(count, maxFramePayloadLength); + boolean finalFragment = length == count; + ByteBuf fragment = Unpooled.buffer(length); + byteBuf.readBytes(fragment, length); + if (type == WebSocketFrameEncoderType.BINARY) { + ctx.writeAndFlush(new BinaryWebSocketFrame(finalFragment, 0, fragment), promise); + } else { + ctx.writeAndFlush(new TextWebSocketFrame(finalFragment, 0, fragment), promise); + } + + while ((count = byteBuf.readableBytes()) > 0) { + length = Math.min(count, maxFramePayloadLength); + finalFragment = length == count; + fragment = Unpooled.buffer(length); + byteBuf.readBytes(fragment, length); + ctx.writeAndFlush(new ContinuationWebSocketFrame(finalFragment, 0, fragment), promise); + } + + byteBuf.release(); + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderType.java new file mode 100644 index 0000000000..0355d160a8 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderType.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.protocol.websocket; + +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; + +public enum WebSocketFrameEncoderType { + BINARY, TEXT; + + public static WebSocketFrameEncoderType valueOfType(String type) { + switch (type) { + case "binary": + return BINARY; + case "text": + return TEXT; + default: + throw ActiveMQMessageBundle.BUNDLE.invalidWebSocketEncoderType(type); + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java index f2b003cabd..e15fe21991 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java @@ -51,10 +51,12 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler private WebSocketServerHandshaker handshaker; private List supportedProtocols; private int maxFramePayloadLength; + private WebSocketFrameEncoderType encoderType; - public WebSocketServerHandler(List supportedProtocols, int maxFramePayloadLength) { + public WebSocketServerHandler(List supportedProtocols, int maxFramePayloadLength, WebSocketFrameEncoderType encoderType) { this.supportedProtocols = supportedProtocols; this.maxFramePayloadLength = maxFramePayloadLength; + this.encoderType = encoderType; } @Override @@ -86,19 +88,15 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req); - handshake.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and - // wrap it in a binary web socket frame before letting the wsencoder send it on the wire - WebSocketFrameEncoder encoder = new WebSocketFrameEncoder(maxFramePayloadLength); - future.channel().pipeline().addAfter("wsencoder", "websocket-frame-encoder", encoder); - } else { - // Handshake failed, fire an exceptionCaught event - future.channel().pipeline().fireExceptionCaught(future.cause()); - } + handshake.addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and + // wrap it in a web socket frame before letting the wsencoder send it on the wire + WebSocketFrameEncoder encoder = new WebSocketFrameEncoder(maxFramePayloadLength, encoderType); + future.channel().pipeline().addAfter("wsencoder", "websocket-frame-encoder", encoder); + } else { + // Handshake failed, fire an exceptionCaught event + future.channel().pipeline().fireExceptionCaught(future.cause()); } }); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderTest.java index e4c2a46f58..18e2d91d84 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketFrameEncoderTest.java @@ -16,6 +16,25 @@ */ package org.apache.activemq.artemis.core.server.protocol.websocket; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -25,25 +44,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import java.nio.charset.StandardCharsets; -import java.util.List; - -import io.netty.buffer.ByteBufUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; - /** * WebSocketContinuationFrameEncoderTest */ @@ -51,7 +51,8 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; public class WebSocketFrameEncoderTest { private int maxFramePayloadLength = 100; - private WebSocketFrameEncoder spy; + private WebSocketFrameEncoder binarySpy; + private WebSocketFrameEncoder textSpy; @Mock private ChannelHandlerContext ctx; @@ -60,11 +61,21 @@ public class WebSocketFrameEncoderTest { @Before public void setUp() throws Exception { - spy = spy(new WebSocketFrameEncoder(maxFramePayloadLength)); + binarySpy = spy(new WebSocketFrameEncoder(maxFramePayloadLength, WebSocketFrameEncoderType.BINARY)); + textSpy = spy(new WebSocketFrameEncoder(maxFramePayloadLength, WebSocketFrameEncoderType.TEXT)); } @Test - public void testWriteNonByteBuf() throws Exception { + public void testWriteNonByteBufBinary() throws Exception { + testWriteNonByteBuf(binarySpy); + } + + @Test + public void testWriteNonByteBufText() throws Exception { + testWriteNonByteBuf(textSpy); + } + + private void testWriteNonByteBuf(WebSocketFrameEncoder spy) throws Exception { Object msg = "Not a ByteBuf"; spy.write(ctx, msg, promise); //test @@ -76,7 +87,16 @@ public class WebSocketFrameEncoderTest { } @Test - public void testWriteReleaseBuffer() throws Exception { + public void testWriteReleaseBufferBinary() throws Exception { + testWriteReleaseBuffer(binarySpy, BinaryWebSocketFrame.class); + } + + @Test + public void testWriteReleaseBufferText() throws Exception { + testWriteReleaseBuffer(textSpy, TextWebSocketFrame.class); + } + + private void testWriteReleaseBuffer(WebSocketFrameEncoder spy, Class webSocketFrameClass) throws Exception { String content = "Buffer should be released"; int utf8Bytes = ByteBufUtil.utf8Bytes(content); @@ -91,14 +111,22 @@ public class WebSocketFrameEncoderTest { assertEquals(0, msg.readableBytes()); verify(ctx).writeAndFlush(frameCaptor.capture(), eq(promise)); WebSocketFrame frame = frameCaptor.getValue(); - assertTrue(frame instanceof BinaryWebSocketFrame); + assertTrue(webSocketFrameClass.isInstance(frame)); assertTrue(frame.isFinalFragment()); assertEquals(content, frame.content().toString(StandardCharsets.UTF_8)); } + @Test + public void testWriteSingleFrameBinary() throws Exception { + testWriteSingleFrame(binarySpy, BinaryWebSocketFrame.class); + } @Test - public void testWriteSingleFrame() throws Exception { + public void testWriteSingleFrameText() throws Exception { + testWriteSingleFrame(textSpy, TextWebSocketFrame.class); + } + + private void testWriteSingleFrame(WebSocketFrameEncoder spy, Class webSocketFrameClass) throws Exception { String content = "Content MSG length less than max frame payload length: " + maxFramePayloadLength; ByteBuf msg = Unpooled.copiedBuffer(content, StandardCharsets.UTF_8); ArgumentCaptor frameCaptor = ArgumentCaptor.forClass(WebSocketFrame.class); @@ -108,13 +136,22 @@ public class WebSocketFrameEncoderTest { assertEquals(0, msg.readableBytes()); verify(ctx).writeAndFlush(frameCaptor.capture(), eq(promise)); WebSocketFrame frame = frameCaptor.getValue(); - assertTrue(frame instanceof BinaryWebSocketFrame); + assertTrue(webSocketFrameClass.isInstance(frame)); assertTrue(frame.isFinalFragment()); assertEquals(content, frame.content().toString(StandardCharsets.UTF_8)); } @Test - public void testWriteContinuationFrames() throws Exception { + public void testWriteContinuationFramesBinary() throws Exception { + testWriteContinuationFrames(binarySpy, BinaryWebSocketFrame.class); + } + + @Test + public void testWriteContinuationFramesText() throws Exception { + testWriteContinuationFrames(textSpy, TextWebSocketFrame.class); + } + + private void testWriteContinuationFrames(WebSocketFrameEncoder spy, Class webSocketFrameClass) throws Exception { String contentPart = "Content MSG Length @ "; StringBuilder contentBuilder = new StringBuilder(3 * maxFramePayloadLength); @@ -140,7 +177,7 @@ public class WebSocketFrameEncoderTest { int offset = 0; WebSocketFrame first = frames.get(0); - assertTrue(first instanceof BinaryWebSocketFrame); + assertTrue(webSocketFrameClass.isInstance(first)); assertFalse(first.isFinalFragment()); assertEquals(content.substring(offset, offset + maxFramePayloadLength), first.content().toString(StandardCharsets.UTF_8)); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java index a7578d7777..b37e145e8b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java @@ -50,7 +50,7 @@ public class WebSocketServerHandlerTest { public void setup() throws Exception { maxFramePayloadLength = 8192; supportedProtocols = Arrays.asList("STOMP"); - spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength)); + spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength, WebSocketFrameEncoderType.BINARY)); } @Test diff --git a/docs/user-manual/en/stomp.md b/docs/user-manual/en/stomp.md index eeb123738c..be8b6603ef 100644 --- a/docs/user-manual/en/stomp.md +++ b/docs/user-manual/en/stomp.md @@ -366,6 +366,11 @@ frame this length can be adjusted by using the `webSocketMaxFramePayloadLength` parameter on the acceptor. In previous version this was configured via the similarly named `stompMaxFramePayloadLength` acceptor URL parameter. +Web Socket frames can be encoded as either [binary or text](https://datatracker.ietf.org/doc/html/rfc6455#section-11.8). +By default the broker encodes them as binary. However, this can be changed by +using the `webSocketEncoderType` acceptor URL parameter. Valid values are +`binary` and `text`. + The `stomp-websockets` example shows how to configure an Apache ActiveMQ Artemis broker to have web browsers and Java applications exchanges messages.