ARTEMIS-2748 Support WebSocket compression in the transport pipeline

Adds support for WebSocket compression using the netty server handler to
enable per message compression and decompression as a transparent layer of
the netty pipeine.
This commit is contained in:
Timothy Bish 2024-10-14 10:15:55 -04:00 committed by Robbie Gemmell
parent 8e4bc33dc4
commit b843e12852
5 changed files with 158 additions and 5 deletions

View File

@ -351,6 +351,15 @@ public class TransportConstants {
public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10;
/*
* Defines if the WS acceptor allows a client to request compression via WS extensions for
* per message deflate. By default this is not enabled and the WS upgrade response will not
* carry any compression support headers when the client indicates it supports compression.
*/
public static final String WEB_SOCKET_COMPRESSION_SUPPORTED = "webSocketCompressionSupported";
public static final boolean DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED = false;
public static final String QUIET_PERIOD = "quietPeriod";
public static final String DISABLE_STOMP_SERVER_HEADER = "disableStompServerHeader";

View File

@ -37,6 +37,8 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
@ -152,15 +154,27 @@ public class ProtocolHandler {
FullHttpRequest request = (FullHttpRequest) msg;
HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade");
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
int stompMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration());
if (stompMaxFramePayloadLength != -1) {
ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH);
}
stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH;
int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration());
String encoderConfigType = ConfigurationHelper.getStringProperty(TransportConstants.WEB_SOCKET_ENCODER_TYPE, TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, nettyAcceptor.getConfiguration());
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength, WebSocketFrameEncoderType.valueOfType(encoderConfigType)));
webSocketMaxFramePayloadLength = webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength;
final boolean enableCompression = ConfigurationHelper.getBooleanProperty(
TransportConstants.WEB_SOCKET_COMPRESSION_SUPPORTED, TransportConstants.DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED, nettyAcceptor.getConfiguration());
final String encoderConfigType = ConfigurationHelper.getStringProperty(
TransportConstants.WEB_SOCKET_ENCODER_TYPE, TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, nettyAcceptor.getConfiguration());
if (enableCompression) {
ctx.pipeline().addLast(new WebSocketServerCompressionHandler());
}
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength, WebSocketFrameEncoderType.valueOfType(encoderConfigType), enableCompression));
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this);
ctx.pipeline().remove(HTTP_HANDLER);

View File

@ -51,12 +51,14 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols;
private int maxFramePayloadLength;
private boolean allowExtensions;
private WebSocketFrameEncoderType encoderType;
public WebSocketServerHandler(List<String> supportedProtocols, int maxFramePayloadLength, WebSocketFrameEncoderType encoderType) {
public WebSocketServerHandler(List<String> supportedProtocols, int maxFramePayloadLength, WebSocketFrameEncoderType encoderType, boolean allowExtensions) {
this.supportedProtocols = supportedProtocols;
this.maxFramePayloadLength = maxFramePayloadLength;
this.encoderType = encoderType;
this.allowExtensions = allowExtensions;
}
@Override
@ -81,7 +83,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
// Handshake
String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, false, maxFramePayloadLength);
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, allowExtensions, maxFramePayloadLength);
this.httpRequest = req;
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {

View File

@ -49,7 +49,7 @@ public class WebSocketServerHandlerTest {
public void setup() throws Exception {
maxFramePayloadLength = 8192;
supportedProtocols = Arrays.asList("STOMP");
spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength, WebSocketFrameEncoderType.BINARY));
spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength, WebSocketFrameEncoderType.BINARY, false));
}
@Test

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Test connections via Web Sockets
*/
@ExtendWith(ParameterizedTestExtension.class)
public class AmqpWebSocketConnectionTest extends AmqpClientTestSupport {
@Parameter(index = 0)
public boolean supportWSCompression;
@Parameters(name = "supportWSCompression={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {{true}, {false}});
}
@Override
protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
tc.getParams().put("webSocketCompressionSupported", supportWSCompression);
}
@TestTemplate
public void testClientConnectsWithWebSocketCompressionOn() throws Exception {
testClientConnectsWithWebSockets(true);
}
@TestTemplate
public void testClientConnectsWithWebSocketCompressionOff() throws Exception {
testClientConnectsWithWebSockets(false);
}
private void testClientConnectsWithWebSockets(boolean clientAsksForCompression) throws Exception {
final ProtonTestClientOptions clientOpts = new ProtonTestClientOptions();
clientOpts.setUseWebSockets(true);
clientOpts.setWebSocketCompression(clientAsksForCompression);
try (ProtonTestClient client = new ProtonTestClient(clientOpts)) {
client.queueClientSaslAnonymousConnect();
client.remoteOpen().queue();
client.expectOpen();
client.remoteBegin().queue();
client.expectBegin();
client.connect("localhost", AMQP_PORT);
client.waitForScriptToComplete(5, TimeUnit.MINUTES);
if (clientAsksForCompression && supportWSCompression) {
assertTrue(client.isWSCompressionActive());
} else {
assertFalse(client.isWSCompressionActive());
}
client.expectAttach().ofSender();
client.expectAttach().ofReceiver();
client.expectFlow();
// Attach a sender and receiver
client.remoteAttach().ofReceiver()
.withName("ws-compression-test")
.withSource().withAddress(getQueueName())
.withCapabilities("queue").also()
.withTarget().and()
.now();
client.remoteFlow().withLinkCredit(10).now();
client.remoteAttach().ofSender()
.withInitialDeliveryCount(0)
.withName("ws-compression-test")
.withTarget().withAddress(getQueueName())
.withCapabilities("queue").also()
.withSource().and()
.now();
client.waitForScriptToComplete(5, TimeUnit.SECONDS);
final String payload = "test-data:" + "A".repeat(1000);
// Broker sends message to subscription and acknowledges to sender
client.expectTransfer().withMessage().withValue(payload);
client.expectDisposition().withSettled(true).withState().accepted();
// Client sends message to queue with subscription
client.remoteTransfer().withDeliveryId(0)
.withBody().withValue(payload).also()
.now();
client.waitForScriptToComplete(5, TimeUnit.SECONDS);
client.expectClose();
client.remoteClose().now();
client.waitForScriptToComplete(5, TimeUnit.SECONDS);
client.close();
}
}
}