From b843e128522eb36168350c303fb89434e6102a26 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 14 Oct 2024 10:15:55 -0400 Subject: [PATCH] ARTEMIS-2748 Support WebSocket compression in the transport pipeline Adds support for WebSocket compression using the netty server handler to enable per message compression and decompression as a transparent layer of the netty pipeine. --- .../impl/netty/TransportConstants.java | 9 ++ .../core/protocol/ProtocolHandler.java | 18 ++- .../websocket/WebSocketServerHandler.java | 6 +- .../websocket/WebSocketServerHandlerTest.java | 2 +- .../amqp/AmqpWebSocketConnectionTest.java | 128 ++++++++++++++++++ 5 files changed, 158 insertions(+), 5 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 7859fec4a6..91ebe95cd9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -351,6 +351,15 @@ public class TransportConstants { public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10; + /* + * Defines if the WS acceptor allows a client to request compression via WS extensions for + * per message deflate. By default this is not enabled and the WS upgrade response will not + * carry any compression support headers when the client indicates it supports compression. + */ + public static final String WEB_SOCKET_COMPRESSION_SUPPORTED = "webSocketCompressionSupported"; + + public static final boolean DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED = false; + public static final String QUIET_PERIOD = "quietPeriod"; public static final String DISABLE_STOMP_SERVER_HEADER = "disableStompServerHeader"; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 064c5023ee..e5efc05603 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -37,6 +37,8 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; + import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator; @@ -152,15 +154,27 @@ public class ProtocolHandler { FullHttpRequest request = (FullHttpRequest) msg; HttpHeaders headers = request.headers(); String upgrade = headers.get("upgrade"); + if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) { int stompMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration()); if (stompMaxFramePayloadLength != -1) { ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH); } stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH; + int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration()); - String encoderConfigType = ConfigurationHelper.getStringProperty(TransportConstants.WEB_SOCKET_ENCODER_TYPE, TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, nettyAcceptor.getConfiguration()); - ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength, WebSocketFrameEncoderType.valueOfType(encoderConfigType))); + webSocketMaxFramePayloadLength = webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength; + + final boolean enableCompression = ConfigurationHelper.getBooleanProperty( + TransportConstants.WEB_SOCKET_COMPRESSION_SUPPORTED, TransportConstants.DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED, nettyAcceptor.getConfiguration()); + final String encoderConfigType = ConfigurationHelper.getStringProperty( + TransportConstants.WEB_SOCKET_ENCODER_TYPE, TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, nettyAcceptor.getConfiguration()); + + if (enableCompression) { + ctx.pipeline().addLast(new WebSocketServerCompressionHandler()); + } + + ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength, WebSocketFrameEncoderType.valueOfType(encoderConfigType), enableCompression)); ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().remove(this); ctx.pipeline().remove(HTTP_HANDLER); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java index e15fe21991..abde34b9fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java @@ -51,12 +51,14 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler private WebSocketServerHandshaker handshaker; private List supportedProtocols; private int maxFramePayloadLength; + private boolean allowExtensions; private WebSocketFrameEncoderType encoderType; - public WebSocketServerHandler(List supportedProtocols, int maxFramePayloadLength, WebSocketFrameEncoderType encoderType) { + public WebSocketServerHandler(List supportedProtocols, int maxFramePayloadLength, WebSocketFrameEncoderType encoderType, boolean allowExtensions) { this.supportedProtocols = supportedProtocols; this.maxFramePayloadLength = maxFramePayloadLength; this.encoderType = encoderType; + this.allowExtensions = allowExtensions; } @Override @@ -81,7 +83,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler // Handshake String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ","); - WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, false, maxFramePayloadLength); + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, allowExtensions, maxFramePayloadLength); this.httpRequest = req; this.handshaker = wsFactory.newHandshaker(req); if (this.handshaker == null) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java index f1f9644c12..f581f78f72 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java @@ -49,7 +49,7 @@ public class WebSocketServerHandlerTest { public void setup() throws Exception { maxFramePayloadLength = 8192; supportedProtocols = Arrays.asList("STOMP"); - spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength, WebSocketFrameEncoderType.BINARY)); + spy = spy(new WebSocketServerHandler(supportedProtocols, maxFramePayloadLength, WebSocketFrameEncoderType.BINARY, false)); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java new file mode 100644 index 0000000000..d6a986b419 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter; +import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; +import org.apache.qpid.protonj2.test.driver.ProtonTestClient; +import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Test connections via Web Sockets + */ +@ExtendWith(ParameterizedTestExtension.class) +public class AmqpWebSocketConnectionTest extends AmqpClientTestSupport { + + @Parameter(index = 0) + public boolean supportWSCompression; + + @Parameters(name = "supportWSCompression={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] {{true}, {false}}); + } + + @Override + protected void configureAMQPAcceptorParameters(TransportConfiguration tc) { + tc.getParams().put("webSocketCompressionSupported", supportWSCompression); + } + + @TestTemplate + public void testClientConnectsWithWebSocketCompressionOn() throws Exception { + testClientConnectsWithWebSockets(true); + } + + @TestTemplate + public void testClientConnectsWithWebSocketCompressionOff() throws Exception { + testClientConnectsWithWebSockets(false); + } + + private void testClientConnectsWithWebSockets(boolean clientAsksForCompression) throws Exception { + final ProtonTestClientOptions clientOpts = new ProtonTestClientOptions(); + + clientOpts.setUseWebSockets(true); + clientOpts.setWebSocketCompression(clientAsksForCompression); + + try (ProtonTestClient client = new ProtonTestClient(clientOpts)) { + client.queueClientSaslAnonymousConnect(); + client.remoteOpen().queue(); + client.expectOpen(); + client.remoteBegin().queue(); + client.expectBegin(); + client.connect("localhost", AMQP_PORT); + + client.waitForScriptToComplete(5, TimeUnit.MINUTES); + + if (clientAsksForCompression && supportWSCompression) { + assertTrue(client.isWSCompressionActive()); + } else { + assertFalse(client.isWSCompressionActive()); + } + + client.expectAttach().ofSender(); + client.expectAttach().ofReceiver(); + client.expectFlow(); + + // Attach a sender and receiver + client.remoteAttach().ofReceiver() + .withName("ws-compression-test") + .withSource().withAddress(getQueueName()) + .withCapabilities("queue").also() + .withTarget().and() + .now(); + client.remoteFlow().withLinkCredit(10).now(); + client.remoteAttach().ofSender() + .withInitialDeliveryCount(0) + .withName("ws-compression-test") + .withTarget().withAddress(getQueueName()) + .withCapabilities("queue").also() + .withSource().and() + .now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final String payload = "test-data:" + "A".repeat(1000); + + // Broker sends message to subscription and acknowledges to sender + client.expectTransfer().withMessage().withValue(payload); + client.expectDisposition().withSettled(true).withState().accepted(); + + // Client sends message to queue with subscription + client.remoteTransfer().withDeliveryId(0) + .withBody().withValue(payload).also() + .now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + + client.expectClose(); + client.remoteClose().now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + client.close(); + } + } +}