diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index 58da746af4..5eefbb200b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -27,7 +27,10 @@ import org.apache.activemq.command.Command; */ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { + private static final int DEFAULT_PREFETCH = 100; + final private AmqpTransport transport; + private int prefetch = DEFAULT_PREFETCH; interface Discriminator { boolean matches(AmqpHeader header); @@ -81,6 +84,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { match = DISCRIMINATORS.get(0); } IAmqpProtocolConverter next = match.create(transport); + next.setPrefetch(prefetch); transport.setProtocolConverter(next); for (Command send : pendingCommands) { next.onActiveMQCommand(send); @@ -104,4 +108,9 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { @Override public void updateTracer() { } + + @Override + public void setPrefetch(int prefetch) { + this.prefetch = prefetch; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 4951d1399f..fb275b8214 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -117,7 +117,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); - protected int prefetch = 100; + protected int prefetch; protected Transport protonTransport = Proton.transport(); protected Connection protonConnection = Proton.connection(); protected Collector eventCollector = new CollectorImpl(); @@ -780,11 +780,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) { // Client is producing to this receiver object org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget(); + int flow = prefetch; + // use client's preference if set + if (receiver.getRemoteCredit() != 0) { + flow = receiver.getRemoteCredit(); + } try { if (remoteTarget instanceof Coordinator) { pumpProtonToSocket(); receiver.setContext(coordinatorContext); - receiver.flow(prefetch); + receiver.flow(flow); receiver.open(); pumpProtonToSocket(); } else { @@ -804,7 +809,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { ProducerContext producerContext = new ProducerContext(producerId, dest); receiver.setContext(producerContext); - receiver.flow(prefetch); + receiver.flow(flow); ProducerInfo producerInfo = new ProducerInfo(producerId); producerInfo.setDestination(dest); sendToActiveMQ(producerInfo, new ResponseHandler() { @@ -1258,7 +1263,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSelector(selector); consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(dest); - consumerInfo.setPrefetchSize(100); + // use client's preference if set + if (sender.getRemoteCredit() != 0) { + consumerInfo.setPrefetchSize(sender.getRemoteCredit()); + } else { + consumerInfo.setPrefetchSize(prefetch); + } consumerInfo.setDispatchAsync(true); if (source.getDistributionMode() == COPY && dest.isQueue()) { consumerInfo.setBrowser(true); @@ -1372,4 +1382,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { condition.setDescription(description); return condition; } + + public void setPrefetch(int prefetch) { + this.prefetch = prefetch; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index 0f0badb482..41256c608c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -176,4 +176,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) { this.protocolConverter = protocolConverter; } + + public void setPrefetch(int prefetch) { + protocolConverter.setPrefetch(prefetch); + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java index d4eb64800c..e3621bf662 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java @@ -31,4 +31,6 @@ public interface IAmqpProtocolConverter { void onActiveMQCommand(Command command) throws Exception; void updateTracer(); + + void setPrefetch(int prefetch); }