diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index f433caf822..53ee74b1d2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.usecases; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.Message; @@ -275,6 +276,132 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { LOG.info("done: " + getName()); } + public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception { + createBroker(); + final long queuePrefetch = 600; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); + connection = factory.createConnection(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + producer = session.createProducer(destination); + final int ttl = 4000; + producer.setTimeToLive(ttl); + + final long sendCount = 1500; + final CountDownLatch receivedOneCondition = new CountDownLatch(1); + final CountDownLatch waitCondition = new CountDownLatch(1); + final AtomicLong received = new AtomicLong(); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message message) { + try { + LOG.info("Got my message: " + message); + receivedOneCondition.countDown(); + received.incrementAndGet(); + waitCondition.await(60, TimeUnit.SECONDS); + LOG.info("acking message: " + message); + message.acknowledge(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + }); + + connection.start(); + + + final Thread producingThread = new Thread("Producing Thread") { + public void run() { + try { + int i = 0; + long tStamp = System.currentTimeMillis(); + while (i++ < sendCount) { + producer.send(session.createTextMessage("test")); + if (i%100 == 0) { + LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); + tStamp = System.currentTimeMillis() ; + } + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + + producingThread.start(); + assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS)); + + assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + producingThread.join(1000); + return !producingThread.isAlive(); + } + })); + + final DestinationViewMBean view = createView(destination); + + assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return queuePrefetch == view.getDispatchCount(); + } + })); + assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return sendCount == view.getExpiredCount(); + } + })); + + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); + + // let the ack happen + waitCondition.countDown(); + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + // consumer ackLater(delivery ack for expired messages) is based on half the prefetch value + // which will leave half of the prefetch pending till consumer close + return (queuePrefetch/2) -1 == view.getInFlightCount(); + } + }); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); + + + assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount()); + assertEquals("size gets back to 0 ", 0, view.getQueueSize()); + assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount()); + + + // produce some more + producer.setTimeToLive(0); + for (int i=0; i= sendCount; + } + }); + + consumer.close(); + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return 0 == view.getInFlightCount(); + } + }); + assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount()); + + LOG.info("done: " + getName()); + } + + + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq"; ObjectName name;