From 46cdecf05f38b122fb834355192712ba0e611bdb Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 15 Nov 2022 16:59:01 -0500 Subject: [PATCH] (AMQ-9156, AMQ-9167) - Update TopicSubscription to use a new counter for current dispatched count The previous way of computing the count of using total dispatched minus total dequeued didn't work in the case of destination removal and messages were not acked. The counter is needed as the dispatched list is optional unlike prefetch subs. (cherry picked from commit 8554a1464c6471f81ebbcd6c482376ae32cf6808) --- .../broker/region/TopicSubscription.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 412ab43df7..26aeb5bed8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -69,6 +69,12 @@ public class TopicSubscription extends AbstractSubscription { protected final Object dispatchLock = new Object(); protected final List dispatched = new ArrayList<>(); + //Keep track of current dispatched count. This is necessary because the dispatched list is optional + //and only used if in flights stats are turned on to save memory. The previous way of calculating current dispatched + //of using total dispatched - dequeues doesn't work well because dequeues won't be incremented on destination removal and + //no acks recevied. This counter could be removed in the future if we ever decide to require always using the dispatched list. + protected final AtomicInteger currentDispatchedCount = new AtomicInteger(); + public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); this.usageManager = usageManager; @@ -250,6 +256,7 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { matched.remove(); getSubscriptionStatistics().getDispatched().increment(); + currentDispatchedCount.incrementAndGet(); if (isUseTopicSubscriptionInflightStats()) { dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); @@ -359,9 +366,10 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { dispatched.removeIf(node -> { if (node.getDestination() == destination) { - //We only need to clean up inflight message size here on the sub stats as - //inflight on destination stat is cleaned up on destroy + //On removal from dispatched need to decrement counters + currentDispatchedCount.decrementAndGet(); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); + destination.getDestinationStatistics().getInflight().decrement(); return true; } return false; @@ -435,6 +443,7 @@ public class TopicSubscription extends AbstractSubscription { } private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) { + currentDispatchedCount.addAndGet(-count); getSubscriptionStatistics().getDequeues().add(count); destination.getDestinationStatistics().getDequeues().add(count); destination.getDestinationStatistics().getInflight().subtract(count); @@ -463,8 +472,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public int getDispatchedQueueSize() { - return (int)(getSubscriptionStatistics().getDispatched().getCount() - - getSubscriptionStatistics().getDequeues().getCount()); + return currentDispatchedCount.get(); } public int getMaximumPendingMessages() { @@ -671,6 +679,7 @@ public class TopicSubscription extends AbstractSubscription { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); synchronized(dispatchLock) { getSubscriptionStatistics().getDispatched().increment(); + currentDispatchedCount.incrementAndGet(); if (isUseTopicSubscriptionInflightStats()) { dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); @@ -770,7 +779,7 @@ public class TopicSubscription extends AbstractSubscription { dispatched.clear(); //Clear any unacked messages from destination inflight stats if (destination != null) { - destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize()); + destination.getDestinationStatistics().getInflight().subtract(currentDispatchedCount.get()); } } }