AMQ-9156 - Make sure in flight metrics are properly decremented on

subscription destroys and dispatch failures
This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-03 10:38:02 -04:00
parent d46b74d674
commit 58666afffd
3 changed files with 94 additions and 7 deletions

View File

@ -610,10 +610,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
for (MessageReference r : dispatched) { for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) { if (r.getRegionDestination() == destination) {
references.add(r); references.add(r);
//Decrement the size as we are removing and redispatching all references
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
} }
} }
redispatch.addAll(0, references); redispatch.addAll(0, references);
//Clean up in flight message stats on the destination after dispatched is cleared
destination.getDestinationStatistics().getInflight().subtract(references.size()); destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references); dispatched.removeAll(references);
} }
@ -730,6 +732,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (nodeDest != null) { if (nodeDest != null) {
if (node != QueueMessageReference.NULL_MESSAGE) { if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment(); nodeDest.getDestinationStatistics().getDispatched().increment();
//We still increment here as the dispatched list is still tracking references at this point
//Metrics will get cleaned up in addReferencesAndUpdateRedispatch() when the dispatched
//list is also cleaned up as the failure causes the subscription to close
incrementPrefetchCounter(node); incrementPrefetchCounter(node);
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}",
info.getConsumerId(), message.getMessageId(), message.getDestination(), info.getConsumerId(), message.getMessageId(), message.getDestination(),

View File

@ -216,7 +216,6 @@ public class TopicSubscription extends AbstractSubscription {
* Discard any expired messages from the matched list. Called from a * Discard any expired messages from the matched list. Called from a
* synchronized block. * synchronized block.
* *
* @throws IOException
*/ */
protected void removeExpiredMessages() throws IOException { protected void removeExpiredMessages() throws IOException {
try { try {
@ -354,6 +353,23 @@ public class TopicSubscription extends AbstractSubscription {
return null; return null;
} }
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
if (isUseTopicSubscriptionInflightStats()) {
synchronized(dispatchLock) {
for (DispatchedNode node : dispatched) {
if (node.getDestination()== destination) {
//We only need to clean up inflight message size here on the sub stats as
//inflight on destination stat is cleaned up on destroy
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
}
}
dispatched.clear();
}
}
return super.remove(context, destination);
}
/** /**
* Occurs when a pull times out. If nothing has been dispatched since the * Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message. * timeout was setup, then send the NULL message.
@ -692,6 +708,8 @@ public class TopicSubscription extends AbstractSubscription {
public void onFailure() { public void onFailure() {
Destination regionDestination = (Destination) node.getRegionDestination(); Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment(); regionDestination.getDestinationStatistics().getDispatched().increment();
//We still increment here as metrics get cleaned up on destroy()
//as the failure causes the subscription to close
regionDestination.getDestinationStatistics().getInflight().increment(); regionDestination.getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
@ -749,6 +767,10 @@ public class TopicSubscription extends AbstractSubscription {
setSlowConsumer(false); setSlowConsumer(false);
synchronized(dispatchLock) { synchronized(dispatchLock) {
dispatched.clear(); dispatched.clear();
//Clear any unacked messages from destination inflight stats
if (destination != null) {
destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize());
}
} }
} }

View File

@ -16,33 +16,33 @@
*/ */
package org.apache.activemq.statistics; package org.apache.activemq.statistics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.After;
import org.junit.Assume; import org.junit.Assume;
@ -69,8 +69,9 @@ public abstract class AbstractInflightMessageSizeTest {
final protected String destName = "testDest"; final protected String destName = "testDest";
//use 10 second wait for assertions instead of the 30 default //use 10 second wait for assertions instead of the 30 default
final protected long WAIT_DURATION = 10 * 1000; protected final long WAIT_DURATION = 10 * 1000;
final protected long SLEEP_DURATION = 500; protected final long SLEEP_DURATION = 500;
protected final AtomicBoolean failOnDispatch = new AtomicBoolean();
@Parameters @Parameters
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
@ -102,6 +103,7 @@ public abstract class AbstractInflightMessageSizeTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
failOnDispatch.set(false);
brokerService = new BrokerService(); brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true); brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector tcp = brokerService TransportConnector tcp = brokerService
@ -111,6 +113,15 @@ public abstract class AbstractInflightMessageSizeTest {
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy); pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap); brokerService.setDestinationPolicy(pMap);
brokerService.setPlugins(new BrokerPlugin[]{broker -> new BrokerFilter(broker) {
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
super.preProcessDispatch(messageDispatch);
if (failOnDispatch.get()) {
throw new RuntimeException("fail dispatch");
}
}
}});
brokerService.start(); brokerService.start();
//used to test optimizeAcknowledge works //used to test optimizeAcknowledge works
@ -307,6 +318,55 @@ public abstract class AbstractInflightMessageSizeTest {
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION)); Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
} }
@Test(timeout=60000)
public void testInflightMessageSizeDispatchFailure() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
//Fail on all dispatches
failOnDispatch.set(true);
//Need to reset each time here on send because dispatch will cause the connection to close
try {
sendMessages(1);
} catch (Exception e) {
//expected as session should close
}
//Wait for session to fail
assertTrue(Wait.waitFor(() -> ((ActiveMQSession) session).isClosed(), WAIT_DURATION, SLEEP_DURATION));
//Make sure all the stats are cleaned up on failure of dispatches
assertTrue("Destination inflight message count should be 0",
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub",
Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub",
Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
}
@Test(timeout=60000)
public void testInflightMessageSizeConsumerClosed() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
sendMessages(10);
//Wait for the 10 messages to get dispatched and then close the consumer to test cleanup
assertTrue("Should be 10 in flight messages",
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
consumer.close();
//Make sure all the stats are cleaned up on failure of dispatches
assertTrue("Destination inflight message count should be 0",
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub",
Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub",
Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
}
protected long sendMessages(int count) throws JMSException { protected long sendMessages(int count) throws JMSException {
return sendMessages(count, null); return sendMessages(count, null);
} }