From f69fd6f0020290752a7424479821c22d94f9b8b7 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Thu, 29 Mar 2018 08:54:17 -0400 Subject: [PATCH] AMQ-6940 - Reduce memory footprint for inflight statistics For the TopicSubscription case we can reduce the inflight statistics memory footprint by not storing the entire message reference for in flight messages and instead just a subset of the information needed. --- .../broker/region/TopicSubscription.java | 57 ++++++++++++++----- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 4b958f3bd1..4962de66bf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -32,6 +32,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -74,7 +75,7 @@ public class TopicSubscription extends AbstractSubscription { //Used for inflight message size calculations protected final Object dispatchLock = new Object(); - protected final List dispatched = new ArrayList(); + protected final List dispatched = new ArrayList<>(); public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); @@ -257,7 +258,7 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { matched.remove(); getSubscriptionStatistics().getDispatched().increment(); - dispatched.add(node); + dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); node.decrementReferenceCount(); } @@ -383,8 +384,8 @@ public class TopicSubscription extends AbstractSubscription { private void updateStatsOnAck(final MessageAck ack) { synchronized(dispatchLock) { boolean inAckRange = false; - List removeList = new ArrayList(); - for (final MessageReference node : dispatched) { + List removeList = new ArrayList<>(); + for (final DispatchedNode node : dispatched) { MessageId messageId = node.getMessageId(); if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { @@ -398,17 +399,21 @@ public class TopicSubscription extends AbstractSubscription { } } - for (final MessageReference node : removeList) { + for (final DispatchedNode node : removeList) { dispatched.remove(node); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); getSubscriptionStatistics().getDequeues().increment(); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); - ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); - if (info.isNetworkSubscription()) { - ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); - } - if (ack.isExpiredAck()) { - destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); + + final Destination destination = node.getDestination(); + if (destination != null) { + destination.getDestinationStatistics().getDequeues().increment(); + destination.getDestinationStatistics().getInflight().decrement(); + if (info.isNetworkSubscription()) { + destination.getDestinationStatistics().getForwards().increment(); + } + if (ack.isExpiredAck()) { + destination.getDestinationStatistics().getExpired().increment(); + } } if (!ack.isInTransaction()) { contractPrefetchExtension(1); @@ -648,7 +653,7 @@ public class TopicSubscription extends AbstractSubscription { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); synchronized(dispatchLock) { getSubscriptionStatistics().getDispatched().increment(); - dispatched.add(node); + dispatched.add(new DispatchedNode(node)); getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); } @@ -759,4 +764,30 @@ public class TopicSubscription extends AbstractSubscription { } } + private static class DispatchedNode { + private final int size; + private final MessageId messageId; + private final Destination destination; + + public DispatchedNode(final MessageReference node) { + super(); + this.size = node.getSize(); + this.messageId = node.getMessageId(); + this.destination = node.getRegionDestination() instanceof Destination ? + ((Destination)node.getRegionDestination()) : null; + } + + public long getSize() { + return size; + } + + public MessageId getMessageId() { + return messageId; + } + + public Destination getDestination() { + return destination; + } + } + }