diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index fa0f7c22b1..fbeda4c0a8 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -547,12 +547,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); } - public boolean isPurgeRecoveredXATransactions() { - return letter.isPurgeRecoveredXATransactions(); + public String getPurgeRecoveredXATransactionStrategy() { + return letter.getPurgeRecoveredXATransactionStrategy(); } - public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) { - letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions); + public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) { + letter.setPurgeRecoveredXATransactionStrategy(purgeRecoveredXATransactionStrategy); } @Override diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 413e137a30..0e5c237d4b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -240,6 +240,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + public enum PurgeRecoveredXATransactionStrategy { + NEVER, + COMMIT, + ROLLBACK; + } + protected PageFile pageFile; protected Journal journal; protected Metadata metadata = new Metadata(); @@ -272,7 +278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean ignoreMissingJournalfiles = false; private int indexCacheSize = 10000; private boolean checkForCorruptJournalFiles = false; - private boolean purgeRecoveredXATransactions = false; + protected PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.NEVER; private boolean checksumJournalFiles = true; protected boolean forceRecoverIndex = false; private boolean archiveCorruptedIndex = false; @@ -746,14 +752,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } synchronized (preparedTransactions) { - for (TransactionId txId : preparedTransactions.keySet()) { - LOG.warn("Recovered prepared XA TX: [{}]", txId); - } - - if (purgeRecoveredXATransactions){ - if (!preparedTransactions.isEmpty()){ - LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" ); - preparedTransactions.clear(); + Set txIds = new LinkedHashSet(preparedTransactions.keySet()); + for (TransactionId txId : txIds) { + switch (purgeRecoveredXATransactionStrategy){ + case NEVER: + LOG.warn("Recovered prepared XA TX: [{}]", txId); + break; + case COMMIT: + store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null); + LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId); + break; + case ROLLBACK: + store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null); + LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId); + break; } } } @@ -3315,12 +3327,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; } - public boolean isPurgeRecoveredXATransactions() { - return purgeRecoveredXATransactions; + public PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum() { + return purgeRecoveredXATransactionStrategy; } - public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) { - this.purgeRecoveredXATransactions = purgeRecoveredXATransactions; + public String getPurgeRecoveredXATransactionStrategy() { + return purgeRecoveredXATransactionStrategy.name(); + } + + public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) { + this.purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.valueOf( + purgeRecoveredXATransactionStrategy.trim().toUpperCase()); } public boolean isChecksumJournalFiles() { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 60d3b8b416..c9154a3813 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -267,7 +267,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertEmptyDLQ(); } - public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception { + public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception { ActiveMQDestination destination = createDestination(); @@ -306,7 +306,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { stopBroker(); if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); - adapter.setPurgeRecoveredXATransactions(true); + adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK"); LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); } broker.start(); @@ -320,9 +320,77 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { consumerInfo = createConsumerInfo(sessionInfo, destination); connection.send(consumerInfo); + // Since rolledback but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + + //These should be purged so expect 0 + assertEquals(0, dar.getData().length); + + } + + public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + // Since prepared but not committed.. they should not get delivered. assertNull(receiveMessage(connection)); assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + stopBroker(); + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); + adapter.setPurgeRecoveredXATransactionStrategy("COMMIT"); + LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); + } + broker.start(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Since committed ... they should get delivered. + for (int i = 0; i < 4; i++) { + assertNotNull(receiveMessage(connection)); + } + assertNoMessagesLeft(connection); Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); assertNotNull(response);