diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 2e70062b21..c82c0760fd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -610,10 +610,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription { for (MessageReference r : dispatched) { if (r.getRegionDestination() == destination) { references.add(r); + //Decrement the size as we are removing and redispatching all references getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); } } redispatch.addAll(0, references); + //Clean up in flight message stats on the destination after dispatched is cleared destination.getDestinationStatistics().getInflight().subtract(references.size()); dispatched.removeAll(references); } @@ -730,6 +732,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (nodeDest != null) { if (node != QueueMessageReference.NULL_MESSAGE) { nodeDest.getDestinationStatistics().getDispatched().increment(); + //We still increment here as the dispatched list is still tracking references at this point + //Metrics will get cleaned up in addReferencesAndUpdateRedispatch() when the dispatched + //list is also cleaned up as the failure causes the subscription to close incrementPrefetchCounter(node); LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", info.getConsumerId(), message.getMessageId(), message.getDestination(), diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 8b623bdbe5..4e62683026 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -216,7 +216,6 @@ public class TopicSubscription extends AbstractSubscription { * Discard any expired messages from the matched list. Called from a * synchronized block. * - * @throws IOException */ protected void removeExpiredMessages() throws IOException { try { @@ -354,6 +353,23 @@ public class TopicSubscription extends AbstractSubscription { return null; } + @Override + public List remove(ConnectionContext context, Destination destination) throws Exception { + if (isUseTopicSubscriptionInflightStats()) { + synchronized(dispatchLock) { + for (DispatchedNode node : dispatched) { + if (node.getDestination()== destination) { + //We only need to clean up inflight message size here on the sub stats as + //inflight on destination stat is cleaned up on destroy + getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); + } + } + dispatched.clear(); + } + } + return super.remove(context, destination); + } + /** * Occurs when a pull times out. If nothing has been dispatched since the * timeout was setup, then send the NULL message. @@ -692,6 +708,8 @@ public class TopicSubscription extends AbstractSubscription { public void onFailure() { Destination regionDestination = (Destination) node.getRegionDestination(); regionDestination.getDestinationStatistics().getDispatched().increment(); + //We still increment here as metrics get cleaned up on destroy() + //as the failure causes the subscription to close regionDestination.getDestinationStatistics().getInflight().increment(); node.decrementReferenceCount(); } @@ -749,6 +767,10 @@ public class TopicSubscription extends AbstractSubscription { setSlowConsumer(false); synchronized(dispatchLock) { dispatched.clear(); + //Clear any unacked messages from destination inflight stats + if (destination != null) { + destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize()); + } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java index 12db79f107..05b1ef030a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java @@ -16,33 +16,33 @@ */ package org.apache.activemq.statistics; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collection; import java.util.Random; - +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.AbstractSubscription; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Assume; @@ -69,8 +69,9 @@ public abstract class AbstractInflightMessageSizeTest { final protected String destName = "testDest"; //use 10 second wait for assertions instead of the 30 default - final protected long WAIT_DURATION = 10 * 1000; - final protected long SLEEP_DURATION = 500; + protected final long WAIT_DURATION = 10 * 1000; + protected final long SLEEP_DURATION = 500; + protected final AtomicBoolean failOnDispatch = new AtomicBoolean(); @Parameters public static Collection data() { @@ -102,6 +103,7 @@ public abstract class AbstractInflightMessageSizeTest { @Before public void setUp() throws Exception { + failOnDispatch.set(false); brokerService = new BrokerService(); brokerService.setDeleteAllMessagesOnStartup(true); TransportConnector tcp = brokerService @@ -111,6 +113,15 @@ public abstract class AbstractInflightMessageSizeTest { PolicyMap pMap = new PolicyMap(); pMap.setDefaultEntry(policy); brokerService.setDestinationPolicy(pMap); + brokerService.setPlugins(new BrokerPlugin[]{broker -> new BrokerFilter(broker) { + @Override + public void preProcessDispatch(MessageDispatch messageDispatch) { + super.preProcessDispatch(messageDispatch); + if (failOnDispatch.get()) { + throw new RuntimeException("fail dispatch"); + } + } + }}); brokerService.start(); //used to test optimizeAcknowledge works @@ -307,6 +318,55 @@ public abstract class AbstractInflightMessageSizeTest { Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION)); } + @Test(timeout=60000) + public void testInflightMessageSizeDispatchFailure() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + + //Fail on all dispatches + failOnDispatch.set(true); + + //Need to reset each time here on send because dispatch will cause the connection to close + try { + sendMessages(1); + } catch (Exception e) { + //expected as session should close + } + + //Wait for session to fail + assertTrue(Wait.waitFor(() -> ((ActiveMQSession) session).isClosed(), WAIT_DURATION, SLEEP_DURATION)); + + //Make sure all the stats are cleaned up on failure of dispatches + assertTrue("Destination inflight message count should be 0", + Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub", + Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 || + getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub", + Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 || + getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION)); + } + + @Test(timeout=60000) + public void testInflightMessageSizeConsumerClosed() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + sendMessages(10); + + //Wait for the 10 messages to get dispatched and then close the consumer to test cleanup + assertTrue("Should be 10 in flight messages", + Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION)); + consumer.close(); + + //Make sure all the stats are cleaned up on failure of dispatches + assertTrue("Destination inflight message count should be 0", + Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub", + Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 || + getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub", + Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 || + getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION)); + } + protected long sendMessages(int count) throws JMSException { return sendMessages(count, null); }