ARTEMIS-893 Fix FailoverTransactionTest.testWaitForMissingRedeliveries

This commit is contained in:
Howard Gao 2016-12-15 23:33:01 +08:00 committed by Clebert Suconic
parent 0c18c343e9
commit 8348cdd2b6
1 changed files with 48 additions and 34 deletions

View File

@ -385,6 +385,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
LOG.info("Received: " + msg);
Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
proxy.close();
}
@Test
@ -857,6 +858,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
broker.stop();
broker = createBroker();
broker.start();
Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
} finally {
connection.close();
@ -872,45 +874,57 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
try {
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
Assert.assertNotNull("got message just produced", msg);
broker.stop();
broker = createBroker();
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
// will block pending re-deliveries
new Thread() {
@Override
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
commitDone.countDown();
} catch (JMSException ignored) {
}
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
}.start();
Assert.assertNotNull("got message just produced", msg);
broker.stop();
broker = createBroker();
broker.start();
broker.stop();
broker = createBroker();
broker.start();
Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
final CountDownLatch commitDone = new CountDownLatch(1);
final CountDownLatch gotException = new CountDownLatch(1);
// will block pending re-deliveries
new Thread() {
@Override
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
commitDone.countDown();
}
catch (JMSException ignored) {
System.out.println("--->err: got exfeption:");
ignored.printStackTrace();
gotException.countDown();
}
finally {
commitDone.countDown();
}
}
}.start();
Assert.assertNull("should not get committed message", consumer.receive(5000));
connection.close();
broker.stop();
broker = createBroker();
broker.start();
Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
Assert.assertTrue("got exception on commit", gotException.await(30, TimeUnit.SECONDS));
Assert.assertNotNull("should get failed committed message", consumer.receive(5000));
} finally {
connection.close();
}
}
@Test