diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 2b7895afc0..d1f91d0013 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -385,6 +385,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest { LOG.info("Received: " + msg); Assert.assertNull("no messges left dangling but got: " + msg, msg); connection.close(); + proxy.close(); } @Test @@ -857,6 +858,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest { broker.stop(); broker = createBroker(); broker.start(); + Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000)); } finally { connection.close(); @@ -872,45 +874,57 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000"); configureConnectionFactory(cf); Connection connection = cf.createConnection(); - connection.start(); - final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue destination = producerSession.createQueue(QUEUE_NAME); - final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = consumerSession.createConsumer(destination); + try { + connection.start(); + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME); + final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumerSession.createConsumer(destination); - produceMessage(producerSession, destination); - Message msg = consumer.receive(20000); - if (msg == null) { - AutoFailTestSupport.dumpAllThreads("missing-"); - } - Assert.assertNotNull("got message just produced", msg); - - broker.stop(); - broker = createBroker(); - broker.start(); - - final CountDownLatch commitDone = new CountDownLatch(1); - // will block pending re-deliveries - new Thread() { - @Override - public void run() { - LOG.info("doing async commit..."); - try { - consumerSession.commit(); - commitDone.countDown(); - } catch (JMSException ignored) { - } + produceMessage(producerSession, destination); + Message msg = consumer.receive(20000); + if (msg == null) { + AutoFailTestSupport.dumpAllThreads("missing-"); } - }.start(); + Assert.assertNotNull("got message just produced", msg); - broker.stop(); - broker = createBroker(); - broker.start(); + broker.stop(); + broker = createBroker(); + broker.start(); - Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS)); + final CountDownLatch commitDone = new CountDownLatch(1); + final CountDownLatch gotException = new CountDownLatch(1); + // will block pending re-deliveries + new Thread() { + @Override + public void run() { + LOG.info("doing async commit..."); + try { + consumerSession.commit(); + commitDone.countDown(); + } + catch (JMSException ignored) { + System.out.println("--->err: got exfeption:"); + ignored.printStackTrace(); + gotException.countDown(); + } + finally { + commitDone.countDown(); + } + } + }.start(); - Assert.assertNull("should not get committed message", consumer.receive(5000)); - connection.close(); + broker.stop(); + broker = createBroker(); + broker.start(); + + Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("got exception on commit", gotException.await(30, TimeUnit.SECONDS)); + + Assert.assertNotNull("should get failed committed message", consumer.receive(5000)); + } finally { + connection.close(); + } } @Test