This commit is contained in:
Justin Bertram 2022-02-07 19:45:09 -06:00
commit 8353eca9ee
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
4 changed files with 83 additions and 17 deletions

View File

@ -318,9 +318,15 @@ public class TransportConstants {
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
/**
* @deprecated Use {@link TransportConstants#WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH instead}.
*/
@Deprecated
public static final String STOMP_MAX_FRAME_PAYLOAD_LENGTH = "stompMaxFramePayloadLength";
public static final int DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH = 65536;
public static final String WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = "webSocketMaxFramePayloadLength";
public static final int DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH = 65536;
public static final String HANDSHAKE_TIMEOUT = "handshake-timeout";
@ -426,6 +432,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
allowableAcceptorKeys.add(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH);
allowableAcceptorKeys.add(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME);

View File

@ -152,7 +152,13 @@ public class ProtocolHandler {
HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade");
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH, nettyAcceptor.getConfiguration())));
int stompMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration());
if (stompMaxFramePayloadLength != -1) {
ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH);
}
stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? stompMaxFramePayloadLength : TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH;
int webSocketMaxFramePayloadLength = ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH, -1, nettyAcceptor.getConfiguration());
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength != -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength));
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this);
ctx.pipeline().remove(HTTP_HANDLER);

View File

@ -362,8 +362,9 @@ description).
The payload length of Web Socket frames can vary between client
implementations. By default the broker will accept frames with a payload length
of 65,536. If the client needs to send payloads longer than this in a single
frame this length can be adjusted by using the `stompMaxFramePayloadLength` URL
parameter on the acceptor.
frame this length can be adjusted by using the `webSocketMaxFramePayloadLength` URL
parameter on the acceptor. In previous version this was configured via the
similarly named `stompMaxFramePayloadLength` acceptor URL parameter.
The `stomp-websockets` example shows how to configure an Apache ActiveMQ
Artemis broker to have web browsers and Java applications exchanges messages.

View File

