This commit is contained in:
Clebert Suconic 2018-09-19 15:32:03 -04:00
commit 94bd8ed77e
2 changed files with 16 additions and 3 deletions

View File

@ -251,7 +251,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
logger.trace("JDBC commit worked");
}
cleanupTxRecords(deletedRecords, committedTransactions);
if (cleanupTxRecords(deletedRecords, committedTransactions)) {
deleteJournalTxRecords.executeBatch();
connection.commit();
logger.trace("JDBC commit worked on cleanupTxRecords");
}
executeCallbacks(recordRef, true);
return recordRef.size();
@ -291,7 +295,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
we remove the Tx Records (i.e. PREPARE, COMMIT). */
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
private synchronized boolean cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values());
@ -299,7 +303,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
for (Long txId : committedTx) {
transactions.get(txId).committed = true;
}
boolean hasDeletedJournalTxRecords = false;
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
@ -315,9 +319,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
if (h.recordInfos.isEmpty() && h.committed) {
deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch();
hasDeletedJournalTxRecords = true;
transactions.remove(h.transactionID);
}
}
return hasDeletedJournalTxRecords;
}
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean success) {

View File

@ -129,6 +129,13 @@ public class JDBCJournalTest extends ActiveMQTestBase {
assertEquals(noRecords, journal.getNumberOfRecords());
}
@Test
public void testCleanupTxRecords() throws Exception {
journal.appendDeleteRecordTransactional(1, 1);
journal.appendCommitRecord(1, true);
assertEquals(0, journal.getNumberOfRecords());
}
@Test
public void testCallbacks() throws Exception {
final int noRecords = 10;