git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1441085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-31 17:44:04 +00:00
parent b8bbca6773
commit f88b21613b
1 changed files with 106 additions and 103 deletions

View File

@ -33,6 +33,7 @@ public class OptimizedAckTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class); private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class);
private ActiveMQConnection connection; private ActiveMQConnection connection;
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
connection = (ActiveMQConnection) createConnection(); connection = (ActiveMQConnection) createConnection();
@ -43,130 +44,132 @@ public class OptimizedAckTest extends TestSupport {
connection.setPrefetchPolicy(prefetchPolicy); connection.setPrefetchPolicy(prefetchPolicy);
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
connection.close(); connection.close();
super.tearDown(); super.tearDown();
} }
public void testReceivedMessageStillInflight() throws Exception { public void testReceivedMessageStillInflight() throws Exception {
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test"); Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello" + i)); producer.send(session.createTextMessage("Hello" + i));
} }
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
for (int i=0; i<10; i++) { for (int i = 0; i < 10; i++) {
javax.jms.Message msg = consumer.receive(4000); javax.jms.Message msg = consumer.receive(4000);
long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
assertNotNull(msg); assertNotNull(msg);
if (i<7) { if (i < 7) {
assertEquals("all prefetch is still in flight", 10, regionBroker.getDestinationStatistics().getInflight().getCount()); assertEquals("all prefetch is still in flight", 10, inFlightCount);
} else { } else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
} }
} }
} }
public void testVerySlowReceivedMessageStillInflight() throws Exception { public void testVerySlowReceivedMessageStillInflight() throws Exception {
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test"); Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello" + i)); producer.send(session.createTextMessage("Hello" + i));
} }
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
for (int i=0; i<10; i++) { for (int i = 0; i < 10; i++) {
Thread.sleep(400); Thread.sleep(400);
javax.jms.Message msg = consumer.receive(4000); javax.jms.Message msg = consumer.receive(4000);
long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
assertNotNull(msg); assertNotNull(msg);
if (i<7) { if (i < 7) {
assertEquals("all prefetch is still in flight: " + i, 10, inFlightCount); assertEquals("all prefetch is still in flight: " + i, 10, inFlightCount);
} else { } else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
} }
} }
} }
public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception { public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10)); connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10));
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test"); Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello" + i)); producer.send(session.createTextMessage("Hello" + i));
} }
final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
for (int i=0; i<10; i++) { for (int i = 0; i < 10; i++) {
javax.jms.Message msg = consumer.receive(4000); javax.jms.Message msg = consumer.receive(4000);
long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
assertNotNull(msg); assertNotNull(msg);
if (i<7) { if (i < 7) {
assertEquals("all prefetch is still in flight", 10, inFlightCount); assertEquals("all prefetch is still in flight", 10, inFlightCount);
} else { } else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
} }
} }
assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition(){ assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 0 == regionBroker.getDestinationStatistics().getInflight().getCount(); return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
} }
})); }));
} }
} }