Do not delete empty tx before commit/rollback
This commit is contained in:
parent
d499e4d8cb
commit
5eecd87106
|
@ -149,8 +149,9 @@ public class JDBCJournalImpl implements Journal {
|
||||||
List<JDBCJournalRecord> recordRef = records;
|
List<JDBCJournalRecord> recordRef = records;
|
||||||
records = new ArrayList<JDBCJournalRecord>();
|
records = new ArrayList<JDBCJournalRecord>();
|
||||||
|
|
||||||
// We keep a list of deleted records (used for cleaning up old transaction data).
|
// We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
|
||||||
List<Long> deletedRecords = new ArrayList<>();
|
List<Long> deletedRecords = new ArrayList<>();
|
||||||
|
List<Long> committedTransactions = new ArrayList<>();
|
||||||
|
|
||||||
TransactionHolder holder;
|
TransactionHolder holder;
|
||||||
|
|
||||||
|
@ -180,6 +181,7 @@ public class JDBCJournalImpl implements Journal {
|
||||||
deleteJournalRecords.addBatch();
|
deleteJournalRecords.addBatch();
|
||||||
}
|
}
|
||||||
record.writeRecord(insertJournalRecords);
|
record.writeRecord(insertJournalRecords);
|
||||||
|
committedTransactions.add(record.getTxId());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// Default we add a new record to the DB
|
// Default we add a new record to the DB
|
||||||
|
@ -202,7 +204,7 @@ public class JDBCJournalImpl implements Journal {
|
||||||
|
|
||||||
connection.commit();
|
connection.commit();
|
||||||
|
|
||||||
cleanupTxRecords(deletedRecords);
|
cleanupTxRecords(deletedRecords, committedTransactions);
|
||||||
success = true;
|
success = true;
|
||||||
}
|
}
|
||||||
catch (SQLException e) {
|
catch (SQLException e) {
|
||||||
|
@ -215,12 +217,16 @@ public class JDBCJournalImpl implements Journal {
|
||||||
|
|
||||||
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
||||||
we remove the Tx Records (i.e. PREPARE, COMMIT). */
|
we remove the Tx Records (i.e. PREPARE, COMMIT). */
|
||||||
private void cleanupTxRecords(List<Long> deletedRecords) throws SQLException {
|
private void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
|
||||||
|
|
||||||
List<RecordInfo> iterableCopy;
|
List<RecordInfo> iterableCopy;
|
||||||
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
|
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
|
||||||
iterableCopyTx.addAll(transactions.values());
|
iterableCopyTx.addAll(transactions.values());
|
||||||
|
|
||||||
|
for (Long txId : committedTx) {
|
||||||
|
transactions.get(txId).committed = true;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
|
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
|
||||||
for (TransactionHolder h : iterableCopyTx) {
|
for (TransactionHolder h : iterableCopyTx) {
|
||||||
|
|
||||||
|
@ -233,7 +239,7 @@ public class JDBCJournalImpl implements Journal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (h.recordInfos.isEmpty()) {
|
if (h.recordInfos.isEmpty() && h.committed) {
|
||||||
deleteJournalTxRecords.setLong(1, h.transactionID);
|
deleteJournalTxRecords.setLong(1, h.transactionID);
|
||||||
deleteJournalTxRecords.addBatch();
|
deleteJournalTxRecords.addBatch();
|
||||||
transactions.remove(h.transactionID);
|
transactions.remove(h.transactionID);
|
||||||
|
|
|
@ -90,7 +90,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
||||||
|
|
||||||
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
|
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
|
||||||
// It is possible that the TX could be null, since deletes could have happened in the journal.
|
// It is possible that the TX could be null, since deletes could have happened in the journal.
|
||||||
TransactionHolder tx = loadTransactions.remove(transactionID);
|
|
||||||
|
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||||
|
tx.committed = true;
|
||||||
|
|
||||||
|
// We can remove local Tx without associated records
|
||||||
if (tx != null) {
|
if (tx != null) {
|
||||||
for (RecordInfo txRecord : tx.recordInfos) {
|
for (RecordInfo txRecord : tx.recordInfos) {
|
||||||
if (txRecord.isUpdate) {
|
if (txRecord.isUpdate) {
|
||||||
|
@ -117,11 +121,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
||||||
|
|
||||||
public void checkPreparedTx() {
|
public void checkPreparedTx() {
|
||||||
for (TransactionHolder transaction : loadTransactions.values()) {
|
for (TransactionHolder transaction : loadTransactions.values()) {
|
||||||
if (!transaction.prepared || transaction.invalid) {
|
if ((!transaction.prepared && !transaction.committed) || transaction.invalid) {
|
||||||
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
|
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
|
||||||
loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
|
loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
|
||||||
}
|
}
|
||||||
else {
|
else if (!transaction.committed) {
|
||||||
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
|
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
|
||||||
info.getRecords().addAll(transaction.recordInfos);
|
info.getRecords().addAll(transaction.recordInfos);
|
||||||
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
|
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
|
||||||
|
|
|
@ -39,4 +39,6 @@ final class TransactionHolder {
|
||||||
public boolean invalid;
|
public boolean invalid;
|
||||||
|
|
||||||
public byte[] extraData;
|
public byte[] extraData;
|
||||||
|
|
||||||
|
public boolean committed;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue