Respect the wireFormat.maxFrameSize option on WS and WSS transports
allowing binary content larger than 65535
(cherry picked from commit 2e2d5ddd3d)
This commit is contained in:
Timothy Bish 2017-05-04 16:37:53 -04:00
parent 675729c204
commit bc879d762a
15 changed files with 170 additions and 16 deletions

View File

@ -99,6 +99,11 @@ public class AmqpWSTransport extends TransportSupport implements WSTransport, AM
return frameReader.getWireFormat();
}
@Override
public int getMaxFrameSize() {
return (int) Math.min(((AmqpWireFormat) getWireFormat()).getMaxFrameSize(), Integer.MAX_VALUE);
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
// Currently nothing needed here since we have no async workers.

View File

@ -45,12 +45,13 @@ public class AmqpWireFormat implements WireFormat {
public static final int DEFAULT_IDLE_TIMEOUT = 30000;
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = false;
public static final int DEFAULT_ANQP_FRAME_SIZE = NO_AMQP_MAX_FRAME_SIZE;
private static final int SASL_PROTOCOL = 3;
private int version = 1;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = DEFAULT_ANQP_FRAME_SIZE;
private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
private int producerCredit = DEFAULT_PRODUCER_CREDIT;

View File

@ -26,7 +26,7 @@ import org.apache.activemq.wireformat.WireFormatFactory;
public class AmqpWireFormatFactory implements WireFormatFactory {
private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = AmqpWireFormat.DEFAULT_ANQP_FRAME_SIZE;
private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;

View File

@ -168,9 +168,39 @@ public class JMSClientContext {
private ConnectionFactory createConnectionFactory(
URI remoteURI, String username, String password, boolean syncPublish) {
boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
String clientScheme;
boolean useSSL = false;
String amqpURI = (useSSL ? "amqps://" : "amqp://") + remoteURI.getHost() + ":" + remoteURI.getPort();
switch (remoteURI.getScheme()) {
case "tcp" :
case "amqp":
case "auto":
case "amqp+nio":
case "auto+nio":
clientScheme = "amqp://";
break;
case "ssl":
case "amqp+ssl":
case "auto+ssl":
case "amqp+nio+ssl":
case "auto+nio+ssl":
clientScheme = "amqps://";
useSSL = true;
break;
case "ws":
case "amqp+ws":
clientScheme = "amqpws://";
break;
case "wss":
case "amqp+wss":
clientScheme = "amqpwss://";
useSSL = true;
break;
default:
clientScheme = "amqp://";
}
String amqpURI = clientScheme + remoteURI.getHost() + ":" + remoteURI.getPort();
if (useSSL) {
amqpURI += "?transport.verifyHost=false";

View File

@ -92,9 +92,39 @@ public class JMSClientTestSupport extends AmqpTestSupport {
protected URI getAmqpURI(String uriOptions) {
boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
String clientScheme;
boolean useSSL = false;
String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
switch (getBrokerURI().getScheme()) {
case "tcp" :
case "amqp":
case "auto":
case "amqp+nio":
case "auto+nio":
clientScheme = "amqp://";
break;
case "ssl":
case "amqp+ssl":
case "auto+ssl":
case "amqp+nio+ssl":
case "auto+nio+ssl":
clientScheme = "amqps://";
useSSL = true;
break;
case "ws":
case "amqp+ws":
clientScheme = "amqpws://";
break;
case "wss":
case "amqp+wss":
clientScheme = "amqpwss://";
useSSL = true;
break;
default:
clientScheme = "amqp://";
}
String amqpURI = clientScheme + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
if (uriOptions != null && !uriOptions.isEmpty()) {
if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {

View File

@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@ -29,13 +32,32 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
@RunWith(Parameterized.class)
public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"amqp", false},
{"amqp+ws", false},
{"amqp+ssl", true},
{"amqp+wss", true}
});
}
public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
@Rule
public TestName testName = new TestName();
@ -77,7 +99,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
String payload = createLargeString(expectedSize);
assertEquals(expectedSize, payload.getBytes().length);
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(testName.getMethodName());

View File

@ -680,6 +680,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
//----- Private implementation details -----------------------------------//
@Override
protected void doOpen() {
getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
super.doOpen();
}
@Override
protected void doOpenInspection() {
try {

View File

@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
private final int TEST_IDLE_TIMEOUT = 1000;
@Parameters(name="connector={0}")
public static Collection<Object[]> data() {
@ -165,7 +165,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertFalse(disconnected.await(10, TimeUnit.SECONDS));
assertFalse(disconnected.await(5, TimeUnit.SECONDS));
connection.close();

View File

@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
private final int TEST_IDLE_TIMEOUT = 1000;
@Parameters(name="connector={0}")
public static Collection<Object[]> data() {
@ -106,7 +106,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertFalse(disconnected.await(10, TimeUnit.SECONDS));
assertFalse(disconnected.await(5, TimeUnit.SECONDS));
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
connection.close();

View File

@ -28,11 +28,16 @@ import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
@ -258,4 +263,42 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport {
connection1.close();
assertEquals(0, getProxyToBroker().getCurrentConnectionsCount());
}
@Test(timeout = 60000)
public void testSimpleSendOneReceive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
AmqpMessage message = new AmqpMessage();
final int PAYLOAD_SIZE = 1024 * 1024;
byte[] payload = new byte[PAYLOAD_SIZE];
for (int i = 0; i < PAYLOAD_SIZE; i++) {
payload[i] = (byte) (i % PAYLOAD_SIZE);
}
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setBytes(payload);
sender.send(message);
sender.close();
LOG.info("Attempting to read message with receiver");
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg1", received.getMessageId());
received.accept();
receiver.close();
connection.close();
}
}

View File

@ -43,6 +43,8 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 500;
private final String testName;
private final int maxFrameSize;
private final int maxAmqpFrameSize;
@ -54,6 +56,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
{ "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 },
{ "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 },
{ "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 },
{ "amqp+ws-> MFS > MAFS", "amqp+ws", false, 1024, 2048 },
{ "amqp+ws-> MFS < MAFS", "amqp+ws", false, 2048, 1024 },
});
}
@ -89,12 +93,13 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
}
});
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName(), true);
byte[] payload = new byte[maxFrameSize];
byte[] payload = new byte[maxFrameSize * 2];
for (int i = 0; i < payload.length; ++i) {
payload[i] = 42;
}
@ -104,7 +109,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
sender.send(message);
assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS));
assertTrue("Connection should have failed", failed.await(30, TimeUnit.SECONDS));
assertNotNull(getProxyToQueue(getTestName()));
assertEquals(0, getProxyToQueue(getTestName()).getQueueSize());

View File

@ -55,6 +55,11 @@ public interface WSTransport extends Transport {
void onSocketOutboundBinary(ByteBuffer data) throws IOException;
}
/**
* @return the maximum frame size allowed for this WS Transport.
*/
int getMaxFrameSize();
/**
* @return the WS sub-protocol that this transport is supplying.
*/

View File

@ -218,6 +218,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
if (wsTransport.getMaxFrameSize() > 0) {
this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());
this.session.getPolicy().setMaxTextMessageSize(wsTransport.getMaxFrameSize());
}
}
@Override

View File

@ -145,9 +145,11 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok
@Override
public void setTransportOption(Map<String, Object> transportOptions) {
// String transport from options and
Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(transportOptions, "transport.");
socketConnectorFactory.setTransportOptions(socketOptions);
super.setTransportOption(socketOptions);
transportOptions.putAll(socketOptions);
super.setTransportOption(transportOptions);
}
@Override

View File

@ -112,7 +112,7 @@ public class WSServlet extends WebSocketServlet implements BrokerServiceAware {
switch (requestedProtocol) {
case MQTT:
socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions));
((MQTTSocket) socket).setTransportOptions(new HashMap<>(transportOptions));
((MQTTSocket) socket).setPeerCertificates(req.getCertificates());
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt"));
break;