ARTEMIS-2664 Simplify the credits acquiring.
Replace the AtomicInteger with an int. Indeed deliveredAcks is used only by the acknowledge method and it is only executed by the EpollEventLoop thread bounded with the relative connection channel.
This commit is contained in:
parent
30d521152d
commit
fc6402613d
|
@ -68,7 +68,7 @@ public class AMQConsumer {
|
||||||
|
|
||||||
private int prefetchSize;
|
private int prefetchSize;
|
||||||
private final AtomicInteger currentWindow;
|
private final AtomicInteger currentWindow;
|
||||||
private final AtomicInteger deliveredAcks;
|
private int deliveredAcks;
|
||||||
private long messagePullSequence = 0;
|
private long messagePullSequence = 0;
|
||||||
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
|
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
|
||||||
//internal means we don't expose
|
//internal means we don't expose
|
||||||
|
@ -88,7 +88,7 @@ public class AMQConsumer {
|
||||||
this.scheduledPool = scheduledPool;
|
this.scheduledPool = scheduledPool;
|
||||||
this.prefetchSize = info.getPrefetchSize();
|
this.prefetchSize = info.getPrefetchSize();
|
||||||
this.currentWindow = new AtomicInteger(prefetchSize);
|
this.currentWindow = new AtomicInteger(prefetchSize);
|
||||||
this.deliveredAcks = new AtomicInteger(0);
|
this.deliveredAcks = 0;
|
||||||
if (prefetchSize == 0) {
|
if (prefetchSize == 0) {
|
||||||
messagePullHandler.set(new MessagePullHandler());
|
messagePullHandler.set(new MessagePullHandler());
|
||||||
}
|
}
|
||||||
|
@ -300,18 +300,15 @@ public class AMQConsumer {
|
||||||
List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
|
List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
|
||||||
|
|
||||||
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
|
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
|
||||||
this.deliveredAcks.getAndUpdate(deliveredAcks -> {
|
if (deliveredAcks < ackList.size()) {
|
||||||
if (deliveredAcks >= ackList.size()) {
|
|
||||||
return deliveredAcks - ackList.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
acquireCredit(ackList.size() - deliveredAcks);
|
acquireCredit(ackList.size() - deliveredAcks);
|
||||||
|
deliveredAcks = 0;
|
||||||
return 0;
|
} else {
|
||||||
});
|
deliveredAcks -= ackList.size();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ack.isDeliveredAck()) {
|
if (ack.isDeliveredAck()) {
|
||||||
this.deliveredAcks.addAndGet(ack.getMessageCount());
|
this.deliveredAcks += ack.getMessageCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
acquireCredit(ack.getMessageCount());
|
acquireCredit(ack.getMessageCount());
|
||||||
|
|
Loading…
Reference in New Issue