From c32820d87bbe3d8652fce1fdaaed83729f330ddf Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 2 Feb 2010 14:16:41 +0000 Subject: [PATCH] tidy up test case for https://issues.apache.org/activemq/browse/AMQ-2590 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905640 13f79535-47bb-0310-9956-ffa450edef68 --- .../failover/FailoverTransactionTest.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index a1417dc128..f39ffa3255 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -27,6 +27,7 @@ import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.JMSException; @@ -391,16 +392,16 @@ public class FailoverTransactionTest { @Test public void testFailoverConsumerAckLost() throws Exception { // as failure depends on hash order, do a few times - for (int i=0; i<3; i++) { + for (int i=0; i<4; i++) { try { - doTestFailoverConsumerAckLost(); + doTestFailoverConsumerAckLost(i); } finally { stopBroker(); } } } - public void doTestFailoverConsumerAckLost() throws Exception { + public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception { final int adapter = 0; broker = createBroker(true); setPersistenceAdapter(adapter); @@ -456,7 +457,7 @@ public class FailoverTransactionTest { final Vector receivedMessages = new Vector(); final CountDownLatch commitDoneLatch = new CountDownLatch(1); - + final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false); Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit after consume..."); @@ -465,12 +466,17 @@ public class FailoverTransactionTest { LOG.info("consumer1 first attempt got message: " + msg); receivedMessages.add(msg); - TimeUnit.SECONDS.sleep(7); + // give some variance to the runs + TimeUnit.SECONDS.sleep(pauseSeconds * 2); // should not get a second message as there are two messages and two consumers - // but with failover and unordered connection restore it can get the second - // message which could create a problem for a pending ack and also invalidate - // the transaction in which the first was consumed and acked + // and prefetch=1, but with failover and unordered connection restore it can get the second + // message. + + // For the transaction to complete it needs to get the same one or two messages + // again so that the acks line up. + // If redelivery order is different, the commit should fail with an ex + // msg = consumer1.receive(5000); LOG.info("consumer1 second attempt got message: " + msg); if (msg != null) { @@ -481,8 +487,9 @@ public class FailoverTransactionTest { try { consumerSession1.commit(); } catch (JMSException expectedSometimes) { - LOG.info("got rollback ex on commit", expectedSometimes); - if (expectedSometimes instanceof TransactionRolledBackException && receivedMessages.size() == 2) { + LOG.info("got exception ex on commit", expectedSometimes); + if (expectedSometimes instanceof TransactionRolledBackException) { + gotTransactionRolledBackException.set(true); // ok, message one was not replayed so we expect the rollback } else { throw expectedSometimes; @@ -506,24 +513,27 @@ public class FailoverTransactionTest { assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); - // getting 2 is indicative of orderiing issue. a problem if dangling message found after restart LOG.info("received message count: " + receivedMessages.size()); // new transaction - Message msg = consumer1.receive(2000); + Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000); LOG.info("post: from consumer1 received: " + msg); - if (receivedMessages.size() == 1) { - assertNull("should be nothing left for consumer as recieve should have committed", msg); - } else { + if (gotTransactionRolledBackException.get()) { assertNotNull("should be available again after commit rollback ex", msg); + } else { + assertNull("should be nothing left for consumer as recieve should have committed", msg); } consumerSession1.commit(); - // consumer2 should get other message - msg = consumer2.receive(5000); - LOG.info("post: from consumer2 received: " + msg); - assertNotNull("got second message on consumer2", msg); - consumerSession2.commit(); + if (gotTransactionRolledBackException.get() || + !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) { + // just one message successfully consumed or none consumed + // consumer2 should get other message + msg = consumer2.receive(10000); + LOG.info("post: from consumer2 received: " + msg); + assertNotNull("got second message on consumer2", msg); + consumerSession2.commit(); + } for (Connection c: connections) { c.close();