This commit is contained in:
Clebert Suconic 2020-04-13 16:57:06 -04:00
commit 89634e3660
1 changed files with 8 additions and 11 deletions

View File

@ -68,7 +68,7 @@ public class AMQConsumer {
private int prefetchSize; private int prefetchSize;
private final AtomicInteger currentWindow; private final AtomicInteger currentWindow;
private final AtomicInteger deliveredAcks; private int deliveredAcks;
private long messagePullSequence = 0; private long messagePullSequence = 0;
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null); private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
//internal means we don't expose //internal means we don't expose
@ -88,7 +88,7 @@ public class AMQConsumer {
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize(); this.prefetchSize = info.getPrefetchSize();
this.currentWindow = new AtomicInteger(prefetchSize); this.currentWindow = new AtomicInteger(prefetchSize);
this.deliveredAcks = new AtomicInteger(0); this.deliveredAcks = 0;
if (prefetchSize == 0) { if (prefetchSize == 0) {
messagePullHandler.set(new MessagePullHandler()); messagePullHandler.set(new MessagePullHandler());
} }
@ -300,18 +300,15 @@ public class AMQConsumer {
List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last); List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) { if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
this.deliveredAcks.getAndUpdate(deliveredAcks -> { if (deliveredAcks < ackList.size()) {
if (deliveredAcks >= ackList.size()) {
return deliveredAcks - ackList.size();
}
acquireCredit(ackList.size() - deliveredAcks); acquireCredit(ackList.size() - deliveredAcks);
deliveredAcks = 0;
return 0; } else {
}); deliveredAcks -= ackList.size();
}
} else { } else {
if (ack.isDeliveredAck()) { if (ack.isDeliveredAck()) {
this.deliveredAcks.addAndGet(ack.getMessageCount()); this.deliveredAcks += ack.getMessageCount();
} }
acquireCredit(ack.getMessageCount()); acquireCredit(ack.getMessageCount());