diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 4b958f3bd1..4962de66bf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -32,6 +32,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -74,7 +75,7 @@ public class TopicSubscription extends AbstractSubscription { //Used for inflight message size calculations protected final Object dispatchLock = new Object(); - protected final List dispatched = new ArrayList(); + protected final List dispatched = new ArrayList<>(); public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); @@ -257,7 +258,7 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { matched.remove(); getSubscriptionStatistics().getDispatched().increment(); - dispatched.add(node); + dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); node.decrementReferenceCount(); } @@ -383,8 +384,8 @@ public class TopicSubscription extends AbstractSubscription { private void updateStatsOnAck(final MessageAck ack) { synchronized(dispatchLock) { boolean inAckRange = false; - List removeList = new ArrayList(); - for (final MessageReference node : dispatched) { + List removeList = new ArrayList<>(); + for (final DispatchedNode node : dispatched) { MessageId messageId = node.getMessageId(); if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { @@ -398,17 +399,21 @@ public class TopicSubscription extends AbstractSubscription { } } - for (final MessageReference node : removeList) { + for (final DispatchedNode node : removeList) { dispatched.remove(node); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); getSubscriptionStatistics().getDequeues().increment(); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); - if (info.isNetworkSubscription()) { - ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); - } - if (ack.isExpiredAck()) { - destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); + + final Destination destination = node.getDestination(); + if (destination != null) { + destination.getDestinationStatistics().getDequeues().increment(); + destination.getDestinationStatistics().getInflight().decrement(); + if (info.isNetworkSubscription()) { + destination.getDestinationStatistics().getForwards().increment(); + } + if (ack.isExpiredAck()) { + destination.getDestinationStatistics().getExpired().increment(); + } } if (!ack.isInTransaction()) { contractPrefetchExtension(1); @@ -648,7 +653,7 @@ public class TopicSubscription extends AbstractSubscription { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); synchronized(dispatchLock) { getSubscriptionStatistics().getDispatched().increment(); - dispatched.add(node); + dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); } @@ -759,4 +764,30 @@ public class TopicSubscription extends AbstractSubscription { } } + private static class DispatchedNode { + private final int size; + private final MessageId messageId; + private final Destination destination; + + public DispatchedNode(final MessageReference node) { + super(); + this.size = node.getSize(); + this.messageId = node.getMessageId(); + this.destination = node.getRegionDestination() instanceof Destination ? + ((Destination)node.getRegionDestination()) : null; + } + + public long getSize() { + return size; + } + + public MessageId getMessageId() { + return messageId; + } + + public Destination getDestination() { + return destination; + } + } + }