diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java index 2ec3a09be8..26b3561f6e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java @@ -99,6 +99,11 @@ public class AmqpWSTransport extends TransportSupport implements WSTransport, AM return frameReader.getWireFormat(); } + @Override + public int getMaxFrameSize() { + return (int) Math.min(((AmqpWireFormat) getWireFormat()).getMaxFrameSize(), Integer.MAX_VALUE); + } + @Override protected void doStop(ServiceStopper stopper) throws Exception { // Currently nothing needed here since we have no async workers. diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 89facbe8d4..7ddfc1f2b1 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -45,12 +45,13 @@ public class AmqpWireFormat implements WireFormat { public static final int DEFAULT_IDLE_TIMEOUT = 30000; public static final int DEFAULT_PRODUCER_CREDIT = 1000; public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = false; + public static final int DEFAULT_ANQP_FRAME_SIZE = NO_AMQP_MAX_FRAME_SIZE; private static final int SASL_PROTOCOL = 3; private int version = 1; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; - private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; + private int maxAmqpFrameSize = DEFAULT_ANQP_FRAME_SIZE; private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT; private int idelTimeout = DEFAULT_IDLE_TIMEOUT; private int producerCredit = DEFAULT_PRODUCER_CREDIT; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java index bb428b4d3a..196046cc69 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -26,7 +26,7 @@ import org.apache.activemq.wireformat.WireFormatFactory; public class AmqpWireFormatFactory implements WireFormatFactory { private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE; - private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE; + private int maxAmqpFrameSize = AmqpWireFormat.DEFAULT_ANQP_FRAME_SIZE; private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT; private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT; private String transformer = InboundTransformer.TRANSFORMER_NATIVE; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java index 574e9f022a..f249e7ccf6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java @@ -168,9 +168,39 @@ public class JMSClientContext { private ConnectionFactory createConnectionFactory( URI remoteURI, String username, String password, boolean syncPublish) { - boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl"); + String clientScheme; + boolean useSSL = false; - String amqpURI = (useSSL ? "amqps://" : "amqp://") + remoteURI.getHost() + ":" + remoteURI.getPort(); + switch (remoteURI.getScheme()) { + case "tcp" : + case "amqp": + case "auto": + case "amqp+nio": + case "auto+nio": + clientScheme = "amqp://"; + break; + case "ssl": + case "amqp+ssl": + case "auto+ssl": + case "amqp+nio+ssl": + case "auto+nio+ssl": + clientScheme = "amqps://"; + useSSL = true; + break; + case "ws": + case "amqp+ws": + clientScheme = "amqpws://"; + break; + case "wss": + case "amqp+wss": + clientScheme = "amqpwss://"; + useSSL = true; + break; + default: + clientScheme = "amqp://"; + } + + String amqpURI = clientScheme + remoteURI.getHost() + ":" + remoteURI.getPort(); if (useSSL) { amqpURI += "?transport.verifyHost=false"; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java index d855c6b69b..840865234f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java @@ -92,9 +92,39 @@ public class JMSClientTestSupport extends AmqpTestSupport { protected URI getAmqpURI(String uriOptions) { - boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl"); + String clientScheme; + boolean useSSL = false; - String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort(); + switch (getBrokerURI().getScheme()) { + case "tcp" : + case "amqp": + case "auto": + case "amqp+nio": + case "auto+nio": + clientScheme = "amqp://"; + break; + case "ssl": + case "amqp+ssl": + case "auto+ssl": + case "amqp+nio+ssl": + case "auto+nio+ssl": + clientScheme = "amqps://"; + useSSL = true; + break; + case "ws": + case "amqp+ws": + clientScheme = "amqpws://"; + break; + case "wss": + case "amqp+wss": + clientScheme = "amqpwss://"; + useSSL = true; + break; + default: + clientScheme = "amqp://"; + } + + String amqpURI = clientScheme + getBrokerURI().getHost() + ":" + getBrokerURI().getPort(); if (uriOptions != null && !uriOptions.isEmpty()) { if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java index ef6eabac85..20ad2d9d23 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.Collection; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -29,13 +32,32 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JMSLargeMessageSendRecvTest extends AmqpTestSupport { +@RunWith(Parameterized.class) +public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport { + + @Parameters(name="{0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"amqp", false}, + {"amqp+ws", false}, + {"amqp+ssl", true}, + {"amqp+wss", true} + }); + } + + public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) { + super(connectorScheme, secure); + } @Rule public TestName testName = new TestName(); @@ -77,7 +99,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport { String payload = createLargeString(expectedSize); assertEquals(expectedSize, payload.getBytes().length); - Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI); + Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI()); long startTime = System.currentTimeMillis(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(testName.getMethodName()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index b8d38e2e9f..8956692707 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -680,6 +680,12 @@ public class AmqpSession extends AmqpAbstractResource { //----- Private implementation details -----------------------------------// + @Override + protected void doOpen() { + getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); + super.doOpen(); + } + @Override protected void doOpenInspection() { try { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java index dc13369a0f..e794274f32 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java @@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { - private final int TEST_IDLE_TIMEOUT = 3000; + private final int TEST_IDLE_TIMEOUT = 1000; @Parameters(name="connector={0}") public static Collection data() { @@ -165,7 +165,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { connection.connect(); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); - assertFalse(disconnected.await(10, TimeUnit.SECONDS)); + assertFalse(disconnected.await(5, TimeUnit.SECONDS)); connection.close(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java index de47fd2070..97b38fb3da 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java @@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport { - private final int TEST_IDLE_TIMEOUT = 3000; + private final int TEST_IDLE_TIMEOUT = 1000; @Parameters(name="connector={0}") public static Collection data() { @@ -106,7 +106,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport { connection.connect(); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); - assertFalse(disconnected.await(10, TimeUnit.SECONDS)); + assertFalse(disconnected.await(5, TimeUnit.SECONDS)); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); connection.close(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index 11cade73a4..a3474a9556 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -28,11 +28,16 @@ import static org.junit.Assert.fail; import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.amqp.AmqpSupport; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; @@ -258,4 +263,42 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { connection1.close(); assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); } + + @Test(timeout = 60000) + public void testSimpleSendOneReceive() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + AmqpMessage message = new AmqpMessage(); + + final int PAYLOAD_SIZE = 1024 * 1024; + + byte[] payload = new byte[PAYLOAD_SIZE]; + for (int i = 0; i < PAYLOAD_SIZE; i++) { + payload[i] = (byte) (i % PAYLOAD_SIZE); + } + + message.setMessageId("msg" + 1); + message.setMessageAnnotation("serialNo", 1); + message.setBytes(payload); + + sender.send(message); + sender.close(); + + LOG.info("Attempting to read message with receiver"); + receiver.flow(2); + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received); + assertEquals("msg1", received.getMessageId()); + received.accept(); + + receiver.close(); + + connection.close(); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java index c818abe5f1..84415d7dc1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java @@ -43,6 +43,8 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { + private final int TEST_IDLE_TIMEOUT = 500; + private final String testName; private final int maxFrameSize; private final int maxAmqpFrameSize; @@ -54,6 +56,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { { "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 }, { "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 }, { "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 }, + { "amqp+ws-> MFS > MAFS", "amqp+ws", false, 1024, 2048 }, + { "amqp+ws-> MFS < MAFS", "amqp+ws", false, 2048, 1024 }, }); } @@ -89,12 +93,13 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { } }); + connection.setIdleTimeout(TEST_IDLE_TIMEOUT); connection.connect(); AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender("queue://" + getTestName(), true); - byte[] payload = new byte[maxFrameSize]; + byte[] payload = new byte[maxFrameSize * 2]; for (int i = 0; i < payload.length; ++i) { payload[i] = 42; } @@ -104,7 +109,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { sender.send(message); - assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS)); + assertTrue("Connection should have failed", failed.await(30, TimeUnit.SECONDS)); assertNotNull(getProxyToQueue(getTestName())); assertEquals(0, getProxyToQueue(getTestName()).getQueueSize()); diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java index e15f86f293..9a30660959 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java @@ -55,6 +55,11 @@ public interface WSTransport extends Transport { void onSocketOutboundBinary(ByteBuffer data) throws IOException; } + /** + * @return the maximum frame size allowed for this WS Transport. + */ + int getMaxFrameSize(); + /** * @return the WS sub-protocol that this transport is supplying. */ diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java index 0ca80efd57..d5a5207f37 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java @@ -218,6 +218,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor @Override public void onWebSocketConnect(Session session) { this.session = session; + + if (wsTransport.getMaxFrameSize() > 0) { + this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize()); + this.session.getPolicy().setMaxTextMessageSize(wsTransport.getMaxFrameSize()); + } } @Override diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 3029668dd2..ea8867d76e 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -145,9 +145,11 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok @Override public void setTransportOption(Map transportOptions) { + // String transport from options and Map socketOptions = IntrospectionSupport.extractProperties(transportOptions, "transport."); socketConnectorFactory.setTransportOptions(socketOptions); - super.setTransportOption(socketOptions); + transportOptions.putAll(socketOptions); + super.setTransportOption(transportOptions); } @Override diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java index 21754ada6c..8cb3811e0e 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -112,7 +112,7 @@ public class WSServlet extends WebSocketServlet implements BrokerServiceAware { switch (requestedProtocol) { case MQTT: socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); - ((MQTTSocket) socket).setTransportOptions(new HashMap(transportOptions)); + ((MQTTSocket) socket).setTransportOptions(new HashMap<>(transportOptions)); ((MQTTSocket) socket).setPeerCertificates(req.getCertificates()); resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt")); break;