From f88b21613b9298da74c22188ebb10d769caec770 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 31 Jan 2013 17:44:04 +0000 Subject: [PATCH] apply fix for: https://issues.apache.org/jira/browse/AMQ-4288 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1441085 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/OptimizedAckTest.java | 209 +++++++++--------- 1 file changed, 106 insertions(+), 103 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java index eff6596698..5a1b1f55e6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java @@ -33,6 +33,7 @@ public class OptimizedAckTest extends TestSupport { private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class); private ActiveMQConnection connection; + @Override protected void setUp() throws Exception { super.setUp(); connection = (ActiveMQConnection) createConnection(); @@ -43,130 +44,132 @@ public class OptimizedAckTest extends TestSupport { connection.setPrefetchPolicy(prefetchPolicy); } + @Override protected void tearDown() throws Exception { connection.close(); super.tearDown(); } - public void testReceivedMessageStillInflight() throws Exception { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test"); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < 10; i++) { - producer.send(session.createTextMessage("Hello" + i)); - } + public void testReceivedMessageStillInflight() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } - final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); - MessageConsumer consumer = session.createConsumer(queue); + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); - assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); + assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); - for (int i=0; i<10; i++) { + for (int i = 0; i < 10; i++) { javax.jms.Message msg = consumer.receive(4000); + long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); assertNotNull(msg); - if (i<7) { - assertEquals("all prefetch is still in flight", 10, regionBroker.getDestinationStatistics().getInflight().getCount()); - } else { - assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ - @Override - public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); - } - } - } + if (i < 7) { + assertEquals("all prefetch is still in flight", 10, inFlightCount); + } else { + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + } + } - public void testVerySlowReceivedMessageStillInflight() throws Exception { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test"); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < 10; i++) { - producer.send(session.createTextMessage("Hello" + i)); - } + public void testVerySlowReceivedMessageStillInflight() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } - final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); - MessageConsumer consumer = session.createConsumer(queue); + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); - assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); + assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); - for (int i=0; i<10; i++) { - Thread.sleep(400); + for (int i = 0; i < 10; i++) { + Thread.sleep(400); javax.jms.Message msg = consumer.receive(4000); - long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); + long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); assertNotNull(msg); - if (i<7) { - assertEquals("all prefetch is still in flight: " + i, 10, inFlightCount); - } else { - assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ - @Override - public boolean isSatisified() throws Exception { - return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); - } - } - } + if (i < 7) { + assertEquals("all prefetch is still in flight: " + i, 10, inFlightCount); + } else { + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + } + } - public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception { - connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10)); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("test"); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < 10; i++) { - producer.send(session.createTextMessage("Hello" + i)); - } + public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception { + connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10)); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } - final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); - MessageConsumer consumer = session.createConsumer(queue); + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); - assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); + assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); - for (int i=0; i<10; i++) { + for (int i = 0; i < 10; i++) { javax.jms.Message msg = consumer.receive(4000); - long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); + long inFlightCount = regionBroker.getDestinationStatistics().getInflight().getCount(); assertNotNull(msg); - if (i<7) { - assertEquals("all prefetch is still in flight", 10, inFlightCount); - } else { - assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition(){ - @Override - public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); - } - } + if (i < 7) { + assertEquals("all prefetch is still in flight", 10, inFlightCount); + } else { + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + } - assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition(){ - @Override - public boolean isSatisified() throws Exception { - LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); - return 0 == regionBroker.getDestinationStatistics().getInflight().getCount(); - } - })); - } + assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 0 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } }