diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 43bc67bf24..4eae86840c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -318,9 +318,15 @@ public class TransportConstants { public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L; + /** + * @deprecated Use {@link TransportConstants#WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH instead}. + */ + @Deprecated public static final String STOMP_MAX_FRAME_PAYLOAD_LENGTH = "stompMaxFramePayloadLength"; - public static final int DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH = 65536; + public static final String WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = "webSocketMaxFramePayloadLength"; + + public static final int DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = 65536; public static final String HANDSHAKE_TIMEOUT = "handshake-timeout"; @@ -426,6 +432,7 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID); allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED); allowableAcceptorKeys.add(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH); + allowableAcceptorKeys.add(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 69ab95ff00..62b09c508b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -152,7 +152,13 @@ public class ProtocolHandler { HttpHeaders headers = request.headers(); String upgrade = headers.get("upgrade"); if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) { - ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH, nettyAcceptor.getConfiguration()))); + int stompMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration()); + if (stompMaxFramePayloadLength != -1) { + ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH); + } + stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH; + int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration()); + ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength)); ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().remove(this); ctx.pipeline().remove(HTTP_HANDLER); diff --git a/docs/user-manual/en/stomp.md b/docs/user-manual/en/stomp.md index 97fc80ad92..eeb123738c 100644 --- a/docs/user-manual/en/stomp.md +++ b/docs/user-manual/en/stomp.md @@ -362,8 +362,9 @@ description). The payload length of Web Socket frames can vary between client implementations. By default the broker will accept frames with a payload length of 65,536. If the client needs to send payloads longer than this in a single -frame this length can be adjusted by using the `stompMaxFramePayloadLength` URL -parameter on the acceptor. +frame this length can be adjusted by using the `webSocketMaxFramePayloadLength` URL +parameter on the acceptor. In previous version this was configured via the +similarly named `stompMaxFramePayloadLength` acceptor URL parameter. The `stomp-websockets` example shows how to configure an Apache ActiveMQ Artemis broker to have web browsers and Java applications exchanges messages. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java index f56a96cd1a..e4ed8ea3cb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java @@ -20,10 +20,10 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; -import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.commons.lang3.RandomStringUtils; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,11 +32,16 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class StompWebSocketMaxFrameTest extends StompTestBase { - private URI wsURI; + private final int wsPortWithStompMaxFrame = 61614; + private final int wsPortWithWebSocketMaxFrame = 61615; + private final int wsPortWithBothMaxFrameAndWorks = 61619; + private final int wsPortWithBothMaxFrameButFails = 61620; + private URI wsURIForStompMaxFrame; + private URI wsURIForWebSocketMaxFrame; + private URI wsURIForBothMaxFrameAndWorks; + private URI wsURIForBothMaxFrameButFails; - private int wsport = 61614; - - private int stompWSMaxFrameSize = 131072; // 128kb + private final int stompWSMaxFrameSize = 131072; // 128kb @Parameterized.Parameters(name = "{0}") public static Collection data() { @@ -46,8 +51,15 @@ public class StompWebSocketMaxFrameTest extends StompTestBase { @Override public void setUp() throws Exception { super.setUp(); - server.getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + wsport + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize).start(); - wsURI = createStompClientUri(scheme, hostname, wsport); + server.getRemotingService().createAcceptor("test1", "tcp://127.0.0.1:" + wsPortWithStompMaxFrame + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize).start(); + server.getRemotingService().createAcceptor("test2", "tcp://127.0.0.1:" + wsPortWithWebSocketMaxFrame + "?webSocketMaxFramePayloadLength=" + stompWSMaxFrameSize).start(); + server.getRemotingService().createAcceptor("test3", "tcp://127.0.0.1:" + wsPortWithBothMaxFrameAndWorks + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize / 2 + ";webSocketMaxFramePayloadLength=" + stompWSMaxFrameSize).start(); + server.getRemotingService().createAcceptor("test4", "tcp://127.0.0.1:" + wsPortWithBothMaxFrameButFails + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize + ";webSocketMaxFramePayloadLength=" + stompWSMaxFrameSize / 2).start(); + + wsURIForStompMaxFrame = createStompClientUri(scheme, hostname, wsPortWithStompMaxFrame); + wsURIForWebSocketMaxFrame = createStompClientUri(scheme, hostname, wsPortWithWebSocketMaxFrame); + wsURIForBothMaxFrameAndWorks = createStompClientUri(scheme, hostname, wsPortWithBothMaxFrameAndWorks); + wsURIForBothMaxFrameButFails = createStompClientUri(scheme, hostname, wsPortWithBothMaxFrameButFails); } @Test @@ -61,31 +73,71 @@ public class StompWebSocketMaxFrameTest extends StompTestBase { conn.getTransport().setMaxFrameSize(stompWSMaxFrameSize); conn.getTransport().connect(); - StompClientConnection conn2 = StompClientConnectionFactory.createClientConnection(wsURI, false); + StompClientConnection conn2 = StompClientConnectionFactory.createClientConnection(wsURIForStompMaxFrame, false); conn2.getTransport().setMaxFrameSize(stompWSMaxFrameSize); conn2.getTransport().connect(); - Wait.waitFor(() -> conn2.getTransport().isConnected() && conn.getTransport().isConnected(), 10000); + StompClientConnection conn3 = StompClientConnectionFactory.createClientConnection(wsURIForWebSocketMaxFrame, false); + conn3.getTransport().setMaxFrameSize(stompWSMaxFrameSize); + conn3.getTransport().connect(); + + StompClientConnection conn4 = StompClientConnectionFactory.createClientConnection(wsURIForBothMaxFrameAndWorks, false); + conn4.getTransport().setMaxFrameSize(stompWSMaxFrameSize); + conn4.getTransport().connect(); + + StompClientConnection conn5 = StompClientConnectionFactory.createClientConnection(wsURIForBothMaxFrameButFails, false); + conn5.getTransport().setMaxFrameSize(stompWSMaxFrameSize); + conn5.getTransport().connect(); + + Wait.waitFor(() -> conn5.getTransport().isConnected() && conn4.getTransport().isConnected() && conn3.getTransport().isConnected() && conn2.getTransport().isConnected() && conn.getTransport().isConnected(), 10000); conn.connect(); conn2.connect(); + conn3.connect(); + conn4.connect(); + conn5.connect(); subscribeQueue(conn2, "sub1", getQueuePrefix() + getQueueName()); + subscribeQueue(conn3, "sub2", getQueuePrefix() + getQueueName()); + subscribeQueue(conn4, "sub3", getQueuePrefix() + getQueueName()); try { - // Client is kicked when sending frame > largest frame size. + // Client is kicked when sending frame > largest frame size. Default 64kb send(conn, getQueuePrefix() + getQueueName(), "text/plain", largeString1, false); Wait.waitFor(() -> !conn.getTransport().isConnected(), 2000); assertFalse(conn.getTransport().isConnected()); + // Fails because webSocketMaxFramePayloadLength is only configured for 64kb. + send(conn5, getQueuePrefix() + getQueueName(), "text/plain", largeString1, false); + Wait.waitFor(() -> !conn5.getTransport().isConnected(), 2000); + assertFalse(conn5.getTransport().isConnected()); + send(conn2, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false); Wait.waitFor(() -> !conn2.getTransport().isConnected(), 2000); assertTrue(conn2.getTransport().isConnected()); - ClientStompFrame frame = conn2.receiveFrame(); - assertNotNull(frame); - assertEquals(largeString2, frame.getBody()); + send(conn3, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false); + Wait.waitFor(() -> !conn3.getTransport().isConnected(), 2000); + assertTrue(conn3.getTransport().isConnected()); + send(conn4, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false); + Wait.waitFor(() -> !conn4.getTransport().isConnected(), 2000); + assertTrue(conn4.getTransport().isConnected()); + + ClientStompFrame frame2 = conn2.receiveFrame(); + assertNotNull(frame2); + assertEquals(largeString2, frame2.getBody()); + + ClientStompFrame frame3 = conn3.receiveFrame(); + assertNotNull(frame3); + assertEquals(largeString2, frame3.getBody()); + + ClientStompFrame frame4 = conn4.receiveFrame(); + assertNotNull(frame4); + assertEquals(largeString2, frame4.getBody()); } finally { + conn5.closeTransport(); + conn4.closeTransport(); + conn3.closeTransport(); conn2.closeTransport(); conn.closeTransport(); }