git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1441156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-31 19:34:22 +00:00
parent fe2489fc4b
commit 7f022c9379
1 changed files with 43 additions and 38 deletions

View File

@ -70,21 +70,22 @@ public class OptimizedAckTest extends TestSupport {
}
}));
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 6; i++) {
javax.jms.Message msg = consumer.receive(4000);
long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
assertNotNull(msg);
if (i < 7) {
assertEquals("all prefetch is still in flight", 10, inFlightCount);
} else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
}
for (int i = 6; i < 10; i++) {
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
}
@ -108,22 +109,26 @@ public class OptimizedAckTest extends TestSupport {
}
}));
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 6; i++) {
Thread.sleep(400);
javax.jms.Message msg = consumer.receive(4000);
long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
assertNotNull(msg);
if (i < 7) {
assertEquals("all prefetch is still in flight: " + i, 10, inFlightCount);
} else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
}
for (int i = 6; i < 10; i++) {
Thread.sleep(400);
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
}
public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception {
@ -147,21 +152,21 @@ public class OptimizedAckTest extends TestSupport {
}
}));
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 6; i++) {
javax.jms.Message msg = consumer.receive(4000);
long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
assertNotNull(msg);
if (i < 7) {
assertEquals("all prefetch is still in flight", 10, inFlightCount);
} else {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
}
for (int i = 6; i < 10; i++) {
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
}
}));
}
assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() {