mirror of https://github.com/apache/activemq.git
(AMQ-9156, AMQ-9167) - Update TopicSubscription to use a new counter for
current dispatched count The previous way of computing the count of using total dispatched minus total dequeued didn't work in the case of destination removal and messages were not acked. The counter is needed as the dispatched list is optional unlike prefetch subs.
This commit is contained in:
parent
85502a526d
commit
8554a1464c
|
@ -69,6 +69,12 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
protected final Object dispatchLock = new Object();
|
||||
protected final List<DispatchedNode> dispatched = new ArrayList<>();
|
||||
|
||||
//Keep track of current dispatched count. This is necessary because the dispatched list is optional
|
||||
//and only used if in flights stats are turned on to save memory. The previous way of calculating current dispatched
|
||||
//of using total dispatched - dequeues doesn't work well because dequeues won't be incremented on destination removal and
|
||||
//no acks recevied. This counter could be removed in the future if we ever decide to require always using the dispatched list.
|
||||
protected final AtomicInteger currentDispatchedCount = new AtomicInteger();
|
||||
|
||||
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
|
||||
super(broker, context, info);
|
||||
this.usageManager = usageManager;
|
||||
|
@ -250,6 +256,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
synchronized(dispatchLock) {
|
||||
matched.remove();
|
||||
getSubscriptionStatistics().getDispatched().increment();
|
||||
currentDispatchedCount.incrementAndGet();
|
||||
if (isUseTopicSubscriptionInflightStats()) {
|
||||
dispatched.add(new DispatchedNode(node));
|
||||
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
|
||||
|
@ -359,9 +366,10 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
synchronized(dispatchLock) {
|
||||
dispatched.removeIf(node -> {
|
||||
if (node.getDestination() == destination) {
|
||||
//We only need to clean up inflight message size here on the sub stats as
|
||||
//inflight on destination stat is cleaned up on destroy
|
||||
//On removal from dispatched need to decrement counters
|
||||
currentDispatchedCount.decrementAndGet();
|
||||
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
|
||||
destination.getDestinationStatistics().getInflight().decrement();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -435,6 +443,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
}
|
||||
|
||||
private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
|
||||
currentDispatchedCount.addAndGet(-count);
|
||||
getSubscriptionStatistics().getDequeues().add(count);
|
||||
destination.getDestinationStatistics().getDequeues().add(count);
|
||||
destination.getDestinationStatistics().getInflight().subtract(count);
|
||||
|
@ -463,8 +472,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
|
||||
@Override
|
||||
public int getDispatchedQueueSize() {
|
||||
return (int)(getSubscriptionStatistics().getDispatched().getCount() -
|
||||
getSubscriptionStatistics().getDequeues().getCount());
|
||||
return currentDispatchedCount.get();
|
||||
}
|
||||
|
||||
public int getMaximumPendingMessages() {
|
||||
|
@ -671,6 +679,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
|
||||
synchronized(dispatchLock) {
|
||||
getSubscriptionStatistics().getDispatched().increment();
|
||||
currentDispatchedCount.incrementAndGet();
|
||||
if (isUseTopicSubscriptionInflightStats()) {
|
||||
dispatched.add(new DispatchedNode(node));
|
||||
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
|
||||
|
@ -772,7 +781,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
dispatched.clear();
|
||||
//Clear any unacked messages from destination inflight stats
|
||||
if (destination != null) {
|
||||
destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize());
|
||||
destination.getDestinationStatistics().getInflight().subtract(currentDispatchedCount.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue