diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index 5eefbb200b..09478d7f6e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -31,6 +31,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { final private AmqpTransport transport; private int prefetch = DEFAULT_PREFETCH; + private int producerCredit = DEFAULT_PREFETCH; interface Discriminator { boolean matches(AmqpHeader header); @@ -85,6 +86,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { } IAmqpProtocolConverter next = match.create(transport); next.setPrefetch(prefetch); + next.setProducerCredit(producerCredit); transport.setProtocolConverter(next); for (Command send : pendingCommands) { next.onActiveMQCommand(send); @@ -113,4 +115,9 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { public void setPrefetch(int prefetch) { this.prefetch = prefetch; } + + @Override + public void setProducerCredit(int producerCredit) { + this.producerCredit = producerCredit; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 80b47cc265..d9bfaa3bf4 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -30,31 +30,7 @@ import javax.jms.Destination; import javax.jms.InvalidClientIDException; import javax.jms.InvalidSelectorException; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTempQueue; -import org.apache.activemq.command.ActiveMQTempTopic; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionError; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveInfo; -import org.apache.activemq.command.RemoveSubscriptionInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionId; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.*; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; @@ -122,6 +98,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); protected int prefetch; + protected int producerCredit; protected Transport protonTransport = Proton.transport(); protected Connection protonConnection = Proton.connection(); protected Collector eventCollector = new CollectorImpl(); @@ -296,8 +273,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { processLinkEvent(event.getLink()); break; case LINK_FLOW: - Link link = event.getLink(); - ((AmqpDeliveryListener) link.getContext()).drainCheck(); + processLinkFlow(event.getLink()); break; case DELIVERY: processDelivery(event.getDelivery()); @@ -317,6 +293,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } + protected void processLinkFlow(Link link) throws Exception { + Object context = link.getContext(); + int credit = link.getRemoteCredit(); + if (context != null && context instanceof ConsumerContext) { + ConsumerContext consumerContext = (ConsumerContext)context; + // change ActiveMQ consumer prefetch if needed + if (consumerContext.credit == 0 && consumerContext.consumerPrefetch != credit && credit > 0) { + ConsumerControl control = new ConsumerControl(); + control.setConsumerId(consumerContext.consumerId); + control.setDestination(consumerContext.destination); + control.setPrefetch(credit); + consumerContext.consumerPrefetch = credit; + sendToActiveMQ(control, null); + } + consumerContext.credit = credit; + } + ((AmqpDeliveryListener) link.getContext()).drainCheck(); + } + protected void processConnectionEvent(Connection connection) throws Exception { EndpointState remoteState = connection.getRemoteState(); if (remoteState == EndpointState.ACTIVE) { @@ -828,7 +823,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) { // Client is producing to this receiver object org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget(); - int flow = prefetch; + int flow = producerCredit; // use client's preference if set if (receiver.getRemoteCredit() != 0) { flow = receiver.getRemoteCredit(); @@ -923,6 +918,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private boolean closed; public ConsumerInfo info; private boolean endOfBrowse = false; + public ActiveMQDestination destination; + public int credit; + public int consumerPrefetch; protected LinkedList dispatchedInTx = new LinkedList(); @@ -1317,12 +1315,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSelector(selector); consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(dest); - // use client's preference if set - if (sender.getRemoteCredit() != 0) { - consumerInfo.setPrefetchSize(sender.getRemoteCredit()); - } else { - consumerInfo.setPrefetchSize(prefetch); - } + consumerContext.destination = dest; + consumerInfo.setPrefetchSize(sender.getRemoteCredit()); + consumerContext.credit = sender.getRemoteCredit(); + consumerContext.consumerPrefetch = consumerInfo.getPrefetchSize(); consumerInfo.setDispatchAsync(true); if (source.getDistributionMode() == COPY && dest.isQueue()) { consumerInfo.setBrowser(true); @@ -1441,4 +1437,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public void setPrefetch(int prefetch) { this.prefetch = prefetch; } + + @Override + public void setProducerCredit(int producerCredit) { + this.producerCredit = producerCredit; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index 41256c608c..ec63ae7e5b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -180,4 +180,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor public void setPrefetch(int prefetch) { protocolConverter.setPrefetch(prefetch); } + + public void setProducerCredit(int producerCredit) { + protocolConverter.setProducerCredit(producerCredit); + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java index e3621bf662..3e365aed1d 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java @@ -33,4 +33,6 @@ public interface IAmqpProtocolConverter { void updateTracer(); void setPrefetch(int prefetch); + + void setProducerCredit(int producerCredit); }