ARTEMIS-4294 support text-encoded WebSocket frames

This commit is contained in:
Justin Bertram 2023-05-26 10:32:07 -05:00
parent d943213cae
commit c4501f6793
9 changed files with 204 additions and 113 deletions

View File

@ -337,6 +337,10 @@ public class TransportConstants {
public static final int DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = 65536; public static final int DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = 65536;
public static final String WEB_SOCKET_ENCODER_TYPE = "webSocketEncoderType";
public static final String DEFAULT_WEB_SOCKET_ENCODER_TYPE = "binary";
public static final String HANDSHAKE_TIMEOUT = "handshake-timeout"; public static final String HANDSHAKE_TIMEOUT = "handshake-timeout";
public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10; public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10;

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettySNIHostnameHand
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketFrameEncoderType;
import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler; import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
@ -158,7 +159,8 @@ public class ProtocolHandler {
} }
stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH; stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH;
int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration()); int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration());
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength)); String encoderConfigType = ConfigurationHelper.getStringProperty(TransportConstants.WEB_SOCKET_ENCODER_TYPE, TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, nettyAcceptor.getConfiguration());
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength, WebSocketFrameEncoderType.valueOfType(encoderConfigType)));
ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this); ctx.pipeline().remove(this);
ctx.pipeline().remove(HTTP_HANDLER); ctx.pipeline().remove(HTTP_HANDLER);

View File

@ -533,4 +533,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229247, value = "Invalid address configuration for '{}'. Address must support multicast and/or anycast.") @Message(id = 229247, value = "Invalid address configuration for '{}'. Address must support multicast and/or anycast.")
IllegalArgumentException addressWithNoRoutingType(String address); IllegalArgumentException addressWithNoRoutingType(String address);
@Message(id = 229248, value = "Invalid value for webSocketEncoderType: '{}'. Supported values: 'binary', 'text'.")
IllegalStateException invalidWebSocketEncoderType(String webSocketEncoderType);
} }

View File

@ -1,69 +1,76 @@
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.core.server.protocol.websocket; package org.apache.activemq.artemis.core.server.protocol.websocket;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* This class uses the maximum frame payload size to packetize/frame outbound websocket messages into /**
* continuation frames. * This class uses the maximum frame payload size to packetize/frame outbound websocket messages into
*/ * continuation frames.
public class WebSocketFrameEncoder extends ChannelOutboundHandlerAdapter { */
public class WebSocketFrameEncoder extends ChannelOutboundHandlerAdapter {
private int maxFramePayloadLength;
private int maxFramePayloadLength;
/** private WebSocketFrameEncoderType type;
* @param maxFramePayloadLength
*/ /**
public WebSocketFrameEncoder(int maxFramePayloadLength) { * @param maxFramePayloadLength
this.maxFramePayloadLength = maxFramePayloadLength; */
} public WebSocketFrameEncoder(int maxFramePayloadLength, WebSocketFrameEncoderType type) {
this.maxFramePayloadLength = maxFramePayloadLength;
@Override this.type = type;
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { }
if (msg instanceof ByteBuf) {
writeContinuationFrame(ctx, (ByteBuf) msg, promise); @Override
} else { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise); if (msg instanceof ByteBuf) {
} writeContinuationFrame(ctx, (ByteBuf) msg, promise);
} } else {
super.write(ctx, msg, promise);
private void writeContinuationFrame(ChannelHandlerContext ctx, ByteBuf byteBuf, ChannelPromise promise) { }
int count = byteBuf.readableBytes(); }
int length = Math.min(count, maxFramePayloadLength);
boolean finalFragment = length == count; private void writeContinuationFrame(ChannelHandlerContext ctx, ByteBuf byteBuf, ChannelPromise promise) {
ByteBuf fragment = Unpooled.buffer(length); int count = byteBuf.readableBytes();
byteBuf.readBytes(fragment, length); int length = Math.min(count, maxFramePayloadLength);
ctx.writeAndFlush(new BinaryWebSocketFrame(finalFragment, 0, fragment), promise); boolean finalFragment = length == count;
ByteBuf fragment = Unpooled.buffer(length);
while ((count = byteBuf.readableBytes()) > 0) { byteBuf.readBytes(fragment, length);
length = Math.min(count, maxFramePayloadLength); if (type == WebSocketFrameEncoderType.BINARY) {
finalFragment = length == count; ctx.writeAndFlush(new BinaryWebSocketFrame(finalFragment, 0, fragment), promise);
fragment = Unpooled.buffer(length); } else {
byteBuf.readBytes(fragment, length); ctx.writeAndFlush(new TextWebSocketFrame(finalFragment, 0, fragment), promise);
ctx.writeAndFlush(new ContinuationWebSocketFrame(finalFragment, 0, fragment), promise); }
}
while ((count = byteBuf.readableBytes()) > 0) {
byteBuf.release(); length = Math.min(count, maxFramePayloadLength);
} finalFragment = length == count;
} fragment = Unpooled.buffer(length);
byteBuf.readBytes(fragment, length);
ctx.writeAndFlush(new ContinuationWebSocketFrame(finalFragment, 0, fragment), promise);
}
byteBuf.release();
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.protocol.websocket;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
public enum WebSocketFrameEncoderType {
BINARY, TEXT;
public static WebSocketFrameEncoderType valueOfType(String type) {
switch (type) {
case "binary":
return BINARY;
case "text":
return TEXT;
default:
throw ActiveMQMessageBundle.BUNDLE.invalidWebSocketEncoderType(type);
}
}
}

