diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index faa29edef4..18404fa49c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -69,6 +69,12 @@ public class TopicSubscription extends AbstractSubscription { protected final Object dispatchLock = new Object(); protected final List dispatched = new ArrayList<>(); + //Keep track of current dispatched count. This is necessary because the dispatched list is optional + //and only used if in flights stats are turned on to save memory. The previous way of calculating current dispatched + //of using total dispatched - dequeues doesn't work well because dequeues won't be incremented on destination removal and + //no acks recevied. This counter could be removed in the future if we ever decide to require always using the dispatched list. + protected final AtomicInteger currentDispatchedCount = new AtomicInteger(); + public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); this.usageManager = usageManager; @@ -250,6 +256,7 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { matched.remove(); getSubscriptionStatistics().getDispatched().increment(); + currentDispatchedCount.incrementAndGet(); if (isUseTopicSubscriptionInflightStats()) { dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); @@ -359,9 +366,10 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { dispatched.removeIf(node -> { if (node.getDestination() == destination) { - //We only need to clean up inflight message size here on the sub stats as - //inflight on destination stat is cleaned up on destroy + //On removal from dispatched need to decrement counters + currentDispatchedCount.decrementAndGet(); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); + destination.getDestinationStatistics().getInflight().decrement(); return true; } return false; @@ -435,6 +443,7 @@ public class TopicSubscription extends AbstractSubscription { } private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) { + currentDispatchedCount.addAndGet(-count); getSubscriptionStatistics().getDequeues().add(count); destination.getDestinationStatistics().getDequeues().add(count); destination.getDestinationStatistics().getInflight().subtract(count); @@ -463,8 +472,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public int getDispatchedQueueSize() { - return (int)(getSubscriptionStatistics().getDispatched().getCount() - - getSubscriptionStatistics().getDequeues().getCount()); + return currentDispatchedCount.get(); } public int getMaximumPendingMessages() { @@ -671,6 +679,7 @@ public class TopicSubscription extends AbstractSubscription { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); synchronized(dispatchLock) { getSubscriptionStatistics().getDispatched().increment(); + currentDispatchedCount.incrementAndGet(); if (isUseTopicSubscriptionInflightStats()) { dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); @@ -772,7 +781,7 @@ public class TopicSubscription extends AbstractSubscription { dispatched.clear(); //Clear any unacked messages from destination inflight stats if (destination != null) { - destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize()); + destination.getDestinationStatistics().getInflight().subtract(currentDispatchedCount.get()); } } }