git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-02 14:16:41 +00:00
parent 59653a50f8
commit c32820d87b
1 changed files with 30 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import java.util.Vector;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -391,16 +392,16 @@ public class FailoverTransactionTest {
@Test @Test
public void testFailoverConsumerAckLost() throws Exception { public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times // as failure depends on hash order, do a few times
for (int i=0; i<3; i++) { for (int i=0; i<4; i++) {
try { try {
doTestFailoverConsumerAckLost(); doTestFailoverConsumerAckLost(i);
} finally { } finally {
stopBroker(); stopBroker();
} }
} }
} }
public void doTestFailoverConsumerAckLost() throws Exception { public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
final int adapter = 0; final int adapter = 0;
broker = createBroker(true); broker = createBroker(true);
setPersistenceAdapter(adapter); setPersistenceAdapter(adapter);
@ -456,7 +457,7 @@ public class FailoverTransactionTest {
final Vector<Message> receivedMessages = new Vector<Message>(); final Vector<Message> receivedMessages = new Vector<Message>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1); final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() { public void run() {
LOG.info("doing async commit after consume..."); LOG.info("doing async commit after consume...");
@ -465,12 +466,17 @@ public class FailoverTransactionTest {
LOG.info("consumer1 first attempt got message: " + msg); LOG.info("consumer1 first attempt got message: " + msg);
receivedMessages.add(msg); receivedMessages.add(msg);
TimeUnit.SECONDS.sleep(7); // give some variance to the runs
TimeUnit.SECONDS.sleep(pauseSeconds * 2);
// should not get a second message as there are two messages and two consumers // should not get a second message as there are two messages and two consumers
// but with failover and unordered connection restore it can get the second // and prefetch=1, but with failover and unordered connection restore it can get the second
// message which could create a problem for a pending ack and also invalidate // message.
// the transaction in which the first was consumed and acked
// For the transaction to complete it needs to get the same one or two messages
// again so that the acks line up.
// If redelivery order is different, the commit should fail with an ex
//
msg = consumer1.receive(5000); msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg); LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) { if (msg != null) {
@ -481,8 +487,9 @@ public class FailoverTransactionTest {
try { try {
consumerSession1.commit(); consumerSession1.commit();
} catch (JMSException expectedSometimes) { } catch (JMSException expectedSometimes) {
LOG.info("got rollback ex on commit", expectedSometimes); LOG.info("got exception ex on commit", expectedSometimes);
if (expectedSometimes instanceof TransactionRolledBackException && receivedMessages.size() == 2) { if (expectedSometimes instanceof TransactionRolledBackException) {
gotTransactionRolledBackException.set(true);
// ok, message one was not replayed so we expect the rollback // ok, message one was not replayed so we expect the rollback
} else { } else {
throw expectedSometimes; throw expectedSometimes;
@ -506,24 +513,27 @@ public class FailoverTransactionTest {
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// getting 2 is indicative of orderiing issue. a problem if dangling message found after restart
LOG.info("received message count: " + receivedMessages.size()); LOG.info("received message count: " + receivedMessages.size());
// new transaction // new transaction
Message msg = consumer1.receive(2000); Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
LOG.info("post: from consumer1 received: " + msg); LOG.info("post: from consumer1 received: " + msg);
if (receivedMessages.size() == 1) { if (gotTransactionRolledBackException.get()) {
assertNull("should be nothing left for consumer as recieve should have committed", msg);
} else {
assertNotNull("should be available again after commit rollback ex", msg); assertNotNull("should be available again after commit rollback ex", msg);
} else {
assertNull("should be nothing left for consumer as recieve should have committed", msg);
} }
consumerSession1.commit(); consumerSession1.commit();
// consumer2 should get other message if (gotTransactionRolledBackException.get() ||
msg = consumer2.receive(5000); !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
LOG.info("post: from consumer2 received: " + msg); // just one message successfully consumed or none consumed
assertNotNull("got second message on consumer2", msg); // consumer2 should get other message
consumerSession2.commit(); msg = consumer2.receive(10000);
LOG.info("post: from consumer2 received: " + msg);
assertNotNull("got second message on consumer2", msg);
consumerSession2.commit();
}
for (Connection c: connections) { for (Connection c: connections) {
c.close(); c.close();