From 21a594c8e817ebb2090c1de016c5bb1f4f905e08 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Thu, 29 Mar 2018 13:23:33 -0400 Subject: [PATCH] AMQ-6940 - Add flag to disable TopicSubscription in flight stats To save memory usage in some use cases add a new flag to PolicyEntry called useTopicSubscriptionInflightStats to allow disabling the inflight stats (cherry picked from commit 65b0f2ad0d48845ad54681ac0eff832de122e2a9) --- .../broker/region/TopicSubscription.java | 98 ++++++++++++------- .../broker/region/policy/PolicyEntry.java | 10 ++ .../AbstractInflightMessageSizeTest.java | 43 ++++++-- ...leSubscriptionInflightMessageSizeTest.java | 5 +- ...ueSubscriptionInflightMessageSizeTest.java | 5 +- ...icSubscriptionInflightMessageSizeTest.java | 34 ++++++- 6 files changed, 143 insertions(+), 52 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 4962de66bf..bf3f97bebf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -32,7 +32,6 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; -import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -72,6 +71,7 @@ public class TopicSubscription extends AbstractSubscription { protected ActiveMQMessageAudit audit; protected boolean active = false; protected boolean discarding = false; + private boolean useTopicSubscriptionInflightStats = true; //Used for inflight message size calculations protected final Object dispatchLock = new Object(); @@ -258,8 +258,10 @@ public class TopicSubscription extends AbstractSubscription { synchronized(dispatchLock) { matched.remove(); getSubscriptionStatistics().getDispatched().increment(); - dispatched.add(new DispatchedNode(node)); - getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + if (isUseTopicSubscriptionInflightStats()) { + dispatched.add(new DispatchedNode(node)); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + } node.decrementReferenceCount(); } break; @@ -382,43 +384,55 @@ public class TopicSubscription extends AbstractSubscription { * @param ack */ private void updateStatsOnAck(final MessageAck ack) { - synchronized(dispatchLock) { - boolean inAckRange = false; - List removeList = new ArrayList<>(); - for (final DispatchedNode node : dispatched) { - MessageId messageId = node.getMessageId(); - if (ack.getFirstMessageId() == null - || ack.getFirstMessageId().equals(messageId)) { - inAckRange = true; + //Allow disabling inflight stats to save memory usage + if (isUseTopicSubscriptionInflightStats()) { + synchronized(dispatchLock) { + boolean inAckRange = false; + List removeList = new ArrayList<>(); + for (final DispatchedNode node : dispatched) { + MessageId messageId = node.getMessageId(); + if (ack.getFirstMessageId() == null + || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } + if (inAckRange) { + removeList.add(node); + if (ack.getLastMessageId().equals(messageId)) { + break; + } + } } - if (inAckRange) { - removeList.add(node); - if (ack.getLastMessageId().equals(messageId)) { - break; + + for (final DispatchedNode node : removeList) { + dispatched.remove(node); + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); + + final Destination destination = node.getDestination(); + incrementStatsOnAck(destination, ack, 1); + if (!ack.isInTransaction()) { + contractPrefetchExtension(1); } } } - - for (final DispatchedNode node : removeList) { - dispatched.remove(node); - getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); - getSubscriptionStatistics().getDequeues().increment(); - - final Destination destination = node.getDestination(); - if (destination != null) { - destination.getDestinationStatistics().getDequeues().increment(); - destination.getDestinationStatistics().getInflight().decrement(); - if (info.isNetworkSubscription()) { - destination.getDestinationStatistics().getForwards().increment(); - } - if (ack.isExpiredAck()) { - destination.getDestinationStatistics().getExpired().increment(); - } - } - if (!ack.isInTransaction()) { - contractPrefetchExtension(1); - } + } else { + if (singleDestination && destination != null) { + incrementStatsOnAck(destination, ack, ack.getMessageCount()); } + if (!ack.isInTransaction()) { + contractPrefetchExtension(ack.getMessageCount()); + } + } + } + + private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) { + getSubscriptionStatistics().getDequeues().add(count); + destination.getDestinationStatistics().getDequeues().add(count); + destination.getDestinationStatistics().getInflight().subtract(count); + if (info.isNetworkSubscription()) { + destination.getDestinationStatistics().getForwards().add(count); + } + if (ack.isExpiredAck()) { + destination.getDestinationStatistics().getExpired().add(count); } } @@ -653,8 +667,10 @@ public class TopicSubscription extends AbstractSubscription { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); synchronized(dispatchLock) { getSubscriptionStatistics().getDispatched().increment(); - dispatched.add(new DispatchedNode(node)); - getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + if (isUseTopicSubscriptionInflightStats()) { + dispatched.add(new DispatchedNode(node)); + getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); + } } // Keep track if this subscription is receiving messages from a single destination. @@ -764,6 +780,14 @@ public class TopicSubscription extends AbstractSubscription { } } + public boolean isUseTopicSubscriptionInflightStats() { + return useTopicSubscriptionInflightStats; + } + + public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) { + this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats; + } + private static class DispatchedNode { private final int size; private final MessageId messageId; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 5b7ff0e184..b985c95403 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry { private NetworkBridgeFilterFactory networkBridgeFilterFactory; private boolean doOptimzeMessageStorage = true; private int maxDestinations = -1; + private boolean useTopicSubscriptionInflightStats = true; /* * percentage of in-flight messages above which optimize message store is disabled @@ -315,6 +316,7 @@ public class PolicyEntry extends DestinationMapEntry { configurePrefetch(subscription); subscription.setUsePrefetchExtension(isUsePrefetchExtension()); subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + subscription.setUseTopicSubscriptionInflightStats(isUseTopicSubscriptionInflightStats()); if (pendingMessageLimitStrategy != null) { int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); @@ -1100,4 +1102,12 @@ public class PolicyEntry extends DestinationMapEntry { public String toString() { return "PolicyEntry [" + destination + "]"; } + } + + public boolean isUseTopicSubscriptionInflightStats() { + return useTopicSubscriptionInflightStats; + } + + public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) { + this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java index 07784e7a37..a127feb7c7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java @@ -38,6 +38,8 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.Wait; import org.junit.After; @@ -59,6 +61,7 @@ public abstract class AbstractInflightMessageSizeTest { protected Destination amqDestination; protected MessageConsumer consumer; protected int prefetch = 100; + protected boolean useTopicSubscriptionInflightStats; final protected int ackType; final protected boolean optimizeAcknowledge; final protected String destName = "testDest"; @@ -66,20 +69,29 @@ public abstract class AbstractInflightMessageSizeTest { @Parameters public static Collection data() { return Arrays.asList(new Object[][] { - {ActiveMQSession.SESSION_TRANSACTED, true}, - {ActiveMQSession.AUTO_ACKNOWLEDGE, true}, - {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true}, - {ActiveMQSession.CLIENT_ACKNOWLEDGE, true}, - {ActiveMQSession.SESSION_TRANSACTED, false}, - {ActiveMQSession.AUTO_ACKNOWLEDGE, false}, - {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false}, - {ActiveMQSession.CLIENT_ACKNOWLEDGE, false} + {ActiveMQSession.SESSION_TRANSACTED, true, true}, + {ActiveMQSession.AUTO_ACKNOWLEDGE, true, true}, + {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, true}, + {ActiveMQSession.CLIENT_ACKNOWLEDGE, true, true}, + {ActiveMQSession.SESSION_TRANSACTED, false, true}, + {ActiveMQSession.AUTO_ACKNOWLEDGE, false, true}, + {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, true}, + {ActiveMQSession.CLIENT_ACKNOWLEDGE, false, true}, + {ActiveMQSession.SESSION_TRANSACTED, true, false}, + {ActiveMQSession.AUTO_ACKNOWLEDGE, true, false}, + {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, false}, + {ActiveMQSession.CLIENT_ACKNOWLEDGE, true, false}, + {ActiveMQSession.SESSION_TRANSACTED, false, false}, + {ActiveMQSession.AUTO_ACKNOWLEDGE, false, false}, + {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, false}, + {ActiveMQSession.CLIENT_ACKNOWLEDGE, false, false} }); } - public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { + public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) { this.ackType = ackType; this.optimizeAcknowledge = optimizeAcknowledge; + this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats; } @Before @@ -88,6 +100,12 @@ public abstract class AbstractInflightMessageSizeTest { brokerService.setDeleteAllMessagesOnStartup(true); TransportConnector tcp = brokerService .addConnector("tcp://localhost:0"); + PolicyEntry policy = new PolicyEntry(); + policy.setUseTopicSubscriptionInflightStats(useTopicSubscriptionInflightStats); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + brokerService.setDestinationPolicy(pMap); + brokerService.start(); //used to test optimizeAcknowledge works String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : ""; @@ -129,6 +147,8 @@ public abstract class AbstractInflightMessageSizeTest { */ @Test(timeout=15000) public void testInflightMessageSize() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + final long size = sendMessages(10); assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { @@ -155,6 +175,8 @@ public abstract class AbstractInflightMessageSizeTest { */ @Test(timeout=15000) public void testInflightMessageSizePrefetchFilled() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + final long size = sendMessages(prefetch); assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { @@ -187,6 +209,8 @@ public abstract class AbstractInflightMessageSizeTest { */ @Test(timeout=15000) public void testInflightMessageSizePrefetchNotFilled() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + final long size = sendMessages(prefetch - 10); assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { @@ -227,6 +251,7 @@ public abstract class AbstractInflightMessageSizeTest { */ @Test(timeout=15000) public void testInflightMessageSizeRollback() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED); final long size = sendMessages(10); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java index 29d6cb79a5..a7e947329a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java @@ -34,8 +34,9 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest { - public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { - super(ackType, optimizeAcknowledge); + public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, + boolean useTopicSubscriptionInflightStats) { + super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java index 84ddc71c27..217aefb2d2 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java @@ -34,8 +34,9 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest { - public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { - super(ackType, optimizeAcknowledge); + public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, + boolean useTopicSubscriptionInflightStats) { + super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java index 797d409a56..132a96d9b5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.statistics; +import static org.junit.Assert.assertTrue; + import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; @@ -23,6 +25,9 @@ import javax.jms.MessageConsumer; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.Assume; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -33,8 +38,8 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest { - public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) { - super(ackType, optimizeAcknowledge); + public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) { + super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats); } @Override @@ -57,4 +62,29 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe return new ActiveMQTopic(destName); } + @Test(timeout=15000) + public void testInflightMessageSizeDisabled() throws Exception { + Assume.assumeFalse(useTopicSubscriptionInflightStats); + sendMessages(10); + + Thread.sleep(1000); + + assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() == 0; + } + })); + + receiveMessages(10); + + Thread.sleep(1000); + assertTrue("Inflight message size should still be 0", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getSubscription().getInFlightMessageSize() == 0; + } + })); + } + }