mirror of https://github.com/apache/activemq.git
[AMQ-9257] Disabled expire message checking when pauseDispatch=true (#1005)
(cherry picked from commit 9a5b61f6a2
)
This commit is contained in:
parent
749e28d35e
commit
cda1007ef6
|
@ -957,6 +957,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
}
|
||||
|
||||
private void expireMessages() {
|
||||
if(isDispatchPaused()) {
|
||||
LOG.debug("{} dispatchPaused, skipping expire messages check", getActiveMQDestination().getQualifiedName());
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName());
|
||||
|
||||
// just track the insertion count
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
|
|||
public class QueueDispatchSelector extends SimpleDispatchSelector {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class);
|
||||
private Subscription exclusiveConsumer;
|
||||
private boolean paused;
|
||||
private volatile boolean paused;
|
||||
|
||||
/**
|
||||
* @param destination
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
|||
import org.apache.activemq.CombinationTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
|
||||
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
|
@ -254,6 +255,73 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
|
||||
}
|
||||
|
||||
public void testExpiredMessagesWithNoConsumerPauseResume() throws Exception {
|
||||
|
||||
createBrokerWithMemoryLimit();
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(destination);
|
||||
producer.setTimeToLive(1000);
|
||||
connection.start();
|
||||
final long sendCount = 2000;
|
||||
|
||||
ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
|
||||
final QueueViewMBean queueView = (QueueViewMBean) broker.getManagementContext().newProxyInstance(name, QueueViewMBean.class, true);
|
||||
queueView.pause();
|
||||
assertTrue(queueView.isPaused());
|
||||
|
||||
final Thread producingThread = new Thread("Producing Thread") {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
int i = 0;
|
||||
long tStamp = System.currentTimeMillis();
|
||||
while (i++ < sendCount) {
|
||||
producer.send(session.createTextMessage("test"));
|
||||
if (i%100 == 0) {
|
||||
LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
|
||||
tStamp = System.currentTimeMillis() ;
|
||||
}
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
producingThread.start();
|
||||
|
||||
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
producingThread.join(TimeUnit.SECONDS.toMillis(3000));
|
||||
return !producingThread.isAlive();
|
||||
}
|
||||
}));
|
||||
|
||||
assertEquals("No messages should have expired", Long.valueOf(0l), Long.valueOf(queueView.getExpiredCount()));
|
||||
queueView.resume();
|
||||
assertFalse(queueView.isPaused());
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + queueView.getDequeueCount()
|
||||
+ ", inflight=" + queueView.getInFlightCount() + ", expired= " + queueView.getExpiredCount()
|
||||
+ ", size= " + queueView.getQueueSize());
|
||||
return sendCount == queueView.getExpiredCount();
|
||||
}
|
||||
}, Wait.MAX_WAIT_MILLIS * 10);
|
||||
LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + queueView.getDequeueCount()
|
||||
+ ", inflight=" + queueView.getInFlightCount() + ", expired= " + queueView.getExpiredCount()
|
||||
+ ", size= " + queueView.getQueueSize());
|
||||
|
||||
assertEquals("Not all sent messages have expired", sendCount, queueView.getExpiredCount());
|
||||
assertEquals("memory usage doesn't go to duck egg", 0, queueView.getMemoryPercentUsage());
|
||||
}
|
||||
|
||||
// first ack delivered after expiry
|
||||
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
|
||||
createBroker();
|
||||
|
|
Loading…
Reference in New Issue