mirror of https://github.com/apache/activemq.git
Respect the wireFormat.maxFrameSize option on WS and WSS transports allowing binary content larger than 65535
This commit is contained in:
parent
07b0d913af
commit
2e2d5ddd3d
|
@ -99,6 +99,11 @@ public class AmqpWSTransport extends TransportSupport implements WSTransport, AM
|
|||
return frameReader.getWireFormat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxFrameSize() {
|
||||
return (int) Math.min(((AmqpWireFormat) getWireFormat()).getMaxFrameSize(), Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||
// Currently nothing needed here since we have no async workers.
|
||||
|
|
|
@ -45,12 +45,13 @@ public class AmqpWireFormat implements WireFormat {
|
|||
public static final int DEFAULT_IDLE_TIMEOUT = 30000;
|
||||
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
|
||||
public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = false;
|
||||
public static final int DEFAULT_ANQP_FRAME_SIZE = NO_AMQP_MAX_FRAME_SIZE;
|
||||
|
||||
private static final int SASL_PROTOCOL = 3;
|
||||
|
||||
private int version = 1;
|
||||
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
|
||||
private int maxAmqpFrameSize = DEFAULT_ANQP_FRAME_SIZE;
|
||||
private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
|
||||
private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
|
||||
private int producerCredit = DEFAULT_PRODUCER_CREDIT;
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.activemq.wireformat.WireFormatFactory;
|
|||
public class AmqpWireFormatFactory implements WireFormatFactory {
|
||||
|
||||
private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
|
||||
private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE;
|
||||
private int maxAmqpFrameSize = AmqpWireFormat.DEFAULT_ANQP_FRAME_SIZE;
|
||||
private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
|
||||
private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
|
||||
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
|
||||
|
|
|
@ -168,9 +168,39 @@ public class JMSClientContext {
|
|||
private ConnectionFactory createConnectionFactory(
|
||||
URI remoteURI, String username, String password, boolean syncPublish) {
|
||||
|
||||
boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
|
||||
String clientScheme;
|
||||
boolean useSSL = false;
|
||||
|
||||
String amqpURI = (useSSL ? "amqps://" : "amqp://") + remoteURI.getHost() + ":" + remoteURI.getPort();
|
||||
switch (remoteURI.getScheme()) {
|
||||
case "tcp" :
|
||||
case "amqp":
|
||||
case "auto":
|
||||
case "amqp+nio":
|
||||
case "auto+nio":
|
||||
clientScheme = "amqp://";
|
||||
break;
|
||||
case "ssl":
|
||||
case "amqp+ssl":
|
||||
case "auto+ssl":
|
||||
case "amqp+nio+ssl":
|
||||
case "auto+nio+ssl":
|
||||
clientScheme = "amqps://";
|
||||
useSSL = true;
|
||||
break;
|
||||
case "ws":
|
||||
case "amqp+ws":
|
||||
clientScheme = "amqpws://";
|
||||
break;
|
||||
case "wss":
|
||||
case "amqp+wss":
|
||||
clientScheme = "amqpwss://";
|
||||
useSSL = true;
|
||||
break;
|
||||
default:
|
||||
clientScheme = "amqp://";
|
||||
}
|
||||
|
||||
String amqpURI = clientScheme + remoteURI.getHost() + ":" + remoteURI.getPort();
|
||||
|
||||
if (useSSL) {
|
||||
amqpURI += "?transport.verifyHost=false";
|
||||
|
|
|
@ -92,9 +92,39 @@ public class JMSClientTestSupport extends AmqpTestSupport {
|
|||
|
||||
protected URI getAmqpURI(String uriOptions) {
|
||||
|
||||
boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
|
||||
String clientScheme;
|
||||
boolean useSSL = false;
|
||||
|
||||
String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
|
||||
switch (getBrokerURI().getScheme()) {
|
||||
case "tcp" :
|
||||
case "amqp":
|
||||
case "auto":
|
||||
case "amqp+nio":
|
||||
case "auto+nio":
|
||||
clientScheme = "amqp://";
|
||||
break;
|
||||
case "ssl":
|
||||
case "amqp+ssl":
|
||||
case "auto+ssl":
|
||||
case "amqp+nio+ssl":
|
||||
case "auto+nio+ssl":
|
||||
clientScheme = "amqps://";
|
||||
useSSL = true;
|
||||
break;
|
||||
case "ws":
|
||||
case "amqp+ws":
|
||||
clientScheme = "amqpws://";
|
||||
break;
|
||||
case "wss":
|
||||
case "amqp+wss":
|
||||
clientScheme = "amqpwss://";
|
||||
useSSL = true;
|
||||
break;
|
||||
default:
|
||||
clientScheme = "amqp://";
|
||||
}
|
||||
|
||||
String amqpURI = clientScheme + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
|
||||
|
||||
if (uriOptions != null && !uriOptions.isEmpty()) {
|
||||
if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {
|
||||
|
|
|
@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
@ -29,13 +32,32 @@ import javax.jms.Queue;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
|
||||
@RunWith(Parameterized.class)
|
||||
public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
|
||||
|
||||
@Parameters(name="{0}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{"amqp", false},
|
||||
{"amqp+ws", false},
|
||||
{"amqp+ssl", true},
|
||||
{"amqp+wss", true}
|
||||
});
|
||||
}
|
||||
|
||||
public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
|
||||
super(connectorScheme, secure);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
@ -77,7 +99,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
|
|||
String payload = createLargeString(expectedSize);
|
||||
assertEquals(expectedSize, payload.getBytes().length);
|
||||
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
|
||||
long startTime = System.currentTimeMillis();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(testName.getMethodName());
|
||||
|
|
|
@ -680,6 +680,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
|
||||
//----- Private implementation details -----------------------------------//
|
||||
|
||||
@Override
|
||||
protected void doOpen() {
|
||||
getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
|
||||
super.doOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doOpenInspection() {
|
||||
try {
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
@RunWith(Parameterized.class)
|
||||
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
||||
|
||||
private final int TEST_IDLE_TIMEOUT = 3000;
|
||||
private final int TEST_IDLE_TIMEOUT = 1000;
|
||||
|
||||
@Parameters(name="connector={0}")
|
||||
public static Collection<Object[]> data() {
|
||||
|
@ -165,7 +165,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
|
|||
connection.connect();
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
assertFalse(disconnected.await(10, TimeUnit.SECONDS));
|
||||
assertFalse(disconnected.await(5, TimeUnit.SECONDS));
|
||||
|
||||
connection.close();
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
@RunWith(Parameterized.class)
|
||||
public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
|
||||
|
||||
private final int TEST_IDLE_TIMEOUT = 3000;
|
||||
private final int TEST_IDLE_TIMEOUT = 1000;
|
||||
|
||||
@Parameters(name="connector={0}")
|
||||
public static Collection<Object[]> data() {
|
||||
|
@ -106,7 +106,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
|
|||
connection.connect();
|
||||
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
assertFalse(disconnected.await(10, TimeUnit.SECONDS));
|
||||
assertFalse(disconnected.await(5, TimeUnit.SECONDS));
|
||||
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
|
||||
|
||||
connection.close();
|
||||
|
|
|
@ -28,11 +28,16 @@ import static org.junit.Assert.fail;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.transport.amqp.AmqpSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||
|
@ -258,4 +263,42 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
|
|||
connection1.close();
|
||||
assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSimpleSendOneReceive() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
|
||||
final int PAYLOAD_SIZE = 1024 * 1024;
|
||||
|
||||
byte[] payload = new byte[PAYLOAD_SIZE];
|
||||
for (int i = 0; i < PAYLOAD_SIZE; i++) {
|
||||
payload[i] = (byte) (i % PAYLOAD_SIZE);
|
||||
}
|
||||
|
||||
message.setMessageId("msg" + 1);
|
||||
message.setMessageAnnotation("serialNo", 1);
|
||||
message.setBytes(payload);
|
||||
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
LOG.info("Attempting to read message with receiver");
|
||||
receiver.flow(2);
|
||||
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have read message", received);
|
||||
assertEquals("msg1", received.getMessageId());
|
||||
received.accept();
|
||||
|
||||
receiver.close();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,8 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
@RunWith(Parameterized.class)
|
||||
public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
|
||||
|
||||
private final int TEST_IDLE_TIMEOUT = 500;
|
||||
|
||||
private final String testName;
|
||||
private final int maxFrameSize;
|
||||
private final int maxAmqpFrameSize;
|
||||
|
@ -54,6 +56,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
|
|||
{ "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 },
|
||||
{ "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 },
|
||||
{ "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 },
|
||||
{ "amqp+ws-> MFS > MAFS", "amqp+ws", false, 1024, 2048 },
|
||||
{ "amqp+ws-> MFS < MAFS", "amqp+ws", false, 2048, 1024 },
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -89,12 +93,13 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
|
||||
connection.connect();
|
||||
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName(), true);
|
||||
|
||||
byte[] payload = new byte[maxFrameSize];
|
||||
byte[] payload = new byte[maxFrameSize * 2];
|
||||
for (int i = 0; i < payload.length; ++i) {
|
||||
payload[i] = 42;
|
||||
}
|
||||
|
@ -104,7 +109,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
|
|||
|
||||
sender.send(message);
|
||||
|
||||
assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS));
|
||||
assertTrue("Connection should have failed", failed.await(30, TimeUnit.SECONDS));
|
||||
|
||||
assertNotNull(getProxyToQueue(getTestName()));
|
||||
assertEquals(0, getProxyToQueue(getTestName()).getQueueSize());
|
||||
|
|
|
@ -55,6 +55,11 @@ public interface WSTransport extends Transport {
|
|||
void onSocketOutboundBinary(ByteBuffer data) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum frame size allowed for this WS Transport.
|
||||
*/
|
||||
int getMaxFrameSize();
|
||||
|
||||
/**
|
||||
* @return the WS sub-protocol that this transport is supplying.
|
||||
*/
|
||||
|
|
|
@ -218,6 +218,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
|
|||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
this.session = session;
|
||||
|
||||
if (wsTransport.getMaxFrameSize() > 0) {
|
||||
this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());
|
||||
this.session.getPolicy().setMaxTextMessageSize(wsTransport.getMaxFrameSize());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -145,9 +145,11 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok
|
|||
|
||||
@Override
|
||||
public void setTransportOption(Map<String, Object> transportOptions) {
|
||||
// String transport from options and
|
||||
Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(transportOptions, "transport.");
|
||||
socketConnectorFactory.setTransportOptions(socketOptions);
|
||||
super.setTransportOption(socketOptions);
|
||||
transportOptions.putAll(socketOptions);
|
||||
super.setTransportOption(transportOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -112,7 +112,7 @@ public class WSServlet extends WebSocketServlet implements BrokerServiceAware {
|
|||
switch (requestedProtocol) {
|
||||
case MQTT:
|
||||
socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
|
||||
((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions));
|
||||
((MQTTSocket) socket).setTransportOptions(new HashMap<>(transportOptions));
|
||||
((MQTTSocket) socket).setPeerCertificates(req.getCertificates());
|
||||
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt"));
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue