mirror of https://github.com/apache/activemq.git
add commit replay test to assert dups suppressed for KahaDB
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@892291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8732f70793
commit
58e6532c7e
|
@ -20,6 +20,8 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -39,6 +41,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.TransactionId;
|
import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -106,20 +109,24 @@ public class FailoverTransactionTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverCommitReplyLost() throws Exception {
|
public void testFailoverCommitReplyLost() throws Exception {
|
||||||
doTestFailoverCommitReplyLost(false);
|
doTestFailoverCommitReplyLost(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverCommitReplyLostJdbc() throws Exception {
|
public void testFailoverCommitReplyLostJdbc() throws Exception {
|
||||||
doTestFailoverCommitReplyLost(true);
|
doTestFailoverCommitReplyLost(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doTestFailoverCommitReplyLost(boolean useJdbcPersistenceAdapter) throws Exception {
|
@Test
|
||||||
|
public void testFailoverCommitReplyLostKahaDB() throws Exception {
|
||||||
|
doTestFailoverCommitReplyLost(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doTestFailoverCommitReplyLost(final int adapter) throws Exception {
|
||||||
|
|
||||||
broker = createBroker(true);
|
broker = createBroker(true);
|
||||||
if (useJdbcPersistenceAdapter) {
|
setPersistenceAdapter(adapter);
|
||||||
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
|
||||||
}
|
|
||||||
broker.setPlugins(new BrokerPlugin[] {
|
broker.setPlugins(new BrokerPlugin[] {
|
||||||
new BrokerPluginSupport() {
|
new BrokerPluginSupport() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -173,9 +180,7 @@ public class FailoverTransactionTest {
|
||||||
// will be stopped by the plugin
|
// will be stopped by the plugin
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
broker = createBroker(false);
|
broker = createBroker(false);
|
||||||
if (useJdbcPersistenceAdapter) {
|
setPersistenceAdapter(adapter);
|
||||||
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
|
||||||
}
|
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
|
||||||
|
@ -195,9 +200,7 @@ public class FailoverTransactionTest {
|
||||||
|
|
||||||
LOG.info("Checking for remaining/hung messages..");
|
LOG.info("Checking for remaining/hung messages..");
|
||||||
broker = createBroker(false);
|
broker = createBroker(false);
|
||||||
if (useJdbcPersistenceAdapter) {
|
setPersistenceAdapter(adapter);
|
||||||
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
|
||||||
}
|
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
// after restart, ensure no dangling messages
|
// after restart, ensure no dangling messages
|
||||||
|
@ -215,6 +218,21 @@ public class FailoverTransactionTest {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setPersistenceAdapter(int adapter) throws IOException {
|
||||||
|
switch (adapter) {
|
||||||
|
case 0:
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
|
||||||
|
store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
|
||||||
|
broker.setPersistenceAdapter(store);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
|
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
|
||||||
startCleanBroker();
|
startCleanBroker();
|
||||||
|
|
Loading…
Reference in New Issue