View File

@ -51,10 +51,12 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private WebSocketServerHandshaker handshaker; private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols; private List<String> supportedProtocols;
private int maxFramePayloadLength; private int maxFramePayloadLength;
private WebSocketFrameEncoderType encoderType;
public WebSocketServerHandler(List<String> supportedProtocols, int maxFramePayloadLength) { public WebSocketServerHandler(List<String> supportedProtocols, int maxFramePayloadLength, WebSocketFrameEncoderType encoderType) {
this.supportedProtocols = supportedProtocols; this.supportedProtocols = supportedProtocols;
this.maxFramePayloadLength = maxFramePayloadLength; this.maxFramePayloadLength = maxFramePayloadLength;
this.encoderType = encoderType;
} }
@Override @Override
@ -86,19 +88,15 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else { } else {
ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req); ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req);
handshake.addListener(new ChannelFutureListener() { handshake.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
@Override // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and
public void operationComplete(ChannelFuture future) throws Exception { // wrap it in a web socket frame before letting the wsencoder send it on the wire
if (future.isSuccess()) { WebSocketFrameEncoder encoder = new WebSocketFrameEncoder(maxFramePayloadLength, encoderType);
// we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and future.channel().pipeline().addAfter("wsencoder", "websocket-frame-encoder", encoder);
// wrap it in a binary web socket frame before letting the wsencoder send it on the wire } else {
WebSocketFrameEncoder encoder = new WebSocketFrameEncoder(maxFramePayloadLength); // Handshake failed, fire an exceptionCaught event
future.channel().pipeline().addAfter("wsencoder", "websocket-frame-encoder", encoder); future.channel().pipeline().fireExceptionCaught(future.cause());
} else {
// Handshake failed, fire an exceptionCaught event
future.channel().pipeline().fireExceptionCaught(future.cause());
}
} }
}); });
} }

View File

