From f37b005acf60c3d06f7e572da5ed7e93b162b5ba Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 2 Jun 2015 15:25:11 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5776 Add better support for maxFrameSize configuration that fits with the defaults configuration for the broker by allowing the maxFrameSize to be configured via ?wireFormat.maxFrameSize on STOMP, AMQP, and MQTT. Previously we only supported transport.wireFormat.maxFrameSize. --- .../transport/amqp/AmqpWireFormatFactory.java | 57 ++++++++++++++++++- .../transport/amqp/client/AmqpConnection.java | 2 +- .../amqp/interop/AmqpMaxFrameSizeTest.java | 4 +- .../transport/mqtt/MQTTWireFormatFactory.java | 18 +++++- .../transport/mqtt/MQTTMaxFrameSizeTest.java | 2 +- .../transport/stomp/StompWireFormat.java | 2 +- .../stomp/StompWireFormatFactory.java | 28 ++++++++- .../stomp/StompMaxFrameSizeTest.java | 2 +- 8 files changed, 106 insertions(+), 9 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java index f4de9501f0..fb7aea4255 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp; +import org.apache.activemq.transport.amqp.message.InboundTransformer; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; @@ -24,8 +25,62 @@ import org.apache.activemq.wireformat.WireFormatFactory; */ public class AmqpWireFormatFactory implements WireFormatFactory { + private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE; + private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE; + private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT; + private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT; + private String transformer = InboundTransformer.TRANSFORMER_NATIVE; + @Override public WireFormat createWireFormat() { - return new AmqpWireFormat(); + AmqpWireFormat wireFormat = new AmqpWireFormat(); + + wireFormat.setMaxFrameSize(getMaxFrameSize()); + wireFormat.setMaxAmqpFrameSize(getMaxAmqpFrameSize()); + wireFormat.setIdleTimeout(getIdelTimeout()); + wireFormat.setProducerCredit(getProducerCredit()); + wireFormat.setTransformer(getTransformer()); + + return wireFormat; + } + + public int getMaxAmqpFrameSize() { + return maxAmqpFrameSize; + } + + public void setMaxAmqpFrameSize(int maxAmqpFrameSize) { + this.maxAmqpFrameSize = maxAmqpFrameSize; + } + + public long getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(long maxFrameSize) { + this.maxFrameSize = maxFrameSize; + } + + public int getIdelTimeout() { + return idelTimeout; + } + + public void setIdelTimeout(int idelTimeout) { + this.idelTimeout = idelTimeout; + } + + public int getProducerCredit() { + return producerCredit; + } + + public void setProducerCredit(int producerCredit) { + this.producerCredit = producerCredit; + } + + public String getTransformer() { + return transformer; + } + + public void setTransformer(String transformer) { + this.transformer = transformer; } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index d00aec7f2b..9c13e74334 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -61,7 +61,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private static final int DEFAULT_CHANNEL_MAX = 32767; private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); - public static final long DEFAULT_CONNECT_TIMEOUT = 15000; + public static final long DEFAULT_CONNECT_TIMEOUT = 515000; public static final long DEFAULT_CLOSE_TIMEOUT = 30000; private final ScheduledExecutorService serializer; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java index 7599c2524d..7f70d73a94 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java @@ -67,8 +67,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { @Override protected String getAdditionalConfig() { - return "&transport.wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize + - "&transport.maxFrameSize=" + maxFrameSize; + return "&wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize + + "&wireFormat.maxFrameSize=" + maxFrameSize; } @Test(timeout = 600000) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java index 4beaa3915f..7298d6257f 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java @@ -23,7 +23,23 @@ import org.apache.activemq.wireformat.WireFormatFactory; * Creates WireFormat objects that marshalls the Stomp protocol. */ public class MQTTWireFormatFactory implements WireFormatFactory { + + private int maxFrameSize = MQTTWireFormat.MAX_MESSAGE_LENGTH; + + @Override public WireFormat createWireFormat() { - return new MQTTWireFormat(); + MQTTWireFormat wireFormat = new MQTTWireFormat(); + + wireFormat.setMaxFrameSize(getMaxFrameSize()); + + return wireFormat; + } + + public int getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java index 8f5ad2eba2..029de93c21 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java @@ -59,7 +59,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport { @Override public String getProtocolConfig() { - return "?transport.maxFrameSize=" + maxFrameSize; + return "?wireFormat.maxFrameSize=" + maxFrameSize; } @Test(timeout = 30000) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index daa4639839..01debf19e2 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -44,8 +44,8 @@ public class StompWireFormat implements WireFormat { private static final int MAX_COMMAND_LENGTH = 1024; private static final int MAX_HEADER_LENGTH = 1024 * 10; private static final int MAX_HEADERS = 1000; - private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; + public static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; public static final long DEFAULT_CONNECTION_TIMEOUT = 30000; diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java index 60fff9371a..54effa6d96 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java @@ -23,7 +23,33 @@ import org.apache.activemq.wireformat.WireFormatFactory; * Creates WireFormat objects that marshalls the Stomp protocol. */ public class StompWireFormatFactory implements WireFormatFactory { + + private int maxDataLength = StompWireFormat.MAX_DATA_LENGTH; + private long maxFrameSize = StompWireFormat.DEFAULT_MAX_FRAME_SIZE; + + @Override public WireFormat createWireFormat() { - return new StompWireFormat(); + StompWireFormat wireFormat = new StompWireFormat(); + + wireFormat.setMaxDataLength(getMaxDataLength()); + wireFormat.setMaxFrameSize(getMaxFrameSize()); + + return wireFormat; + } + + public int getMaxDataLength() { + return maxDataLength; + } + + public void setMaxDataLength(int maxDataLength) { + this.maxDataLength = maxDataLength; + } + + public long getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(long maxFrameSize) { + this.maxFrameSize = maxFrameSize; } } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java index 92874dbcd0..9d4f2e7e93 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java @@ -81,7 +81,7 @@ public class StompMaxFrameSizeTest extends StompTestSupport { @Override protected String getAdditionalConfig() { - return "?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize; + return "?wireFormat.maxDataLength=" + MAX_DATA_SIZE + "&wireFormat.maxFrameSize=" + maxFrameSize; } /**