git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@905641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-02 14:18:54 +00:00
parent 2920f2e0b4
commit e0ce1d770d
1 changed files with 30 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
@ -391,16 +392,16 @@ public class FailoverTransactionTest {
@Test
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times
for (int i=0; i<3; i++) {
for (int i=0; i<4; i++) {
try {
doTestFailoverConsumerAckLost();
doTestFailoverConsumerAckLost(i);
} finally {
stopBroker();
}
}
}
public void doTestFailoverConsumerAckLost() throws Exception {
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
final int adapter = 0;
broker = createBroker(true);
setPersistenceAdapter(adapter);
@ -456,7 +457,7 @@ public class FailoverTransactionTest {
final Vector<Message> receivedMessages = new Vector<Message>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("doing async commit after consume...");
@ -465,12 +466,17 @@ public class FailoverTransactionTest {
LOG.info("consumer1 first attempt got message: " + msg);
receivedMessages.add(msg);
TimeUnit.SECONDS.sleep(7);
// give some variance to the runs
TimeUnit.SECONDS.sleep(pauseSeconds * 2);
// should not get a second message as there are two messages and two consumers
// but with failover and unordered connection restore it can get the second
// message which could create a problem for a pending ack and also invalidate
// the transaction in which the first was consumed and acked
// and prefetch=1, but with failover and unordered connection restore it can get the second
// message.
// For the transaction to complete it needs to get the same one or two messages
// again so that the acks line up.
// If redelivery order is different, the commit should fail with an ex
//
msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) {
@ -481,8 +487,9 @@ public class FailoverTransactionTest {
try {
consumerSession1.commit();
} catch (JMSException expectedSometimes) {
LOG.info("got rollback ex on commit", expectedSometimes);
if (expectedSometimes instanceof TransactionRolledBackException && receivedMessages.size() == 2) {
LOG.info("got exception ex on commit", expectedSometimes);
if (expectedSometimes instanceof TransactionRolledBackException) {
gotTransactionRolledBackException.set(true);
// ok, message one was not replayed so we expect the rollback
} else {
throw expectedSometimes;
@ -506,24 +513,27 @@ public class FailoverTransactionTest {
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// getting 2 is indicative of orderiing issue. a problem if dangling message found after restart
LOG.info("received message count: " + receivedMessages.size());
// new transaction
Message msg = consumer1.receive(2000);
Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
LOG.info("post: from consumer1 received: " + msg);
if (receivedMessages.size() == 1) {
assertNull("should be nothing left for consumer as recieve should have committed", msg);
} else {
if (gotTransactionRolledBackException.get()) {
assertNotNull("should be available again after commit rollback ex", msg);
} else {
assertNull("should be nothing left for consumer as recieve should have committed", msg);
}
consumerSession1.commit();
// consumer2 should get other message
msg = consumer2.receive(5000);
LOG.info("post: from consumer2 received: " + msg);
assertNotNull("got second message on consumer2", msg);
consumerSession2.commit();
if (gotTransactionRolledBackException.get() ||
!gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
// just one message successfully consumed or none consumed
// consumer2 should get other message
msg = consumer2.receive(10000);
LOG.info("post: from consumer2 received: " + msg);
assertNotNull("got second message on consumer2", msg);
consumerSession2.commit();
}
for (Connection c: connections) {
c.close();