From f6e26085cfa3693b2872b1c53354c674c0cbca49 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Thu, 17 Nov 2022 09:02:43 -0500 Subject: [PATCH] AMQ-9159 - Add a test case to verify inflight message stats for wildcard consumer when a destination is removed --- .../AbstractInflightMessageSizeTest.java | 80 +++++++++++++++++-- ...leSubscriptionInflightMessageSizeTest.java | 19 ++++- ...ueSubscriptionInflightMessageSizeTest.java | 10 +-- ...icSubscriptionInflightMessageSizeTest.java | 8 +- 4 files changed, 98 insertions(+), 19 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java index 05b1ef030a..de012d3230 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java @@ -66,7 +66,9 @@ public abstract class AbstractInflightMessageSizeTest { protected boolean useTopicSubscriptionInflightStats; final protected int ackType; final protected boolean optimizeAcknowledge; - final protected String destName = "testDest"; + final protected String destNamePrefix = "testDest"; + final protected String destName = "testDest.1"; + final protected String destName2 = "testDest.2"; //use 10 second wait for assertions instead of the 30 default protected final long WAIT_DURATION = 10 * 1000; @@ -367,8 +369,62 @@ public abstract class AbstractInflightMessageSizeTest { getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION)); } + @Test(timeout=60000) + public void testInflightMessageSizeRemoveDestination() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + //Close as we will re-create with a wildcard sub to get messages from 2 destinations + consumer.close(); + + consumer = getMessageConsumer(destNamePrefix + ".>"); + sendMessages(10); + sendMessages(10, getActiveMQDestination(destName2)); + Destination amqDestination2 = TestSupport.getDestination(brokerService, getActiveMQDestination(destName2)); + final Subscription subscription = getSubscription(); + + //Wait for the 10 messages to get dispatched and then close the consumer to test cleanup + assertTrue("Should be 10 in flight messages", + Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Should be 10 in flight messages", + Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Inflight message size should be 20", + Wait.waitFor(() -> subscription.getInFlightSize() == 20, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Inflight message size should be greater than 0", + Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0, WAIT_DURATION, SLEEP_DURATION)); + + //remove 1 destination, leaving 10 in flight + brokerService.getBroker().removeDestination(brokerService.getAdminConnectionContext(), getActiveMQDestination(), 1000); + + //Make sure all the stats are updated after 1 destination removal + assertTrue("Destination inflight message count should be 0", + Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Destination inflight message count should still be 10", + Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Inflight message size should be 10", + Wait.waitFor(() -> subscription.getInFlightSize() == 10, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Inflight message size should be greater than 0", + Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0, WAIT_DURATION, SLEEP_DURATION)); + + //remove second dest + brokerService.getBroker().removeDestination(brokerService.getAdminConnectionContext(), getActiveMQDestination(destName2), 1000); + + assertTrue("Destination inflight message count should be 0", + Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Inflight message size should be 0", + Wait.waitFor(() -> subscription.getInFlightSize() == 0, WAIT_DURATION, SLEEP_DURATION)); + assertTrue("Inflight message size should be 0", + Wait.waitFor(() -> subscription.getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION)); + } + protected long sendMessages(int count) throws JMSException { - return sendMessages(count, null); + return sendMessages(count, null, dest); + } + + protected long sendMessages(int count, javax.jms.Destination dest) throws JMSException { + return sendMessages(count, null, dest); + } + + protected long sendMessages(int count, Integer ttl) throws JMSException { + return sendMessages(count, ttl, dest); } /** @@ -377,7 +433,7 @@ public abstract class AbstractInflightMessageSizeTest { * @param count * @throws JMSException */ - protected long sendMessages(int count, Integer ttl) throws JMSException { + protected long sendMessages(int count, Integer ttl, javax.jms.Destination dest) throws JMSException { MessageProducer producer = session.createProducer(dest); if (ttl != null) { producer.setTimeToLive(ttl); @@ -412,10 +468,22 @@ public abstract class AbstractInflightMessageSizeTest { protected abstract Subscription getSubscription(); - protected abstract ActiveMQDestination getActiveMQDestination(); + protected ActiveMQDestination getActiveMQDestination() { + return getActiveMQDestination(destName); + } - protected abstract MessageConsumer getMessageConsumer() throws JMSException; + protected abstract ActiveMQDestination getActiveMQDestination(String destName); - protected abstract javax.jms.Destination getDestination() throws JMSException ; + protected MessageConsumer getMessageConsumer() throws JMSException { + return getMessageConsumer(destName); + } + + protected abstract MessageConsumer getMessageConsumer(String destName) throws JMSException; + + protected javax.jms.Destination getDestination() throws JMSException { + return getDestination(destName); + } + + protected abstract javax.jms.Destination getDestination(String destName) throws JMSException; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java index a7e947329a..2f349296ca 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java @@ -24,6 +24,8 @@ import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.SubscriptionKey; +import org.junit.Assume; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -40,8 +42,8 @@ public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflight } @Override - protected MessageConsumer getMessageConsumer() throws JMSException { - return session.createDurableSubscriber((javax.jms.Topic)dest, "sub1"); + protected MessageConsumer getMessageConsumer(String destName) throws JMSException { + return session.createDurableSubscriber(getDestination(destName), "sub1"); } @Override @@ -50,13 +52,22 @@ public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflight } @Override - protected javax.jms.Topic getDestination() throws JMSException { + protected javax.jms.Topic getDestination(String destName) throws JMSException { return session.createTopic(destName); } @Override - protected ActiveMQDestination getActiveMQDestination() { + protected ActiveMQDestination getActiveMQDestination(String destName) { return new ActiveMQTopic(destName); } + @Test(timeout=60000) + public void testInflightMessageSizeRemoveDestination() throws Exception { + Assume.assumeTrue(useTopicSubscriptionInflightStats); + //Close as we will re-create with a wildcard sub + consumer.close(); + session.unsubscribe("sub1"); + super.testInflightMessageSizeRemoveDestination(); + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java index 217aefb2d2..11812ab249 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java @@ -40,22 +40,22 @@ public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMe } @Override - protected MessageConsumer getMessageConsumer() throws JMSException { - return session.createConsumer(dest); + protected MessageConsumer getMessageConsumer(String destName) throws JMSException { + return session.createConsumer(getDestination(destName)); } @Override protected Subscription getSubscription() { - return ((Queue)amqDestination).getConsumers().get(0); + return amqDestination.getConsumers().get(0); } @Override - protected Destination getDestination() throws JMSException { + protected Destination getDestination(String destName) throws JMSException { return session.createQueue(destName); } @Override - protected ActiveMQDestination getActiveMQDestination() { + protected ActiveMQDestination getActiveMQDestination(String destName) { return new ActiveMQQueue(destName); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java index 132a96d9b5..1691329def 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java @@ -43,8 +43,8 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe } @Override - protected MessageConsumer getMessageConsumer() throws JMSException { - return session.createConsumer(dest); + protected MessageConsumer getMessageConsumer(String destName) throws JMSException { + return session.createConsumer(getDestination(destName)); } @Override @@ -53,12 +53,12 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe } @Override - protected Destination getDestination() throws JMSException { + protected Destination getDestination(String destName) throws JMSException { return session.createTopic(destName); } @Override - protected ActiveMQDestination getActiveMQDestination() { + protected ActiveMQDestination getActiveMQDestination(String destName) { return new ActiveMQTopic(destName); }