@ -16,6 +16,25 @@
*/ */
package org.apache.activemq.artemis.core.server.protocol.websocket; package org.apache.activemq.artemis.core.server.protocol.websocket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -25,25 +44,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.nio.charset.StandardCharsets;
import java.util.List;
import io.netty.buffer.ByteBufUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
/** /**
* WebSocketContinuationFrameEncoderTest * WebSocketContinuationFrameEncoderTest
*/ */
@ -51,7 +51,8 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
public class WebSocketFrameEncoderTest { public class WebSocketFrameEncoderTest {
private int maxFramePayloadLength = 100; private int maxFramePayloadLength = 100;
private WebSocketFrameEncoder spy; private WebSocketFrameEncoder binarySpy;
private WebSocketFrameEncoder textSpy;
@Mock @Mock
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
@ -60,11 +61,21 @@ public class WebSocketFrameEncoderTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
spy = spy(new WebSocketFrameEncoder(maxFramePayloadLength)); binarySpy = spy(new WebSocketFrameEncoder(maxFramePayloadLength, WebSocketFrameEncoderType.BINARY));
textSpy = spy(new WebSocketFrameEncoder(maxFramePayloadLength, WebSocketFrameEncoderType.TEXT));
} }
@Test @Test
public void testWriteNonByteBuf() throws Exception { public void testWriteNonByteBufBinary() throws Exception {
testWriteNonByteBuf(binarySpy);
}
@Test
public void testWriteNonByteBufText() throws Exception {
testWriteNonByteBuf(textSpy);
}
private void testWriteNonByteBuf(WebSocketFrameEncoder spy) throws Exception {
Object msg = "Not a ByteBuf"; Object msg = "Not a ByteBuf";
spy.write(ctx, msg, promise); //test spy.write(ctx, msg, promise); //test
@ -76,7 +87,16 @@ public class WebSocketFrameEncoderTest {
} }
@Test @Test
public void testWriteReleaseBuffer() throws Exception { public void testWriteReleaseBufferBinary() throws Exception {
testWriteReleaseBuffer(binarySpy, BinaryWebSocketFrame.class);
}
@Test
public void testWriteReleaseBufferText() throws Exception {
testWriteReleaseBuffer(textSpy, TextWebSocketFrame.class);
}
private void testWriteReleaseBuffer(WebSocketFrameEncoder spy, Class webSocketFrameClass) throws Exception {
String content = "Buffer should be released"; String content = "Buffer should be released";
int utf8Bytes = ByteBufUtil.utf8Bytes(content); int utf8Bytes = ByteBufUtil.utf8Bytes(content);
@ -91,14 +111,22 @@ public class WebSocketFrameEncoderTest {
assertEquals(0, msg.readableBytes()); assertEquals(0, msg.readableBytes());
verify(ctx).writeAndFlush(frameCaptor.capture(), eq(promise)); verify(ctx).writeAndFlush(frameCaptor.capture(), eq(promise));
WebSocketFrame frame = frameCaptor.getValue(); WebSocketFrame frame = frameCaptor.getValue();
assertTrue(frame instanceof BinaryWebSocketFrame); assertTrue(webSocketFrameClass.isInstance(frame));
assertTrue(frame.isFinalFragment()); assertTrue(frame.isFinalFragment());
assertEquals(content, frame.content().toString(StandardCharsets.UTF_8)); assertEquals(content, frame.content().toString(StandardCharsets.UTF_8));
} }
@Test
public void testWriteSingleFrameBinary() throws Exception {
testWriteSingleFrame(binarySpy, BinaryWebSocketFrame.class);
}
@Test @Test
public void testWriteSingleFrame() throws Exception { public void testWriteSingleFrameText() throws Exception {
testWriteSingleFrame(textSpy, TextWebSocketFrame.class);
}
private void testWriteSingleFrame(WebSocketFrameEncoder spy, Class webSocketFrameClass) throws Exception {
String content = "Content MSG length less than max frame payload length: " + maxFramePayloadLength; String content = "Content MSG length less than max frame payload length: " + maxFramePayloadLength;
ByteBuf msg = Unpooled.copiedBuffer(content, StandardCharsets.UTF_8); ByteBuf msg = Unpooled.copiedBuffer(content, StandardCharsets.UTF_8);
ArgumentCaptor<WebSocketFrame> frameCaptor = ArgumentCaptor.forClass(WebSocketFrame.class); ArgumentCaptor<WebSocketFrame> frameCaptor = ArgumentCaptor.forClass(WebSocketFrame.class);
@ -108,13 +136,22 @@ public class WebSocketFrameEncoderTest {
assertEquals(0, msg.readableBytes()); assertEquals(0, msg.readableBytes());
verify(ctx).writeAndFlush(frameCaptor.capture(), eq(promise)); verify(ctx).writeAndFlush(frameCaptor.capture(), eq(promise));
WebSocketFrame frame = frameCaptor.getValue(); WebSocketFrame frame = frameCaptor.getValue();
assertTrue(frame instanceof BinaryWebSocketFrame); assertTrue(webSocketFrameClass.isInstance(frame));
assertTrue(frame.isFinalFragment()); assertTrue(frame.isFinalFragment());
assertEquals(content, frame.content().toString(StandardCharsets.UTF_8)); assertEquals(content, frame.content().toString(StandardCharsets.UTF_8));
} }
@Test @Test
public void testWriteContinuationFrames() throws Exception { public void testWriteContinuationFramesBinary() throws Exception {
testWriteContinuationFrames(binarySpy, BinaryWebSocketFrame.class);
}
@Test
public void testWriteContinuationFramesText() throws Exception {
testWriteContinuationFrames(textSpy, TextWebSocketFrame.class);
}
private void testWriteContinuationFrames(WebSocketFrameEncoder spy, Class webSocketFrameClass) throws Exception {
String contentPart = "Content MSG Length @ "; String contentPart = "Content MSG Length @ ";
StringBuilder contentBuilder = new StringBuilder(3 * maxFramePayloadLength); StringBuilder contentBuilder = new StringBuilder(3 * maxFramePayloadLength);
@ -140,7 +177,7 @@ public class WebSocketFrameEncoderTest {
int offset = 0; int offset = 0;
WebSocketFrame first = frames.get(0); WebSocketFrame first = frames.get(0);
assertTrue(first instanceof BinaryWebSocketFrame); assertTrue(webSocketFrameClass.isInstance(first));
assertFalse(first.isFinalFragment()); assertFalse(first.isFinalFragment());
assertEquals(content.substring(offset, offset + maxFramePayloadLength), assertEquals(content.substring(offset, offset + maxFramePayloadLength),
first.content().toString(StandardCharsets.UTF_8)); first.content().toString(StandardCharsets.UTF_8));

View File

@ -50,7 +50,7 @@ public class WebSocketServerHandlerTest {
public void setup() throws Exception { public void setup() throws Exception {
maxFramePayloadLength = 8192; maxFramePayloadLength = 8192;
supportedProtocols = Arrays.asList("STOMP"); supportedProtocols = Arrays.asList("STOMP");
spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength)); spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength, WebSocketFrameEncoderType.BINARY));
} }
@Test @Test

View File

@ -366,6 +366,11 @@ frame this length can be adjusted by using the `webSocketMaxFramePayloadLength`
parameter on the acceptor. In previous version this was configured via the parameter on the acceptor. In previous version this was configured via the
similarly named `stompMaxFramePayloadLength` acceptor URL parameter. similarly named `stompMaxFramePayloadLength` acceptor URL parameter.
Web Socket frames can be encoded as either [binary or text](https://datatracker.ietf.org/doc/html/rfc6455#section-11.8).
By default the broker encodes them as binary. However, this can be changed by
using the `webSocketEncoderType` acceptor URL parameter. Valid values are
`binary` and `text`.
The `stomp-websockets` example shows how to configure an Apache ActiveMQ The `stomp-websockets` example shows how to configure an Apache ActiveMQ
Artemis broker to have web browsers and Java applications exchanges messages. Artemis broker to have web browsers and Java applications exchanges messages.