diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 82053d437c..2f56addaf9 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -149,8 +149,9 @@ public class JDBCJournalImpl implements Journal { List recordRef = records; records = new ArrayList(); - // We keep a list of deleted records (used for cleaning up old transaction data). + // We keep a list of deleted records and committed tx (used for cleaning up old transaction data). List deletedRecords = new ArrayList<>(); + List committedTransactions = new ArrayList<>(); TransactionHolder holder; @@ -180,6 +181,7 @@ public class JDBCJournalImpl implements Journal { deleteJournalRecords.addBatch(); } record.writeRecord(insertJournalRecords); + committedTransactions.add(record.getTxId()); break; default: // Default we add a new record to the DB @@ -202,7 +204,7 @@ public class JDBCJournalImpl implements Journal { connection.commit(); - cleanupTxRecords(deletedRecords); + cleanupTxRecords(deletedRecords, committedTransactions); success = true; } catch (SQLException e) { @@ -215,12 +217,16 @@ public class JDBCJournalImpl implements Journal { /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ - private void cleanupTxRecords(List deletedRecords) throws SQLException { + private void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { List iterableCopy; List iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); + for (Long txId : committedTx) { + transactions.get(txId).committed = true; + } + // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop for (TransactionHolder h : iterableCopyTx) { @@ -233,7 +239,7 @@ public class JDBCJournalImpl implements Journal { } } - if (h.recordInfos.isEmpty()) { + if (h.recordInfos.isEmpty() && h.committed) { deleteJournalTxRecords.setLong(1, h.transactionID); deleteJournalTxRecords.addBatch(); transactions.remove(h.transactionID); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java index 844fd4a26f..a7c765e836 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java @@ -90,7 +90,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { // It is possible that the TX could be null, since deletes could have happened in the journal. - TransactionHolder tx = loadTransactions.remove(transactionID); + + TransactionHolder tx = loadTransactions.get(transactionID); + tx.committed = true; + + // We can remove local Tx without associated records if (tx != null) { for (RecordInfo txRecord : tx.recordInfos) { if (txRecord.isUpdate) { @@ -117,11 +121,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { public void checkPreparedTx() { for (TransactionHolder transaction : loadTransactions.values()) { - if (!transaction.prepared || transaction.invalid) { + if ((!transaction.prepared && !transaction.committed) || transaction.invalid) { ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID); loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete); } - else { + else if (!transaction.committed) { PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData); info.getRecords().addAll(transaction.recordInfos); info.getRecordsToDelete().addAll(transaction.recordsToDelete); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java index 4c3cd846d3..c03e7479d4 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java @@ -39,4 +39,6 @@ final class TransactionHolder { public boolean invalid; public byte[] extraData; + + public boolean committed; }