diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index ba5b001e4d..fe014b9709 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -39,6 +41,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; @@ -106,20 +109,24 @@ public class FailoverTransactionTest { @Test public void testFailoverCommitReplyLost() throws Exception { - doTestFailoverCommitReplyLost(false); + doTestFailoverCommitReplyLost(0); } @Test public void testFailoverCommitReplyLostJdbc() throws Exception { - doTestFailoverCommitReplyLost(true); + doTestFailoverCommitReplyLost(1); } - public void doTestFailoverCommitReplyLost(boolean useJdbcPersistenceAdapter) throws Exception { + @Test + public void testFailoverCommitReplyLostKahaDB() throws Exception { + doTestFailoverCommitReplyLost(2); + } + + public void doTestFailoverCommitReplyLost(final int adapter) throws Exception { broker = createBroker(true); - if (useJdbcPersistenceAdapter) { - broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); - } + setPersistenceAdapter(adapter); + broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { @Override @@ -173,9 +180,7 @@ public class FailoverTransactionTest { // will be stopped by the plugin broker.waitUntilStopped(); broker = createBroker(false); - if (useJdbcPersistenceAdapter) { - broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); - } + setPersistenceAdapter(adapter); broker.start(); assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); @@ -195,9 +200,7 @@ public class FailoverTransactionTest { LOG.info("Checking for remaining/hung messages.."); broker = createBroker(false); - if (useJdbcPersistenceAdapter) { - broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); - } + setPersistenceAdapter(adapter); broker.start(); // after restart, ensure no dangling messages @@ -215,6 +218,21 @@ public class FailoverTransactionTest { connection.close(); } + private void setPersistenceAdapter(int adapter) throws IOException { + switch (adapter) { + case 0: + break; + case 1: + broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); + break; + case 2: + KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter(); + store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest")); + broker.setPersistenceAdapter(store); + break; + } + } + @Test public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { startCleanBroker();