@ -20,10 +20,10 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -32,11 +32,16 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class StompWebSocketMaxFrameTest extends StompTestBase {
private URI wsURI;
private final int wsPortWithStompMaxFrame = 61614;
private final int wsPortWithWebSocketMaxFrame = 61615;
private final int wsPortWithBothMaxFrameAndWorks = 61619;
private final int wsPortWithBothMaxFrameButFails = 61620;
private URI wsURIForStompMaxFrame;
private URI wsURIForWebSocketMaxFrame;
private URI wsURIForBothMaxFrameAndWorks;
private URI wsURIForBothMaxFrameButFails;
private int wsport = 61614;
private int stompWSMaxFrameSize = 131072; // 128kb
private final int stompWSMaxFrameSize = 131072; // 128kb
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
@ -46,8 +51,15 @@ public class StompWebSocketMaxFrameTest extends StompTestBase {
@Override
public void setUp() throws Exception {
super.setUp();
server.getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + wsport + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize).start();
wsURI = createStompClientUri(scheme, hostname, wsport);
server.getRemotingService().createAcceptor("test1", "tcp://127.0.0.1:" + wsPortWithStompMaxFrame + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize).start();
server.getRemotingService().createAcceptor("test2", "tcp://127.0.0.1:" + wsPortWithWebSocketMaxFrame + "?webSocketMaxFramePayloadLength=" + stompWSMaxFrameSize).start();
server.getRemotingService().createAcceptor("test3", "tcp://127.0.0.1:" + wsPortWithBothMaxFrameAndWorks + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize / 2 + ";webSocketMaxFramePayloadLength=" + stompWSMaxFrameSize).start();
server.getRemotingService().createAcceptor("test4", "tcp://127.0.0.1:" + wsPortWithBothMaxFrameButFails + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize + ";webSocketMaxFramePayloadLength=" + stompWSMaxFrameSize / 2).start();
wsURIForStompMaxFrame = createStompClientUri(scheme, hostname, wsPortWithStompMaxFrame);
wsURIForWebSocketMaxFrame = createStompClientUri(scheme, hostname, wsPortWithWebSocketMaxFrame);
wsURIForBothMaxFrameAndWorks = createStompClientUri(scheme, hostname, wsPortWithBothMaxFrameAndWorks);
wsURIForBothMaxFrameButFails = createStompClientUri(scheme, hostname, wsPortWithBothMaxFrameButFails);
}
@Test
@ -61,31 +73,71 @@ public class StompWebSocketMaxFrameTest extends StompTestBase {
conn.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
conn.getTransport().connect();
StompClientConnection conn2 = StompClientConnectionFactory.createClientConnection(wsURI, false);
StompClientConnection conn2 = StompClientConnectionFactory.createClientConnection(wsURIForStompMaxFrame, false);
conn2.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
conn2.getTransport().connect();
Wait.waitFor(() -> conn2.getTransport().isConnected() && conn.getTransport().isConnected(), 10000);
StompClientConnection conn3 = StompClientConnectionFactory.createClientConnection(wsURIForWebSocketMaxFrame, false);
conn3.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
conn3.getTransport().connect();
StompClientConnection conn4 = StompClientConnectionFactory.createClientConnection(wsURIForBothMaxFrameAndWorks, false);
conn4.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
conn4.getTransport().connect();
StompClientConnection conn5 = StompClientConnectionFactory.createClientConnection(wsURIForBothMaxFrameButFails, false);
conn5.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
conn5.getTransport().connect();
Wait.waitFor(() -> conn5.getTransport().isConnected() && conn4.getTransport().isConnected() && conn3.getTransport().isConnected() && conn2.getTransport().isConnected() && conn.getTransport().isConnected(), 10000);
conn.connect();
conn2.connect();
conn3.connect();
conn4.connect();
conn5.connect();
subscribeQueue(conn2, "sub1", getQueuePrefix() + getQueueName());
subscribeQueue(conn3, "sub2", getQueuePrefix() + getQueueName());
subscribeQueue(conn4, "sub3", getQueuePrefix() + getQueueName());
try {
// Client is kicked when sending frame > largest frame size.
// Client is kicked when sending frame > largest frame size. Default 64kb
send(conn, getQueuePrefix() + getQueueName(), "text/plain", largeString1, false);
Wait.waitFor(() -> !conn.getTransport().isConnected(), 2000);
assertFalse(conn.getTransport().isConnected());
// Fails because webSocketMaxFramePayloadLength is only configured for 64kb.
send(conn5, getQueuePrefix() + getQueueName(), "text/plain", largeString1, false);
Wait.waitFor(() -> !conn5.getTransport().isConnected(), 2000);
assertFalse(conn5.getTransport().isConnected());
send(conn2, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false);
Wait.waitFor(() -> !conn2.getTransport().isConnected(), 2000);
assertTrue(conn2.getTransport().isConnected());
ClientStompFrame frame = conn2.receiveFrame();
assertNotNull(frame);
assertEquals(largeString2, frame.getBody());
send(conn3, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false);
Wait.waitFor(() -> !conn3.getTransport().isConnected(), 2000);
assertTrue(conn3.getTransport().isConnected());
send(conn4, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false);
Wait.waitFor(() -> !conn4.getTransport().isConnected(), 2000);
assertTrue(conn4.getTransport().isConnected());
ClientStompFrame frame2 = conn2.receiveFrame();
assertNotNull(frame2);
assertEquals(largeString2, frame2.getBody());
ClientStompFrame frame3 = conn3.receiveFrame();
assertNotNull(frame3);
assertEquals(largeString2, frame3.getBody());
ClientStompFrame frame4 = conn4.receiveFrame();
assertNotNull(frame4);
assertEquals(largeString2, frame4.getBody());
} finally {
conn5.closeTransport();
conn4.closeTransport();
conn3.closeTransport();
conn2.closeTransport();
conn.closeTransport();
}