Always honor the link credit as true prefetch value for the
subscription.  Enables previously failing test to verify.
This commit is contained in:
Timothy Bish 2015-03-13 18:20:26 -04:00
parent 72839b78a7
commit 6a6ef45ee0
5 changed files with 16 additions and 45 deletions

View File

@ -33,7 +33,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
private final AmqpTransport transport; private final AmqpTransport transport;
private final BrokerService brokerService; private final BrokerService brokerService;
private int prefetch = 0;
private int producerCredit = DEFAULT_PREFETCH; private int producerCredit = DEFAULT_PREFETCH;
interface Discriminator { interface Discriminator {
@ -90,7 +89,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
} }
IAmqpProtocolConverter next = match.create(transport, brokerService); IAmqpProtocolConverter next = match.create(transport, brokerService);
next.setPrefetch(prefetch);
next.setProducerCredit(producerCredit); next.setProducerCredit(producerCredit);
transport.setProtocolConverter(next); transport.setProtocolConverter(next);
for (Command send : pendingCommands) { for (Command send : pendingCommands) {
@ -116,11 +114,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
public void updateTracer() { public void updateTracer() {
} }
@Override
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}
@Override @Override
public void setProducerCredit(int producerCredit) { public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit; this.producerCredit = producerCredit;

View File

@ -155,7 +155,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final BrokerService brokerService; private final BrokerService brokerService;
private AuthenticationBroker authenticator; private AuthenticationBroker authenticator;
protected int prefetch;
protected int producerCredit; protected int producerCredit;
protected Transport protonTransport = Proton.transport(); protected Transport protonTransport = Proton.transport();
protected Connection protonConnection = Proton.connection(); protected Connection protonConnection = Proton.connection();
@ -410,17 +409,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
int credit = link.getCredit(); int credit = link.getCredit();
if (context instanceof ConsumerContext) { if (context instanceof ConsumerContext) {
ConsumerContext consumerContext = (ConsumerContext)context; ConsumerContext consumerContext = (ConsumerContext)context;
// change consumer prefetch if it's not been already set using
// transport connector property or consumer preference if (credit != consumerContext.credit) {
if (consumerContext.consumerPrefetch == 0 && credit > 0) { consumerContext.credit = credit >= 0 ? credit : 0;
ConsumerControl control = new ConsumerControl(); ConsumerControl control = new ConsumerControl();
control.setConsumerId(consumerContext.consumerId); control.setConsumerId(consumerContext.consumerId);
control.setDestination(consumerContext.destination); control.setDestination(consumerContext.destination);
control.setPrefetch(credit); control.setPrefetch(consumerContext.credit);
consumerContext.consumerPrefetch = credit;
sendToActiveMQ(control, null); sendToActiveMQ(control, null);
} }
consumerContext.credit = credit;
} }
((AmqpDeliveryListener) link.getContext()).drainCheck(); ((AmqpDeliveryListener) link.getContext()).drainCheck();
} }
@ -1061,7 +1058,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public ConsumerInfo info; public ConsumerInfo info;
private boolean endOfBrowse = false; private boolean endOfBrowse = false;
public int credit; public int credit;
public int consumerPrefetch = 0;
private long lastDeliveredSequenceId; private long lastDeliveredSequenceId;
protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
@ -1481,33 +1477,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
destination = createDestination(source); destination = createDestination(source);
} }
int senderCredit = sender.getRemoteCredit();
subscriptionsByConsumerId.put(id, consumerContext); subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id); ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerContext.info = consumerInfo;
consumerInfo.setSelector(selector); consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true); consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(destination); consumerInfo.setDestination(destination);
consumerContext.setDestination(destination); consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
int senderCredit = sender.getRemoteCredit();
if (prefetch != 0) {
// use the value configured on the transport connector
// this value will not be changed to the consumer's preference
consumerInfo.setPrefetchSize(prefetch);
consumerContext.consumerPrefetch = prefetch;
} else {
if (senderCredit != 0) {
// set the prefetch to the value of the remote credit
// and ignore the later changes
consumerInfo.setPrefetchSize(senderCredit);
consumerContext.consumerPrefetch = senderCredit;
} else {
// set zero value for now and change to the consumer's preference
// on the first flow packet
consumerInfo.setPrefetchSize(0);
}
}
consumerContext.credit = senderCredit;
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
if (source.getDistributionMode() == COPY && destination.isQueue()) { if (source.getDistributionMode() == COPY && destination.isQueue()) {
consumerInfo.setBrowser(true); consumerInfo.setBrowser(true);
} }
@ -1521,6 +1500,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setNoLocal(true); consumerInfo.setNoLocal(true);
} }
consumerContext.info = consumerInfo;
consumerContext.setDestination(destination);
consumerContext.credit = senderCredit;
sendToActiveMQ(consumerInfo, new ResponseHandler() { sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override @Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
@ -1656,11 +1639,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
} }
@Override
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}
@Override @Override
public void setProducerCredit(int producerCredit) { public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit; this.producerCredit = producerCredit;

View File

@ -187,8 +187,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
this.protocolConverter = protocolConverter; this.protocolConverter = protocolConverter;
} }
/**
* @deprecated AMQP receiver configures it's prefetch via flow, remove on next release.
*/
@Deprecated
public void setPrefetch(int prefetch) { public void setPrefetch(int prefetch) {
protocolConverter.setPrefetch(prefetch);
} }
public void setProducerCredit(int producerCredit) { public void setProducerCredit(int producerCredit) {

View File

@ -32,7 +32,5 @@ public interface IAmqpProtocolConverter {
void updateTracer(); void updateTracer();
void setPrefetch(int prefetch);
void setProducerCredit(int producerCredit); void setProducerCredit(int producerCredit);
} }

View File

@ -141,7 +141,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Ignore("Fails due to issues with accept and no credit")
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
int MSG_COUNT = 4; int MSG_COUNT = 4;