ARTEMIS-3649 Fix zero prefetch OpenWire consumers

This commit is contained in:
Domenico Francesco Bruscino 2022-01-17 14:11:55 +01:00 committed by Gary Tully
parent fc357213d2
commit 9c01f9b983
2 changed files with 17 additions and 13 deletions

View File

@ -232,11 +232,24 @@ public class AMQConsumer {
return info.getConsumerId();
}
public void acquireCredit(int n) {
public void acquireCredit(int n, boolean delivered) {
if (messagePullHandler.get() != null) {
//don't acquire any credits when the pull handler controls it!!
return;
}
if (delivered) {
deliveredAcksCreditExtension += n;
} else if (deliveredAcksCreditExtension > 0) {
if (deliveredAcksCreditExtension < n) {
n -= deliveredAcksCreditExtension;
deliveredAcksCreditExtension = 0;
} else {
deliveredAcksCreditExtension -= n;
return;
}
}
int oldwindow = currentWindow.getAndAdd(n);
boolean promptDelivery = oldwindow < prefetchSize;
@ -294,8 +307,7 @@ public class AMQConsumer {
final int ackMessageCount = ack.getMessageCount();
if (ack.isDeliveredAck()) {
acquireCredit(ackMessageCount);
deliveredAcksCreditExtension += ackMessageCount;
acquireCredit(ackMessageCount, true);
// our work is done
return;
}
@ -310,15 +322,7 @@ public class AMQConsumer {
if (!ackList.isEmpty() || !removeReferences || serverConsumer.getQueue().isTemporary()) {
// valid match in delivered or browsing or temp - deal with credit
acquireCredit(ackMessageCount);
// some sort of real ack, rebalance deliveredAcksCreditExtension
if (deliveredAcksCreditExtension > 0) {
deliveredAcksCreditExtension -= ackMessageCount;
if (deliveredAcksCreditExtension >= 0) {
currentWindow.addAndGet(-ackMessageCount);
}
}
acquireCredit(ackMessageCount, false);
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {

View File

@ -94,7 +94,7 @@ public class AMQConsumerTest {
Assert.assertFalse(consumer.hasCredits());
consumer.acquireCredit(1);
consumer.acquireCredit(1, true);
Assert.assertTrue(consumer.hasCredits());
}