ARTEMIS-1085 Perform storelineup on appendRecord
This commit is contained in:
parent
33fff52651
commit
120b8aa7ad
|
@ -169,8 +169,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
boolean success = false;
|
||||
try {
|
||||
for (JDBCJournalRecord record : recordRef) {
|
||||
record.storeLineUp();
|
||||
|
||||
switch (record.getRecordType()) {
|
||||
case JDBCJournalRecord.DELETE_RECORD:
|
||||
// Standard SQL Delete Record, Non transactional delete
|
||||
|
@ -224,16 +222,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
cleanupTxRecords(deletedRecords, committedTransactions);
|
||||
} catch (SQLException e) {
|
||||
logger.warn("Failed to remove the Tx Records", e.getMessage(), e);
|
||||
} finally {
|
||||
executeCallbacks(recordRef, success);
|
||||
}
|
||||
|
||||
executeCallbacks(recordRef, success);
|
||||
return recordRef.size();
|
||||
}
|
||||
|
||||
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
||||
we remove the Tx Records (i.e. PREPARE, COMMIT). */
|
||||
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
|
||||
connection.rollback();
|
||||
List<RecordInfo> iterableCopy;
|
||||
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
|
||||
iterableCopyTx.addAll(transactions.values());
|
||||
|
@ -264,6 +262,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
|
||||
private void performRollback(List<JDBCJournalRecord> records) {
|
||||
try {
|
||||
connection.rollback();
|
||||
|
||||
for (JDBCJournalRecord record : records) {
|
||||
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
||||
removeTxRecord(record);
|
||||
|
@ -299,9 +299,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
private void appendRecord(JDBCJournalRecord record) throws Exception {
|
||||
record.storeLineUp();
|
||||
|
||||
SimpleWaitIOCallback callback = null;
|
||||
if (record.isSync() && record.getIoCompletion() == null && !record.isTransactional()) {
|
||||
if (record.isSync() && record.getIoCompletion() == null) {
|
||||
callback = new SimpleWaitIOCallback();
|
||||
record.setIoCompletion(callback);
|
||||
}
|
||||
|
@ -318,8 +319,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
|
||||
syncTimer.delay();
|
||||
|
||||
if (callback != null)
|
||||
if (callback != null) {
|
||||
callback.waitCompletion();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
||||
|
@ -510,6 +512,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
public void appendCommitRecord(long txID, boolean sync) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
@ -517,6 +520,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(callback);
|
||||
appendRecord(r);
|
||||
}
|
||||
|
@ -530,6 +534,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
r.setTxId(txID);
|
||||
r.setStoreLineUp(lineUpContext);
|
||||
r.setIoCompletion(callback);
|
||||
r.setSync(sync);
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue