Up the max frame size to a value of 1mb so that we are restricted by the
default in the QPid client which is 32k.
This commit is contained in:
Timothy Bish 2014-01-08 11:27:39 -05:00
parent 0f0c0d676a
commit 283cdd0502
3 changed files with 46 additions and 20 deletions

View File

@ -126,6 +126,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public AmqpProtocolConverter(AmqpTransport transport) {
this.amqpTransport = transport;
int maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
// AMQ-4914 - Setting the max frame size to large stalls out the QPid client on sends or
// consume due to no session credit. Once fixed we should set this value using
// the configured maxFrameSize on the URI.
//int maxFrameSize = transport.getWireFormat().getMaxFrameSize() > Integer.MAX_VALUE ?
// Integer.MAX_VALUE : (int) transport.getWireFormat().getMaxFrameSize();
this.protonTransport.setMaxFrameSize(maxFrameSize);
this.protonTransport.bind(this.protonConnection);
updateTracer();
}

View File

@ -16,22 +16,26 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import java.io.*;
/**
*/
public class AmqpWireFormat implements WireFormat {
public static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
private int version = 1;
private long maxFrameSize = 1024*1024*100;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
@Override
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@ -40,27 +44,31 @@ public class AmqpWireFormat implements WireFormat {
return baos.toByteSequence();
}
@Override
public Object unmarshal(ByteSequence packet) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(packet);
DataInputStream dis = new DataInputStream(stream);
return unmarshal(dis);
}
@Override
public void marshal(Object command, DataOutput dataOut) throws IOException {
Buffer frame = (Buffer) command;
frame.writeTo(dataOut);
}
boolean magicRead = false;
@Override
public Object unmarshal(DataInput dataIn) throws IOException {
if( !magicRead ) {
if (!magicRead) {
Buffer magic = new Buffer(8);
magic.readFrom(dataIn);
magicRead = true;
return new AmqpHeader(magic);
} else {
int size = dataIn.readInt();
if( size > maxFrameSize) {
if (size > maxFrameSize) {
throw new AmqpProtocolException("Frame size exceeded max frame length.");
}
Buffer frame = new Buffer(size);
@ -71,8 +79,7 @@ public class AmqpWireFormat implements WireFormat {
}
}
/**
*/
@Override
public void setVersion(int version) {
this.version = version;
}
@ -80,11 +87,11 @@ public class AmqpWireFormat implements WireFormat {
/**
* @return the version of the wire format
*/
@Override
public int getVersion() {
return this.version;
}
public long getMaxFrameSize() {
return maxFrameSize;
}

View File

@ -16,15 +16,9 @@
*/
package org.apache.activemq.transport.amqp.bugs;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
@ -35,7 +29,15 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.*;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ4914Test extends AmqpTestSupport {
@Rule
@ -67,6 +69,13 @@ public class AMQ4914Test extends AmqpTestSupport {
}
}
@Test(timeout = 2 * 60 * 1000)
public void testSendFixedSizedMessages() throws JMSException {
doTestSendLargeMessage(65536);
doTestSendLargeMessage(65536 * 2);
doTestSendLargeMessage(65536 * 4);
}
@Ignore("AMQ-4914")
@Test(timeout = 2 * 60 * 1000)
public void testSendLargeMessages() throws JMSException {