diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index 5c15c1f08e..55931a6ebb 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -63,7 +63,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; final ConcurrentMap inflightTransactions = new ConcurrentHashMap(); - final Set recoveredPendingCommit = new HashSet(); + final ConcurrentMap pendingCommit = new ConcurrentHashMap(); private Journal journal; private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; @@ -294,10 +294,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore { public void persistOutcome(Tx tx, TransactionId txid) throws IOException { tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))))); + pendingCommit.put(txid, tx); } public void persistCompletion(TransactionId txid) throws IOException { store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); + pendingCommit.remove(txid); } private Location store(JournalCommand data) throws IOException { @@ -355,6 +357,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore { for (Tx tx : inflightTransactions.values()) { knownDataFileIds.remove(tx.getPreparedLocationId()); } + for (Tx tx : pendingCommit.values()) { + knownDataFileIds.remove(tx.getPreparedLocationId()); + } try { journal.removeDataFiles(knownDataFileIds); } catch (Exception e) { @@ -380,8 +385,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore { process(location, load(location)); location = journal.getNextLocation(location); } - recoveredPendingCommit.addAll(inflightTransactions.keySet()); - LOG.info("pending local transactions: " + recoveredPendingCommit); + pendingCommit.putAll(inflightTransactions); + LOG.info("pending local transactions: " + pendingCommit.keySet()); } public JournalCommand load(Location location) throws IOException { @@ -437,10 +442,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore { for (TransactionId txid : broker.getPreparedTransactions(null)) { if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { try { - if (recoveredPendingCommit.contains(txid)) { + if (pendingCommit.keySet().contains(txid)) { LOG.info("delivering pending commit outcome for tid: " + txid); broker.commitTransaction(null, txid, false); - recoveredPendingCommit.remove(txid); } else { LOG.info("delivering rollback outcome to store for tid: " + txid); broker.forgetTransaction(null, txid); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java index 4a7e9c67e8..da96431141 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java @@ -207,7 +207,7 @@ public class MKahaDBTxRecoveryTest { multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024); - multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS); + multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10); broker = createBroker(multiKahaDBPersistenceAdapter); }