diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index 922c15eceb..08f6be3389 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -559,6 +559,10 @@ public abstract class StompTestBase extends ActiveMQTestBase { } frame = conn.sendFrame(frame); + if (frame != null && frame.getCommand().equals("ERROR")) { + return frame; + } + if (receipt) { assertEquals(Stomp.Responses.RECEIPT, frame.getCommand()); assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java new file mode 100644 index 0000000000..f48e5cd083 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWebSocketMaxFrameTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.stomp; + +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class StompWebSocketMaxFrameTest extends StompTestBase { + + private URI wsURI; + + private int wsport = 61614; + + private int stompWSMaxFrameSize = 131072; // 128kb + + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new Object[][]{{"ws+v10.stomp"}, {"ws+v11.stomp"}, {"ws+v12.stomp"}}); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + wsport + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize).start(); + wsURI = createStompClientUri(scheme, hostname, wsport); + } + + @Test + public void testStompSendReceiveWithMaxFramePayloadLength() throws Exception { + // Assert that sending message > default 64kb fails + int size = 65536; + String largeString1 = RandomStringUtils.randomAlphabetic(size); + String largeString2 = RandomStringUtils.randomAlphabetic(size); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri, false); + conn.getTransport().setMaxFrameSize(stompWSMaxFrameSize); + conn.getTransport().connect(); + + StompClientConnection conn2 = StompClientConnectionFactory.createClientConnection(wsURI, false); + conn2.getTransport().setMaxFrameSize(stompWSMaxFrameSize); + conn2.getTransport().connect(); + + Wait.waitFor(() -> conn2.getTransport().isConnected() && conn.getTransport().isConnected(), 10000); + conn.connect(); + conn2.connect(); + + subscribeQueue(conn2, "sub1", getQueuePrefix() + getQueueName()); + + try { + // Client is kicked when sending frame > largest frame size. + send(conn, getQueuePrefix() + getQueueName(), "text/plain", largeString1, false); + Wait.waitFor(() -> !conn.getTransport().isConnected(), 2000); + assertFalse(conn.getTransport().isConnected()); + + send(conn2, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false); + Wait.waitFor(() -> !conn2.getTransport().isConnected(), 2000); + assertTrue(conn2.getTransport().isConnected()); + + ClientStompFrame frame = conn2.receiveFrame(); + assertNotNull(frame); + assertEquals(largeString2, frame.getBody()); + + } finally { + conn2.closeTransport(); + conn.closeTransport(); + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java index 78c9c4b310..3fdb1d9a32 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java @@ -75,18 +75,34 @@ public abstract class AbstractStompClientConnection implements StompClientConnec transport.setTransportListener(new StompTransportListener()); transport.connect(); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return transport.isConnected(); - } - }, 10000); + Wait.waitFor(() -> transport.isConnected(), 1000); if (!transport.isConnected()) { throw new RuntimeException("Could not connect transport"); } } + public AbstractStompClientConnection(URI uri, boolean autoConnect) throws Exception { + parseURI(uri); + this.factory = StompFrameFactoryFactory.getFactory(version); + + readBuffer = ByteBuffer.allocateDirect(10240); + receiveList = new ArrayList<>(10240); + + transport = NettyTransportFactory.createTransport(uri); + transport.setTransportListener(new StompTransportListener()); + + if (autoConnect) { + transport.connect(); + + Wait.waitFor(() -> transport.isConnected(), 1000); + + if (!transport.isConnected()) { + throw new RuntimeException("Could not connect transport"); + } + } + } + private void parseURI(URI uri) { scheme = uri.getScheme() == null ? "tcp" : uri.getScheme(); host = uri.getHost(); @@ -318,6 +334,11 @@ public abstract class AbstractStompClientConnection implements StompClientConnec transport.close(); } + @Override + public NettyTransport getTransport() { + return transport; + } + protected class Pinger extends Thread { long pingInterval; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java index 012bb49054..9adde6b01f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; +import org.apache.activemq.transport.netty.NettyTransport; + public interface StompClientConnection { ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException; @@ -54,5 +56,7 @@ public interface StompClientConnection { int getServerPingNumber(); void closeTransport() throws IOException; + + NettyTransport getTransport(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java index 06d18455e6..c9b65bebeb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java @@ -53,6 +53,20 @@ public class StompClientConnectionFactory { return null; } + public static StompClientConnection createClientConnection(URI uri, boolean autoConnect) throws Exception { + String version = getStompVersionFromURI(uri); + if ("1.0".equals(version)) { + return new StompClientConnectionV10(uri, autoConnect); + } + if ("1.1".equals(version)) { + return new StompClientConnectionV11(uri, autoConnect); + } + if ("1.2".equals(version)) { + return new StompClientConnectionV12(uri, autoConnect); + } + return null; + } + public static String getStompVersionFromURI(URI uri) { String scheme = uri.getScheme(); if (scheme.contains("10")) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java index 56c72dba4e..714840300e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java @@ -37,6 +37,10 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection { super(uri); } + public StompClientConnectionV10(URI uri, boolean autoConnect) throws Exception { + super(uri, autoConnect); + } + @Override public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException { return connect(username, passcode, null); @@ -44,6 +48,7 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection { @Override public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException { + ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT); frame.addHeader(Stomp.Headers.Connect.LOGIN, username); frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java index 5f0cca3c5f..05a2c6a67b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java @@ -36,6 +36,10 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 { super(uri); } + public StompClientConnectionV11(URI uri, boolean autoConnect) throws Exception { + super(uri, autoConnect); + } + @Override public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException { ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java index afa1f08a72..5a4ed29d47 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java @@ -29,6 +29,10 @@ public class StompClientConnectionV12 extends StompClientConnectionV11 { super(uri); } + public StompClientConnectionV12(URI uri, boolean autoConnect) throws Exception { + super(uri, autoConnect); + } + public ClientStompFrame createAnyFrame(String command) { return factory.newAnyFrame(command); }