diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index b484500a2c..f5b457b02b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -33,7 +33,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { private final AmqpTransport transport; private final BrokerService brokerService; - private int prefetch = 0; private int producerCredit = DEFAULT_PREFETCH; interface Discriminator { @@ -90,7 +89,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { } IAmqpProtocolConverter next = match.create(transport, brokerService); - next.setPrefetch(prefetch); next.setProducerCredit(producerCredit); transport.setProtocolConverter(next); for (Command send : pendingCommands) { @@ -116,11 +114,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { public void updateTracer() { } - @Override - public void setPrefetch(int prefetch) { - this.prefetch = prefetch; - } - @Override public void setProducerCredit(int producerCredit) { this.producerCredit = producerCredit; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 39c8c2bba8..3661f3db5e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -155,7 +155,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private final BrokerService brokerService; private AuthenticationBroker authenticator; - protected int prefetch; protected int producerCredit; protected Transport protonTransport = Proton.transport(); protected Connection protonConnection = Proton.connection(); @@ -410,17 +409,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { int credit = link.getCredit(); if (context instanceof ConsumerContext) { ConsumerContext consumerContext = (ConsumerContext)context; - // change consumer prefetch if it's not been already set using - // transport connector property or consumer preference - if (consumerContext.consumerPrefetch == 0 && credit > 0) { + + if (credit != consumerContext.credit) { + consumerContext.credit = credit >= 0 ? credit : 0; ConsumerControl control = new ConsumerControl(); control.setConsumerId(consumerContext.consumerId); control.setDestination(consumerContext.destination); - control.setPrefetch(credit); - consumerContext.consumerPrefetch = credit; + control.setPrefetch(consumerContext.credit); sendToActiveMQ(control, null); } - consumerContext.credit = credit; } ((AmqpDeliveryListener) link.getContext()).drainCheck(); } @@ -1061,7 +1058,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public ConsumerInfo info; private boolean endOfBrowse = false; public int credit; - public int consumerPrefetch = 0; private long lastDeliveredSequenceId; protected LinkedList dispatchedInTx = new LinkedList(); @@ -1481,33 +1477,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { destination = createDestination(source); } + int senderCredit = sender.getRemoteCredit(); + subscriptionsByConsumerId.put(id, consumerContext); ConsumerInfo consumerInfo = new ConsumerInfo(id); - consumerContext.info = consumerInfo; consumerInfo.setSelector(selector); consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(destination); - consumerContext.setDestination(destination); - int senderCredit = sender.getRemoteCredit(); - if (prefetch != 0) { - // use the value configured on the transport connector - // this value will not be changed to the consumer's preference - consumerInfo.setPrefetchSize(prefetch); - consumerContext.consumerPrefetch = prefetch; - } else { - if (senderCredit != 0) { - // set the prefetch to the value of the remote credit - // and ignore the later changes - consumerInfo.setPrefetchSize(senderCredit); - consumerContext.consumerPrefetch = senderCredit; - } else { - // set zero value for now and change to the consumer's preference - // on the first flow packet - consumerInfo.setPrefetchSize(0); - } - } - consumerContext.credit = senderCredit; + consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); consumerInfo.setDispatchAsync(true); + if (source.getDistributionMode() == COPY && destination.isQueue()) { consumerInfo.setBrowser(true); } @@ -1521,6 +1500,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setNoLocal(true); } + consumerContext.info = consumerInfo; + consumerContext.setDestination(destination); + consumerContext.credit = senderCredit; + sendToActiveMQ(consumerInfo, new ResponseHandler() { @Override public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { @@ -1656,11 +1639,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } - @Override - public void setPrefetch(int prefetch) { - this.prefetch = prefetch; - } - @Override public void setProducerCredit(int producerCredit) { this.producerCredit = producerCredit; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index fb7542b512..5dfdf75815 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -187,8 +187,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor this.protocolConverter = protocolConverter; } + /** + * @deprecated AMQP receiver configures it's prefetch via flow, remove on next release. + */ + @Deprecated public void setPrefetch(int prefetch) { - protocolConverter.setPrefetch(prefetch); } public void setProducerCredit(int producerCredit) { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java index 3e365aed1d..8296ef2979 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java @@ -32,7 +32,5 @@ public interface IAmqpProtocolConverter { void updateTracer(); - void setPrefetch(int prefetch); - void setProducerCredit(int producerCredit); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 12458118e0..1bc3d6618f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -141,7 +141,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } - @Ignore("Fails due to issues with accept and no credit") @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { int MSG_COUNT = 4;