From fc6402613d19b4f92fba89f54c9a47ed9d310458 Mon Sep 17 00:00:00 2001 From: brusdev Date: Thu, 19 Mar 2020 12:11:28 +0100 Subject: [PATCH] ARTEMIS-2664 Simplify the credits acquiring. Replace the AtomicInteger with an int. Indeed deliveredAcks is used only by the acknowledge method and it is only executed by the EpollEventLoop thread bounded with the relative connection channel. --- .../protocol/openwire/amq/AMQConsumer.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index eb4ce407e1..cb9c74b121 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -68,7 +68,7 @@ public class AMQConsumer { private int prefetchSize; private final AtomicInteger currentWindow; - private final AtomicInteger deliveredAcks; + private int deliveredAcks; private long messagePullSequence = 0; private final AtomicReference messagePullHandler = new AtomicReference<>(null); //internal means we don't expose @@ -88,7 +88,7 @@ public class AMQConsumer { this.scheduledPool = scheduledPool; this.prefetchSize = info.getPrefetchSize(); this.currentWindow = new AtomicInteger(prefetchSize); - this.deliveredAcks = new AtomicInteger(0); + this.deliveredAcks = 0; if (prefetchSize == 0) { messagePullHandler.set(new MessagePullHandler()); } @@ -300,18 +300,15 @@ public class AMQConsumer { List ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last); if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) { - this.deliveredAcks.getAndUpdate(deliveredAcks -> { - if (deliveredAcks >= ackList.size()) { - return deliveredAcks - ackList.size(); - } - + if (deliveredAcks < ackList.size()) { acquireCredit(ackList.size() - deliveredAcks); - - return 0; - }); + deliveredAcks = 0; + } else { + deliveredAcks -= ackList.size(); + } } else { if (ack.isDeliveredAck()) { - this.deliveredAcks.addAndGet(ack.getMessageCount()); + this.deliveredAcks += ack.getMessageCount(); } acquireCredit(ack.getMessageCount());