[AMQ-9257] Disabled expire message checking when pauseDispatch=true (#1005)

(cherry picked from commit 9a5b61f6a2)
This commit is contained in:
Matt Pavlovich 2023-05-19 08:06:53 -05:00
parent 5fb5793d9a
commit 972d31f976
3 changed files with 74 additions and 1 deletions

View File

@ -957,6 +957,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
private void expireMessages() { private void expireMessages() {
if(isDispatchPaused()) {
LOG.debug("{} dispatchPaused, skipping expire messages check", getActiveMQDestination().getQualifiedName());
return;
}
LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName()); LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName());
// just track the insertion count // just track the insertion count

View File

@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
public class QueueDispatchSelector extends SimpleDispatchSelector { public class QueueDispatchSelector extends SimpleDispatchSelector {
private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class); private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class);
private Subscription exclusiveConsumer; private Subscription exclusiveConsumer;
private boolean paused; private volatile boolean paused;
/** /**
* @param destination * @param destination

View File

@ -37,6 +37,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -254,6 +255,73 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage()); assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
} }
public void testExpiredMessagesWithNoConsumerPauseResume() throws Exception {
createBrokerWithMemoryLimit();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setTimeToLive(1000);
connection.start();
final long sendCount = 2000;
ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
final QueueViewMBean queueView = (QueueViewMBean) broker.getManagementContext().newProxyInstance(name, QueueViewMBean.class, true);
queueView.pause();
assertTrue(queueView.isPaused());
final Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
long tStamp = System.currentTimeMillis();
while (i++ < sendCount) {
producer.send(session.createTextMessage("test"));
if (i%100 == 0) {
LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
tStamp = System.currentTimeMillis() ;
}
}
} catch (Throwable ex) {
ex.printStackTrace();
}
}
};
producingThread.start();
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
producingThread.join(TimeUnit.SECONDS.toMillis(3000));
return !producingThread.isAlive();
}
}));
assertEquals("No messages should have expired", Long.valueOf(0l), Long.valueOf(queueView.getExpiredCount()));
queueView.resume();
assertFalse(queueView.isPaused());
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + queueView.getDequeueCount()
+ ", inflight=" + queueView.getInFlightCount() + ", expired= " + queueView.getExpiredCount()
+ ", size= " + queueView.getQueueSize());
return sendCount == queueView.getExpiredCount();
}
}, Wait.MAX_WAIT_MILLIS * 10);
LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + queueView.getDequeueCount()
+ ", inflight=" + queueView.getInFlightCount() + ", expired= " + queueView.getExpiredCount()
+ ", size= " + queueView.getQueueSize());
assertEquals("Not all sent messages have expired", sendCount, queueView.getExpiredCount());
assertEquals("memory usage doesn't go to duck egg", 0, queueView.getMemoryPercentUsage());
}
// first ack delivered after expiry // first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception { public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker(); createBroker();