From ce7498c971b99e2515f07aab36418a1a0f19c03e Mon Sep 17 00:00:00 2001 From: hkesler Date: Thu, 19 Jul 2018 11:53:04 -0600 Subject: [PATCH] AMQ-7015 Added a purgeRecoveredXATransactions property on the KahaDB adaptor to purge prepared XA messages on recovery --- .../kahadb/KahaDBPersistenceAdapter.java | 8 +++ .../store/kahadb/MessageDatabase.java | 16 +++++ .../broker/BrokerRestartTestSupport.java | 9 ++- .../activemq/broker/XARecoveryBrokerTest.java | 66 +++++++++++++++++++ 4 files changed, 96 insertions(+), 3 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index c4f480c13f..fa0f7c22b1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -547,6 +547,14 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); } + public boolean isPurgeRecoveredXATransactions() { + return letter.isPurgeRecoveredXATransactions(); + } + + public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) { + letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions); + } + @Override public void setBrokerService(BrokerService brokerService) { super.setBrokerService(brokerService); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 94de6ea640..76d0da0e4d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -272,6 +272,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean ignoreMissingJournalfiles = false; private int indexCacheSize = 10000; private boolean checkForCorruptJournalFiles = false; + private boolean purgeRecoveredXATransactions = false; private boolean checksumJournalFiles = true; protected boolean forceRecoverIndex = false; private boolean archiveCorruptedIndex = false; @@ -748,6 +749,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe for (TransactionId txId : preparedTransactions.keySet()) { LOG.warn("Recovered prepared XA TX: [{}]", txId); } + + if (purgeRecoveredXATransactions){ + if (!preparedTransactions.isEmpty()){ + LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" ); + preparedTransactions.clear(); + } + } } } finally { @@ -3340,6 +3348,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; } + public boolean isPurgeRecoveredXATransactions() { + return purgeRecoveredXATransactions; + } + + public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) { + this.purgeRecoveredXATransactions = purgeRecoveredXATransactions; + } + public boolean isChecksumJournalFiles() { return checksumJournalFiles; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java index c4e3848803..111494a027 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java @@ -58,10 +58,13 @@ public class BrokerRestartTestSupport extends BrokerTestSupport { * @throws URISyntaxException */ protected void restartBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - broker = createRestartedBroker(); + stopBroker(); broker.start(); } + protected void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createRestartedBroker(); + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 9660ef0b0b..b52681a58c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -267,6 +267,72 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertEmptyDLQ(); } + public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + stopBroker(); + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); + adapter.setPurgeRecoveredXATransactions(true); + LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); + } + broker.start(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + + //These should be purged so expect 0 + assertEquals(0, dar.getData().length); + + } + private void assertEmptyDLQ() throws Exception { try { DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));