(AMQ-9156, AMQ-9167) - Update TopicSubscription to use a new counter for

current dispatched count

The previous way of computing the count of using total dispatched minus
total dequeued didn't work in the case of destination removal and
messages were not acked. The counter is needed as the dispatched list is
optional unlike prefetch subs.

(cherry picked from commit 8554a1464c)
This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-15 16:59:01 -05:00
parent 31b6a03e8b
commit 46cdecf05f
1 changed files with 14 additions and 5 deletions

View File

@ -69,6 +69,12 @@ public class TopicSubscription extends AbstractSubscription {
protected final Object dispatchLock = new Object();
protected final List<DispatchedNode> dispatched = new ArrayList<>();
//Keep track of current dispatched count. This is necessary because the dispatched list is optional
//and only used if in flights stats are turned on to save memory. The previous way of calculating current dispatched
//of using total dispatched - dequeues doesn't work well because dequeues won't be incremented on destination removal and
//no acks recevied. This counter could be removed in the future if we ever decide to require always using the dispatched list.
protected final AtomicInteger currentDispatchedCount = new AtomicInteger();
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
@ -250,6 +256,7 @@ public class TopicSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
currentDispatchedCount.incrementAndGet();
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
@ -359,9 +366,10 @@ public class TopicSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
dispatched.removeIf(node -> {
if (node.getDestination() == destination) {
//We only need to clean up inflight message size here on the sub stats as
//inflight on destination stat is cleaned up on destroy
//On removal from dispatched need to decrement counters
currentDispatchedCount.decrementAndGet();
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
destination.getDestinationStatistics().getInflight().decrement();
return true;
}
return false;
@ -435,6 +443,7 @@ public class TopicSubscription extends AbstractSubscription {
}
private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
currentDispatchedCount.addAndGet(-count);
getSubscriptionStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getInflight().subtract(count);
@ -463,8 +472,7 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public int getDispatchedQueueSize() {
return (int)(getSubscriptionStatistics().getDispatched().getCount() -
getSubscriptionStatistics().getDequeues().getCount());
return currentDispatchedCount.get();
}
public int getMaximumPendingMessages() {
@ -671,6 +679,7 @@ public class TopicSubscription extends AbstractSubscription {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
synchronized(dispatchLock) {
getSubscriptionStatistics().getDispatched().increment();
currentDispatchedCount.incrementAndGet();
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
@ -770,7 +779,7 @@ public class TopicSubscription extends AbstractSubscription {
dispatched.clear();
//Clear any unacked messages from destination inflight stats
if (destination != null) {
destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize());
destination.getDestinationStatistics().getInflight().subtract(currentDispatchedCount.get());
}
}
}