diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index eb4ce407e1..cb9c74b121 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -68,7 +68,7 @@ public class AMQConsumer { private int prefetchSize; private final AtomicInteger currentWindow; - private final AtomicInteger deliveredAcks; + private int deliveredAcks; private long messagePullSequence = 0; private final AtomicReference messagePullHandler = new AtomicReference<>(null); //internal means we don't expose @@ -88,7 +88,7 @@ public class AMQConsumer { this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); this.currentWindow = new AtomicInteger(prefetchSize); - this.deliveredAcks = new AtomicInteger(0); + this.deliveredAcks = 0; if (prefetchSize == 0) { messagePullHandler.set(new MessagePullHandler()); } @@ -300,18 +300,15 @@ public class AMQConsumer { List ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last); if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) { - this.deliveredAcks.getAndUpdate(deliveredAcks -> { - if (deliveredAcks >= ackList.size()) { - return deliveredAcks - ackList.size(); - } - + if (deliveredAcks < ackList.size()) { acquireCredit(ackList.size() - deliveredAcks); - - return 0; - }); + deliveredAcks = 0; + } else { + deliveredAcks -= ackList.size(); + } } else { if (ack.isDeliveredAck()) { - this.deliveredAcks.addAndGet(ack.getMessageCount()); + this.deliveredAcks += ack.getMessageCount(); } acquireCredit(ack.getMessageCount());