ARTEMIS-1428 Add WS tests for max frame size
This commit is contained in:
parent
120fc190c6
commit
18109e30d9
|
@ -559,6 +559,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
frame = conn.sendFrame(frame);
|
frame = conn.sendFrame(frame);
|
||||||
|
|
||||||
|
if (frame != null && frame.getCommand().equals("ERROR")) {
|
||||||
|
return frame;
|
||||||
|
}
|
||||||
|
|
||||||
if (receipt) {
|
if (receipt) {
|
||||||
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
|
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
|
||||||
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.stomp;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.junit.Wait;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
|
||||||
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class StompWebSocketMaxFrameTest extends StompTestBase {
|
||||||
|
|
||||||
|
private URI wsURI;
|
||||||
|
|
||||||
|
private int wsport = 61614;
|
||||||
|
|
||||||
|
private int stompWSMaxFrameSize = 131072; // 128kb
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "{0}")
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][]{{"ws+v10.stomp"}, {"ws+v11.stomp"}, {"ws+v12.stomp"}});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + wsport + "?stompMaxFramePayloadLength=" + stompWSMaxFrameSize).start();
|
||||||
|
wsURI = createStompClientUri(scheme, hostname, wsport);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStompSendReceiveWithMaxFramePayloadLength() throws Exception {
|
||||||
|
// Assert that sending message > default 64kb fails
|
||||||
|
int size = 65536;
|
||||||
|
String largeString1 = RandomStringUtils.randomAlphabetic(size);
|
||||||
|
String largeString2 = RandomStringUtils.randomAlphabetic(size);
|
||||||
|
|
||||||
|
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri, false);
|
||||||
|
conn.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
|
||||||
|
conn.getTransport().connect();
|
||||||
|
|
||||||
|
StompClientConnection conn2 = StompClientConnectionFactory.createClientConnection(wsURI, false);
|
||||||
|
conn2.getTransport().setMaxFrameSize(stompWSMaxFrameSize);
|
||||||
|
conn2.getTransport().connect();
|
||||||
|
|
||||||
|
Wait.waitFor(() -> conn2.getTransport().isConnected() && conn.getTransport().isConnected(), 10000);
|
||||||
|
conn.connect();
|
||||||
|
conn2.connect();
|
||||||
|
|
||||||
|
subscribeQueue(conn2, "sub1", getQueuePrefix() + getQueueName());
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Client is kicked when sending frame > largest frame size.
|
||||||
|
send(conn, getQueuePrefix() + getQueueName(), "text/plain", largeString1, false);
|
||||||
|
Wait.waitFor(() -> !conn.getTransport().isConnected(), 2000);
|
||||||
|
assertFalse(conn.getTransport().isConnected());
|
||||||
|
|
||||||
|
send(conn2, getQueuePrefix() + getQueueName(), "text/plain", largeString2, false);
|
||||||
|
Wait.waitFor(() -> !conn2.getTransport().isConnected(), 2000);
|
||||||
|
assertTrue(conn2.getTransport().isConnected());
|
||||||
|
|
||||||
|
ClientStompFrame frame = conn2.receiveFrame();
|
||||||
|
assertNotNull(frame);
|
||||||
|
assertEquals(largeString2, frame.getBody());
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
conn2.closeTransport();
|
||||||
|
conn.closeTransport();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -75,18 +75,34 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
|
||||||
transport.setTransportListener(new StompTransportListener());
|
transport.setTransportListener(new StompTransportListener());
|
||||||
transport.connect();
|
transport.connect();
|
||||||
|
|
||||||
Wait.waitFor(new Wait.Condition() {
|
Wait.waitFor(() -> transport.isConnected(), 1000);
|
||||||
@Override
|
|
||||||
public boolean isSatisfied() throws Exception {
|
|
||||||
return transport.isConnected();
|
|
||||||
}
|
|
||||||
}, 10000);
|
|
||||||
|
|
||||||
if (!transport.isConnected()) {
|
if (!transport.isConnected()) {
|
||||||
throw new RuntimeException("Could not connect transport");
|
throw new RuntimeException("Could not connect transport");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public AbstractStompClientConnection(URI uri, boolean autoConnect) throws Exception {
|
||||||
|
parseURI(uri);
|
||||||
|
this.factory = StompFrameFactoryFactory.getFactory(version);
|
||||||
|
|
||||||
|
readBuffer = ByteBuffer.allocateDirect(10240);
|
||||||
|
receiveList = new ArrayList<>(10240);
|
||||||
|
|
||||||
|
transport = NettyTransportFactory.createTransport(uri);
|
||||||
|
transport.setTransportListener(new StompTransportListener());
|
||||||
|
|
||||||
|
if (autoConnect) {
|
||||||
|
transport.connect();
|
||||||
|
|
||||||
|
Wait.waitFor(() -> transport.isConnected(), 1000);
|
||||||
|
|
||||||
|
if (!transport.isConnected()) {
|
||||||
|
throw new RuntimeException("Could not connect transport");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void parseURI(URI uri) {
|
private void parseURI(URI uri) {
|
||||||
scheme = uri.getScheme() == null ? "tcp" : uri.getScheme();
|
scheme = uri.getScheme() == null ? "tcp" : uri.getScheme();
|
||||||
host = uri.getHost();
|
host = uri.getHost();
|
||||||
|
@ -318,6 +334,11 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
|
||||||
transport.close();
|
transport.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NettyTransport getTransport() {
|
||||||
|
return transport;
|
||||||
|
}
|
||||||
|
|
||||||
protected class Pinger extends Thread {
|
protected class Pinger extends Thread {
|
||||||
|
|
||||||
long pingInterval;
|
long pingInterval;
|
||||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.netty.NettyTransport;
|
||||||
|
|
||||||
public interface StompClientConnection {
|
public interface StompClientConnection {
|
||||||
|
|
||||||
ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException;
|
ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException;
|
||||||
|
@ -54,5 +56,7 @@ public interface StompClientConnection {
|
||||||
int getServerPingNumber();
|
int getServerPingNumber();
|
||||||
|
|
||||||
void closeTransport() throws IOException;
|
void closeTransport() throws IOException;
|
||||||
|
|
||||||
|
NettyTransport getTransport();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,20 @@ public class StompClientConnectionFactory {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static StompClientConnection createClientConnection(URI uri, boolean autoConnect) throws Exception {
|
||||||
|
String version = getStompVersionFromURI(uri);
|
||||||
|
if ("1.0".equals(version)) {
|
||||||
|
return new StompClientConnectionV10(uri, autoConnect);
|
||||||
|
}
|
||||||
|
if ("1.1".equals(version)) {
|
||||||
|
return new StompClientConnectionV11(uri, autoConnect);
|
||||||
|
}
|
||||||
|
if ("1.2".equals(version)) {
|
||||||
|
return new StompClientConnectionV12(uri, autoConnect);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public static String getStompVersionFromURI(URI uri) {
|
public static String getStompVersionFromURI(URI uri) {
|
||||||
String scheme = uri.getScheme();
|
String scheme = uri.getScheme();
|
||||||
if (scheme.contains("10")) {
|
if (scheme.contains("10")) {
|
||||||
|
|
|
@ -37,6 +37,10 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection {
|
||||||
super(uri);
|
super(uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public StompClientConnectionV10(URI uri, boolean autoConnect) throws Exception {
|
||||||
|
super(uri, autoConnect);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
|
public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
|
||||||
return connect(username, passcode, null);
|
return connect(username, passcode, null);
|
||||||
|
@ -44,6 +48,7 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
|
public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
|
||||||
|
|
||||||
ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
|
ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
|
||||||
frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
|
frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
|
||||||
frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
|
frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
|
||||||
|
|
|
@ -36,6 +36,10 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
|
||||||
super(uri);
|
super(uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public StompClientConnectionV11(URI uri, boolean autoConnect) throws Exception {
|
||||||
|
super(uri, autoConnect);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
|
public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
|
||||||
ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
|
ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
|
||||||
|
|
|
@ -29,6 +29,10 @@ public class StompClientConnectionV12 extends StompClientConnectionV11 {
|
||||||
super(uri);
|
super(uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public StompClientConnectionV12(URI uri, boolean autoConnect) throws Exception {
|
||||||
|
super(uri, autoConnect);
|
||||||
|
}
|
||||||
|
|
||||||
public ClientStompFrame createAnyFrame(String command) {
|
public ClientStompFrame createAnyFrame(String command) {
|
||||||
return factory.newAnyFrame(command);
|
return factory.newAnyFrame(command);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue