From 972d31f976133c77ae74aca4e359c5a57a75b4ff Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Fri, 19 May 2023 08:06:53 -0500 Subject: [PATCH] [AMQ-9257] Disabled expire message checking when pauseDispatch=true (#1005) (cherry picked from commit 9a5b61f6a28184cfe832871302ece16069ebb71d) --- .../apache/activemq/broker/region/Queue.java | 5 ++ .../broker/region/QueueDispatchSelector.java | 2 +- .../ExpiredMessagesWithNoConsumerTest.java | 68 +++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 79f897ec12..66fa2cf397 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -957,6 +957,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } private void expireMessages() { + if(isDispatchPaused()) { + LOG.debug("{} dispatchPaused, skipping expire messages check", getActiveMQDestination().getQualifiedName()); + return; + } + LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName()); // just track the insertion count diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java index 7d3f69f84e..d57f25ad7f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; public class QueueDispatchSelector extends SimpleDispatchSelector { private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class); private Subscription exclusiveConsumer; - private boolean paused; + private volatile boolean paused; /** * @param destination diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index e2ad7f602a..fdc520771e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -254,6 +255,73 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage()); } + public void testExpiredMessagesWithNoConsumerPauseResume() throws Exception { + + createBrokerWithMemoryLimit(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setTimeToLive(1000); + connection.start(); + final long sendCount = 2000; + + ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test"); + final QueueViewMBean queueView = (QueueViewMBean) broker.getManagementContext().newProxyInstance(name, QueueViewMBean.class, true); + queueView.pause(); + assertTrue(queueView.isPaused()); + + final Thread producingThread = new Thread("Producing Thread") { + @Override + public void run() { + try { + int i = 0; + long tStamp = System.currentTimeMillis(); + while (i++ < sendCount) { + producer.send(session.createTextMessage("test")); + if (i%100 == 0) { + LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); + tStamp = System.currentTimeMillis() ; + } + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + + producingThread.start(); + + assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + producingThread.join(TimeUnit.SECONDS.toMillis(3000)); + return !producingThread.isAlive(); + } + })); + + assertEquals("No messages should have expired", Long.valueOf(0l), Long.valueOf(queueView.getExpiredCount())); + queueView.resume(); + assertFalse(queueView.isPaused()); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + queueView.getDequeueCount() + + ", inflight=" + queueView.getInFlightCount() + ", expired= " + queueView.getExpiredCount() + + ", size= " + queueView.getQueueSize()); + return sendCount == queueView.getExpiredCount(); + } + }, Wait.MAX_WAIT_MILLIS * 10); + LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + queueView.getDequeueCount() + + ", inflight=" + queueView.getInFlightCount() + ", expired= " + queueView.getExpiredCount() + + ", size= " + queueView.getQueueSize()); + + assertEquals("Not all sent messages have expired", sendCount, queueView.getExpiredCount()); + assertEquals("memory usage doesn't go to duck egg", 0, queueView.getMemoryPercentUsage()); + } + // first ack delivered after expiry public void testExpiredMessagesWithVerySlowConsumer() throws Exception { createBroker();