diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java index f4de9501f0..fb7aea4255 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp; +import org.apache.activemq.transport.amqp.message.InboundTransformer; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; @@ -24,8 +25,62 @@ import org.apache.activemq.wireformat.WireFormatFactory; */ public class AmqpWireFormatFactory implements WireFormatFactory { + private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE; + private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE; + private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT; + private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT; + private String transformer = InboundTransformer.TRANSFORMER_NATIVE; + @Override public WireFormat createWireFormat() { - return new AmqpWireFormat(); + AmqpWireFormat wireFormat = new AmqpWireFormat(); + + wireFormat.setMaxFrameSize(getMaxFrameSize()); + wireFormat.setMaxAmqpFrameSize(getMaxAmqpFrameSize()); + wireFormat.setIdleTimeout(getIdelTimeout()); + wireFormat.setProducerCredit(getProducerCredit()); + wireFormat.setTransformer(getTransformer()); + + return wireFormat; + } + + public int getMaxAmqpFrameSize() { + return maxAmqpFrameSize; + } + + public void setMaxAmqpFrameSize(int maxAmqpFrameSize) { + this.maxAmqpFrameSize = maxAmqpFrameSize; + } + + public long getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(long maxFrameSize) { + this.maxFrameSize = maxFrameSize; + } + + public int getIdelTimeout() { + return idelTimeout; + } + + public void setIdelTimeout(int idelTimeout) { + this.idelTimeout = idelTimeout; + } + + public int getProducerCredit() { + return producerCredit; + } + + public void setProducerCredit(int producerCredit) { + this.producerCredit = producerCredit; + } + + public String getTransformer() { + return transformer; + } + + public void setTransformer(String transformer) { + this.transformer = transformer; } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index d00aec7f2b..9c13e74334 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -61,7 +61,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private static final int DEFAULT_CHANNEL_MAX = 32767; private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); - public static final long DEFAULT_CONNECT_TIMEOUT = 15000; + public static final long DEFAULT_CONNECT_TIMEOUT = 515000; public static final long DEFAULT_CLOSE_TIMEOUT = 30000; private final ScheduledExecutorService serializer; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java index 7599c2524d..7f70d73a94 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java @@ -67,8 +67,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { @Override protected String getAdditionalConfig() { - return "&transport.wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize + - "&transport.maxFrameSize=" + maxFrameSize; + return "&wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize + + "&wireFormat.maxFrameSize=" + maxFrameSize; } @Test(timeout = 600000) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java index 4beaa3915f..7298d6257f 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormatFactory.java @@ -23,7 +23,23 @@ import org.apache.activemq.wireformat.WireFormatFactory; * Creates WireFormat objects that marshalls the Stomp protocol. */ public class MQTTWireFormatFactory implements WireFormatFactory { + + private int maxFrameSize = MQTTWireFormat.MAX_MESSAGE_LENGTH; + + @Override public WireFormat createWireFormat() { - return new MQTTWireFormat(); + MQTTWireFormat wireFormat = new MQTTWireFormat(); + + wireFormat.setMaxFrameSize(getMaxFrameSize()); + + return wireFormat; + } + + public int getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java index 8f5ad2eba2..029de93c21 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java @@ -59,7 +59,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport { @Override public String getProtocolConfig() { - return "?transport.maxFrameSize=" + maxFrameSize; + return "?wireFormat.maxFrameSize=" + maxFrameSize; } @Test(timeout = 30000) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index daa4639839..01debf19e2 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -44,8 +44,8 @@ public class StompWireFormat implements WireFormat { private static final int MAX_COMMAND_LENGTH = 1024; private static final int MAX_HEADER_LENGTH = 1024 * 10; private static final int MAX_HEADERS = 1000; - private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; + public static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; public static final long DEFAULT_CONNECTION_TIMEOUT = 30000; diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java index 60fff9371a..54effa6d96 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java @@ -23,7 +23,33 @@ import org.apache.activemq.wireformat.WireFormatFactory; * Creates WireFormat objects that marshalls the Stomp protocol. */ public class StompWireFormatFactory implements WireFormatFactory { + + private int maxDataLength = StompWireFormat.MAX_DATA_LENGTH; + private long maxFrameSize = StompWireFormat.DEFAULT_MAX_FRAME_SIZE; + + @Override public WireFormat createWireFormat() { - return new StompWireFormat(); + StompWireFormat wireFormat = new StompWireFormat(); + + wireFormat.setMaxDataLength(getMaxDataLength()); + wireFormat.setMaxFrameSize(getMaxFrameSize()); + + return wireFormat; + } + + public int getMaxDataLength() { + return maxDataLength; + } + + public void setMaxDataLength(int maxDataLength) { + this.maxDataLength = maxDataLength; + } + + public long getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(long maxFrameSize) { + this.maxFrameSize = maxFrameSize; } } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java index 92874dbcd0..9d4f2e7e93 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMaxFrameSizeTest.java @@ -81,7 +81,7 @@ public class StompMaxFrameSizeTest extends StompTestSupport { @Override protected String getAdditionalConfig() { - return "?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize; + return "?wireFormat.maxDataLength=" + MAX_DATA_SIZE + "&wireFormat.maxFrameSize=" + maxFrameSize; } /**