Add better support for maxFrameSize configuration that fits with the
defaults configuration for the broker by allowing the maxFrameSize to be
configured via ?wireFormat.maxFrameSize on STOMP, AMQP, and MQTT.
Previously we only supported transport.wireFormat.maxFrameSize.
This commit is contained in:
Timothy Bish 2015-06-02 15:25:11 -04:00
parent 35b82e5f93
commit f37b005acf
8 changed files with 106 additions and 9 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
@ -24,8 +25,62 @@ import org.apache.activemq.wireformat.WireFormatFactory;
*/
public class AmqpWireFormatFactory implements WireFormatFactory {
private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE;
private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
@Override
public WireFormat createWireFormat() {
return new AmqpWireFormat();
AmqpWireFormat wireFormat = new AmqpWireFormat();
wireFormat.setMaxFrameSize(getMaxFrameSize());
wireFormat.setMaxAmqpFrameSize(getMaxAmqpFrameSize());
wireFormat.setIdleTimeout(getIdelTimeout());
wireFormat.setProducerCredit(getProducerCredit());
wireFormat.setTransformer(getTransformer());
return wireFormat;
}
public int getMaxAmqpFrameSize() {
return maxAmqpFrameSize;
}
public void setMaxAmqpFrameSize(int maxAmqpFrameSize) {
this.maxAmqpFrameSize = maxAmqpFrameSize;
}
public long getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(long maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
public int getIdelTimeout() {
return idelTimeout;
}
public void setIdelTimeout(int idelTimeout) {
this.idelTimeout = idelTimeout;
}
public int getProducerCredit() {
return producerCredit;
}
public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit;
}
public String getTransformer() {
return transformer;
}
public void setTransformer(String transformer) {
this.transformer = transformer;
}
}

View File

@ -61,7 +61,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private static final int DEFAULT_CHANNEL_MAX = 32767;
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
public static final long DEFAULT_CONNECT_TIMEOUT = 15000;
public static final long DEFAULT_CONNECT_TIMEOUT = 515000;
public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
private final ScheduledExecutorService serializer;

View File

@ -67,8 +67,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize +
"&transport.maxFrameSize=" + maxFrameSize;
return "&wireFormat.maxAmqpFrameSize=" + maxAmqpFrameSize +
"&wireFormat.maxFrameSize=" + maxFrameSize;
}
@Test(timeout = 600000)

View File

@ -23,7 +23,23 @@ import org.apache.activemq.wireformat.WireFormatFactory;
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class MQTTWireFormatFactory implements WireFormatFactory {
private int maxFrameSize = MQTTWireFormat.MAX_MESSAGE_LENGTH;
@Override
public WireFormat createWireFormat() {
return new MQTTWireFormat();
MQTTWireFormat wireFormat = new MQTTWireFormat();
wireFormat.setMaxFrameSize(getMaxFrameSize());
return wireFormat;
}
public int getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
}

View File

@ -59,7 +59,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
@Override
public String getProtocolConfig() {
return "?transport.maxFrameSize=" + maxFrameSize;
return "?wireFormat.maxFrameSize=" + maxFrameSize;
}
@Test(timeout = 30000)

View File

@ -44,8 +44,8 @@ public class StompWireFormat implements WireFormat {
private static final int MAX_COMMAND_LENGTH = 1024;
private static final int MAX_HEADER_LENGTH = 1024 * 10;
private static final int MAX_HEADERS = 1000;
private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
public static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final long DEFAULT_CONNECTION_TIMEOUT = 30000;

View File

@ -23,7 +23,33 @@ import org.apache.activemq.wireformat.WireFormatFactory;
* Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class StompWireFormatFactory implements WireFormatFactory {
private int maxDataLength = StompWireFormat.MAX_DATA_LENGTH;
private long maxFrameSize = StompWireFormat.DEFAULT_MAX_FRAME_SIZE;
@Override
public WireFormat createWireFormat() {
return new StompWireFormat();
StompWireFormat wireFormat = new StompWireFormat();
wireFormat.setMaxDataLength(getMaxDataLength());
wireFormat.setMaxFrameSize(getMaxFrameSize());
return wireFormat;
}
public int getMaxDataLength() {
return maxDataLength;
}
public void setMaxDataLength(int maxDataLength) {
this.maxDataLength = maxDataLength;
}
public long getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(long maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
}

View File

@ -81,7 +81,7 @@ public class StompMaxFrameSizeTest extends StompTestSupport {
@Override
protected String getAdditionalConfig() {
return "?transport.maxDataLength=" + MAX_DATA_SIZE + "&transport.maxFrameSize=" + maxFrameSize;
return "?wireFormat.maxDataLength=" + MAX_DATA_SIZE + "&wireFormat.maxFrameSize=" + maxFrameSize;
}
/**