mirror of https://github.com/apache/activemq.git
starter test case for https://issues.apache.org/activemq/browse/AMQ-2908 - it works though
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1031136 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06cbebc8da
commit
ca668019a0
|
@ -18,6 +18,7 @@ package org.apache.activemq.usecases;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
@ -275,6 +276,132 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
||||||
LOG.info("done: " + getName());
|
LOG.info("done: " + getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
|
||||||
|
createBroker();
|
||||||
|
final long queuePrefetch = 600;
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
|
||||||
|
connection = factory.createConnection();
|
||||||
|
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
producer = session.createProducer(destination);
|
||||||
|
final int ttl = 4000;
|
||||||
|
producer.setTimeToLive(ttl);
|
||||||
|
|
||||||
|
final long sendCount = 1500;
|
||||||
|
final CountDownLatch receivedOneCondition = new CountDownLatch(1);
|
||||||
|
final CountDownLatch waitCondition = new CountDownLatch(1);
|
||||||
|
final AtomicLong received = new AtomicLong();
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
LOG.info("Got my message: " + message);
|
||||||
|
receivedOneCondition.countDown();
|
||||||
|
received.incrementAndGet();
|
||||||
|
waitCondition.await(60, TimeUnit.SECONDS);
|
||||||
|
LOG.info("acking message: " + message);
|
||||||
|
message.acknowledge();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
fail(e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
|
||||||
|
final Thread producingThread = new Thread("Producing Thread") {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
int i = 0;
|
||||||
|
long tStamp = System.currentTimeMillis();
|
||||||
|
while (i++ < sendCount) {
|
||||||
|
producer.send(session.createTextMessage("test"));
|
||||||
|
if (i%100 == 0) {
|
||||||
|
LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
|
||||||
|
tStamp = System.currentTimeMillis() ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
producingThread.start();
|
||||||
|
assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
producingThread.join(1000);
|
||||||
|
return !producingThread.isAlive();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
final DestinationViewMBean view = createView(destination);
|
||||||
|
|
||||||
|
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return queuePrefetch == view.getDispatchCount();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return sendCount == view.getExpiredCount();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||||
|
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||||
|
+ ", size= " + view.getQueueSize());
|
||||||
|
|
||||||
|
// let the ack happen
|
||||||
|
waitCondition.countDown();
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
// consumer ackLater(delivery ack for expired messages) is based on half the prefetch value
|
||||||
|
// which will leave half of the prefetch pending till consumer close
|
||||||
|
return (queuePrefetch/2) -1 == view.getInFlightCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||||
|
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||||
|
+ ", size= " + view.getQueueSize());
|
||||||
|
|
||||||
|
|
||||||
|
assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
|
||||||
|
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
|
||||||
|
assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
|
||||||
|
|
||||||
|
|
||||||
|
// produce some more
|
||||||
|
producer.setTimeToLive(0);
|
||||||
|
for (int i=0; i<sendCount; i++) {
|
||||||
|
producer.send(session.createTextMessage("test-" + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return received.get() >= sendCount;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return 0 == view.getInFlightCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
|
||||||
|
|
||||||
|
LOG.info("done: " + getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
|
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
|
||||||
String domain = "org.apache.activemq";
|
String domain = "org.apache.activemq";
|
||||||
ObjectName name;
|
ObjectName name;
|
||||||
|
|
Loading…
Reference in New Issue