diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 2108be7b19..4df6a17765 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -169,8 +169,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { boolean success = false; try { for (JDBCJournalRecord record : recordRef) { - record.storeLineUp(); - switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: // Standard SQL Delete Record, Non transactional delete @@ -224,16 +222,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { cleanupTxRecords(deletedRecords, committedTransactions); } catch (SQLException e) { logger.warn("Failed to remove the Tx Records", e.getMessage(), e); + } finally { + executeCallbacks(recordRef, success); } - executeCallbacks(recordRef, success); return recordRef.size(); } /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ private synchronized void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { - connection.rollback(); List iterableCopy; List iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); @@ -264,6 +262,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private void performRollback(List records) { try { + connection.rollback(); + for (JDBCJournalRecord record : records) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { removeTxRecord(record); @@ -299,9 +299,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private void appendRecord(JDBCJournalRecord record) throws Exception { + record.storeLineUp(); SimpleWaitIOCallback callback = null; - if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) { + if (record.isSync() && record.getIoCompletion() == null) { callback = new SimpleWaitIOCallback(); record.setIoCompletion(callback); } @@ -318,8 +319,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { syncTimer.delay(); - if (callback != null) + if (callback != null) { callback.waitCompletion(); + } } private synchronized void addTxRecord(JDBCJournalRecord record) { @@ -510,6 +512,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendCommitRecord(long txID, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); + r.setSync(sync); appendRecord(r); } @@ -517,6 +520,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); + r.setSync(sync); r.setIoCompletion(callback); appendRecord(r); } @@ -530,6 +534,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); + r.setSync(sync); appendRecord(r); }