This closes #370 Jdbc improvements

This commit is contained in:
Andy Taylor 2016-02-04 16:41:38 +00:00
commit 623b1b9143
5 changed files with 79 additions and 74 deletions

View File

@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Timer; import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -42,13 +43,14 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCJournalImpl implements Journal { public class JDBCJournalImpl implements Journal {
// Sync Delay in ms // Sync Delay in ms
public static final int SYNC_DELAY = 500; public static final int SYNC_DELAY = 5;
private static int USER_VERSION = 1; private static int USER_VERSION = 1;
@ -83,14 +85,15 @@ public class JDBCJournalImpl implements Journal {
// Track Tx Records // Track Tx Records
private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>(); private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
private boolean isLoaded = false; // Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0);
public JDBCJournalImpl(String jdbcUrl, String tableName) { public JDBCJournalImpl(String jdbcUrl, String tableName) {
this.tableName = tableName; this.tableName = tableName;
this.jdbcUrl = jdbcUrl; this.jdbcUrl = jdbcUrl;
timerThread = "Timer JDBC Journal(" + tableName + ")"; timerThread = "Timer JDBC Journal(" + tableName + ")";
records = new ArrayList<JDBCJournalRecord>(); records = new ArrayList<>();
} }
@Override @Override
@ -149,8 +152,9 @@ public class JDBCJournalImpl implements Journal {
List<JDBCJournalRecord> recordRef = records; List<JDBCJournalRecord> recordRef = records;
records = new ArrayList<JDBCJournalRecord>(); records = new ArrayList<JDBCJournalRecord>();
// We keep a list of deleted records (used for cleaning up old transaction data). // We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
List<Long> deletedRecords = new ArrayList<>(); List<Long> deletedRecords = new ArrayList<>();
List<Long> committedTransactions = new ArrayList<>();
TransactionHolder holder; TransactionHolder holder;
@ -167,7 +171,6 @@ public class JDBCJournalImpl implements Journal {
break; break;
case JDBCJournalRecord.ROLLBACK_RECORD: case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last. // Roll back we remove all records associated with this TX ID. This query is always performed last.
holder = transactions.get(record.getTxId());
deleteJournalTxRecords.setLong(1, record.getTxId()); deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch(); deleteJournalTxRecords.addBatch();
break; break;
@ -180,6 +183,7 @@ public class JDBCJournalImpl implements Journal {
deleteJournalRecords.addBatch(); deleteJournalRecords.addBatch();
} }
record.writeRecord(insertJournalRecords); record.writeRecord(insertJournalRecords);
committedTransactions.add(record.getTxId());
break; break;
default: default:
// Default we add a new record to the DB // Default we add a new record to the DB
@ -202,7 +206,7 @@ public class JDBCJournalImpl implements Journal {
connection.commit(); connection.commit();
cleanupTxRecords(deletedRecords); cleanupTxRecords(deletedRecords, committedTransactions);
success = true; success = true;
} }
catch (SQLException e) { catch (SQLException e) {
@ -215,12 +219,16 @@ public class JDBCJournalImpl implements Journal {
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
we remove the Tx Records (i.e. PREPARE, COMMIT). */ we remove the Tx Records (i.e. PREPARE, COMMIT). */
private void cleanupTxRecords(List<Long> deletedRecords) throws SQLException { private void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
List<RecordInfo> iterableCopy; List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>(); List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values()); iterableCopyTx.addAll(transactions.values());
for (Long txId : committedTx) {
transactions.get(txId).committed = true;
}
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) { for (TransactionHolder h : iterableCopyTx) {
@ -233,7 +241,7 @@ public class JDBCJournalImpl implements Journal {
} }
} }
if (h.recordInfos.isEmpty()) { if (h.recordInfos.isEmpty() && h.committed) {
deleteJournalTxRecords.setLong(1, h.transactionID); deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch(); deleteJournalTxRecords.addBatch();
transactions.remove(h.transactionID); transactions.remove(h.transactionID);
@ -255,7 +263,7 @@ public class JDBCJournalImpl implements Journal {
// On rollback we must update the tx map to remove all the tx entries // On rollback we must update the tx map to remove all the tx entries
for (TransactionHolder txH : txHolders) { for (TransactionHolder txH : txHolders) {
if (txH.prepared == false && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) { if (!txH.prepared && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) {
transactions.remove(txH.transactionID); transactions.remove(txH.transactionID);
} }
} }
@ -279,7 +287,14 @@ public class JDBCJournalImpl implements Journal {
t.start(); t.start();
} }
private synchronized void appendRecord(JDBCJournalRecord record) { private void appendRecord(JDBCJournalRecord record) throws Exception {
SimpleWaitIOCallback callback = null;
if (record.isSync() && record.getIoCompletion() == null) {
callback = new SimpleWaitIOCallback();
record.setIoCompletion(callback);
}
try { try {
journalLock.writeLock().lock(); journalLock.writeLock().lock();
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
@ -290,6 +305,8 @@ public class JDBCJournalImpl implements Journal {
finally { finally {
journalLock.writeLock().unlock(); journalLock.writeLock().unlock();
} }
if (callback != null) callback.waitCompletion();
} }
private void addTxRecord(JDBCJournalRecord record) { private void addTxRecord(JDBCJournalRecord record) {
@ -334,7 +351,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setSync(sync); r.setSync(sync);
@ -343,7 +360,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setSync(sync); r.setSync(sync);
@ -356,7 +373,7 @@ public class JDBCJournalImpl implements Journal {
EncodingSupport record, EncodingSupport record,
boolean sync, boolean sync,
IOCompletion completionCallback) throws Exception { IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setSync(sync); r.setSync(sync);
@ -366,7 +383,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setSync(sync); r.setSync(sync);
@ -375,7 +392,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setSync(sync); r.setSync(sync);
@ -388,7 +405,7 @@ public class JDBCJournalImpl implements Journal {
EncodingSupport record, EncodingSupport record,
boolean sync, boolean sync,
IOCompletion completionCallback) throws Exception { IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setSync(sync); r.setSync(sync);
@ -398,14 +415,14 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendDeleteRecord(long id, boolean sync) throws Exception { public void appendDeleteRecord(long id, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
r.setSync(sync); r.setSync(sync);
appendRecord(r); appendRecord(r);
} }
@Override @Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
r.setSync(sync); r.setSync(sync);
r.setIoCompletion(completionCallback); r.setIoCompletion(completionCallback);
appendRecord(r); appendRecord(r);
@ -413,7 +430,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setTxId(txID); r.setTxId(txID);
@ -425,7 +442,7 @@ public class JDBCJournalImpl implements Journal {
long id, long id,
byte recordType, byte recordType,
EncodingSupport record) throws Exception { EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setTxId(txID); r.setTxId(txID);
@ -434,7 +451,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setTxId(txID); r.setTxId(txID);
@ -446,7 +463,7 @@ public class JDBCJournalImpl implements Journal {
long id, long id,
byte recordType, byte recordType,
EncodingSupport record) throws Exception { EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType); r.setUserRecordType(recordType);
r.setRecord(record); r.setRecord(record);
r.setTxId(txID); r.setTxId(txID);
@ -455,7 +472,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception { public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
r.setRecord(record); r.setRecord(record);
r.setTxId(txID); r.setTxId(txID);
appendRecord(r); appendRecord(r);
@ -463,7 +480,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
r.setRecord(record); r.setRecord(record);
r.setTxId(txID); r.setTxId(txID);
appendRecord(r); appendRecord(r);
@ -471,21 +488,21 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendDeleteRecordTransactional(long txID, long id) throws Exception { public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
appendRecord(r); appendRecord(r);
} }
@Override @Override
public void appendCommitRecord(long txID, boolean sync) throws Exception { public void appendCommitRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
appendRecord(r); appendRecord(r);
} }
@Override @Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setIoCompletion(callback); r.setIoCompletion(callback);
appendRecord(r); appendRecord(r);
@ -496,7 +513,7 @@ public class JDBCJournalImpl implements Journal {
boolean sync, boolean sync,
IOCompletion callback, IOCompletion callback,
boolean lineUpContext) throws Exception { boolean lineUpContext) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setStoreLineUp(lineUpContext); r.setStoreLineUp(lineUpContext);
r.setIoCompletion(callback); r.setIoCompletion(callback);
@ -505,7 +522,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception { public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setTxData(transactionData); r.setTxData(transactionData);
r.setSync(sync); r.setSync(sync);
@ -517,7 +534,7 @@ public class JDBCJournalImpl implements Journal {
EncodingSupport transactionData, EncodingSupport transactionData,
boolean sync, boolean sync,
IOCompletion callback) throws Exception { IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setTxData(transactionData); r.setTxData(transactionData);
r.setTxData(transactionData); r.setTxData(transactionData);
@ -528,7 +545,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception { public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setTxData(transactionData); r.setTxData(transactionData);
r.setSync(sync); r.setSync(sync);
@ -537,7 +554,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendRollbackRecord(long txID, boolean sync) throws Exception { public void appendRollbackRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setSync(sync); r.setSync(sync);
appendRecord(r); appendRecord(r);
@ -545,7 +562,7 @@ public class JDBCJournalImpl implements Journal {
@Override @Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
r.setTxId(txID); r.setTxId(txID);
r.setSync(sync); r.setSync(sync);
r.setIoCompletion(callback); r.setIoCompletion(callback);
@ -594,13 +611,15 @@ public class JDBCJournalImpl implements Journal {
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType()); throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
} }
noRecords++; noRecords++;
if (r.getSeq() > seq.longValue()) {
seq.set(r.getSeq());
}
} }
jrc.checkPreparedTx(); jrc.checkPreparedTx();
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords); jli.setNumberOfRecords(noRecords);
transactions = jrc.getTransactions(); transactions = jrc.getTransactions();
isLoaded = true;
} }
return jli; return jli;
} }

View File

@ -29,19 +29,13 @@ import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
public class JDBCJournalLoaderCallback implements LoaderCallback { public class JDBCJournalLoaderCallback implements LoaderCallback {
private static final int DELETE_FLUSH = 20000;
private final List<PreparedTransactionInfo> preparedTransactions; private final List<PreparedTransactionInfo> preparedTransactions;
private final TransactionFailureCallback failureCallback; private final TransactionFailureCallback failureCallback;
private final boolean fixBadTX;
/* We keep track of list entries for each ID. This preserves order and allows multiple record insertions with the /* We keep track of list entries for each ID. This preserves order and allows multiple record insertions with the
same ID. We use this for deleting records */ same ID. We use this for deleting records */
private final Map<Long, List<Integer>> deleteReferences = new HashMap<Long, List<Integer>>(); private final Map<Long, List<Integer>> deleteReferences = new HashMap<>();
private Runtime runtime = Runtime.getRuntime();
private final List<RecordInfo> committedRecords; private final List<RecordInfo> committedRecords;
@ -54,7 +48,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
this.committedRecords = committedRecords; this.committedRecords = committedRecords;
this.preparedTransactions = preparedTransactions; this.preparedTransactions = preparedTransactions;
this.failureCallback = failureCallback; this.failureCallback = failureCallback;
this.fixBadTX = fixBadTX;
} }
public synchronized void checkMaxId(long id) { public synchronized void checkMaxId(long id) {
@ -71,7 +64,7 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
int index = committedRecords.size(); int index = committedRecords.size();
committedRecords.add(index, info); committedRecords.add(index, info);
ArrayList<Integer> indexes = new ArrayList<Integer>(); ArrayList<Integer> indexes = new ArrayList<>();
indexes.add(index); indexes.add(index);
deleteReferences.put(info.id, indexes); deleteReferences.put(info.id, indexes);
@ -89,10 +82,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
} }
} }
public int getNoRecords() {
return committedRecords.size();
}
@Override @Override
public void failedTransaction(final long transactionID, public void failedTransaction(final long transactionID,
final List<RecordInfo> records, final List<RecordInfo> records,

View File

@ -29,7 +29,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCJournalReaderCallback implements JournalReaderCallback { public class JDBCJournalReaderCallback implements JournalReaderCallback {
private final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>(); private final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<>();
private final LoaderCallback loadManager; private final LoaderCallback loadManager;
@ -90,8 +90,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
// It is possible that the TX could be null, since deletes could have happened in the journal. // It is possible that the TX could be null, since deletes could have happened in the journal.
TransactionHolder tx = loadTransactions.remove(transactionID); TransactionHolder tx = loadTransactions.get(transactionID);
// We can remove local Tx without associated records
if (tx != null) { if (tx != null) {
tx.committed = true;
for (RecordInfo txRecord : tx.recordInfos) { for (RecordInfo txRecord : tx.recordInfos) {
if (txRecord.isUpdate) { if (txRecord.isUpdate) {
loadManager.updateRecord(txRecord); loadManager.updateRecord(txRecord);
@ -117,11 +120,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
public void checkPreparedTx() { public void checkPreparedTx() {
for (TransactionHolder transaction : loadTransactions.values()) { for (TransactionHolder transaction : loadTransactions.values()) {
if (!transaction.prepared || transaction.invalid) { if ((!transaction.prepared && !transaction.committed) || transaction.invalid) {
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID); ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete); loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
} }
else { else if (!transaction.committed) {
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData); PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.getRecords().addAll(transaction.recordInfos); info.getRecords().addAll(transaction.recordInfos);
info.getRecordsToDelete().addAll(transaction.recordsToDelete); info.getRecordsToDelete().addAll(transaction.recordsToDelete);

View File

@ -88,7 +88,9 @@ public class JDBCJournalRecord {
private boolean isTransactional; private boolean isTransactional;
public JDBCJournalRecord(long id, byte recordType) { private long seq;
public JDBCJournalRecord(long id, byte recordType, long seq) {
this.id = id; this.id = id;
this.recordType = recordType; this.recordType = recordType;
@ -104,41 +106,32 @@ public class JDBCJournalRecord {
txDataSize = 0; txDataSize = 0;
txData = new ByteArrayInputStream(new byte[0]); txData = new ByteArrayInputStream(new byte[0]);
txCheckNoRecords = 0; txCheckNoRecords = 0;
this.seq = seq;
} }
public static String createTableSQL(String tableName) { public static String createTableSQL(String tableName) {
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)"; return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
} }
public static String insertRecordsSQL(String tableName) { public static String insertRecordsSQL(String tableName) {
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) " return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) "
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
} }
public static String selectRecordsSQL(String tableName) { public static String selectRecordsSQL(String tableName) {
return "SELECT id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords " + "FROM " + tableName; return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq "
+ "FROM " + tableName + " ORDER BY seq ASC";
} }
public static String deleteRecordsSQL(String tableName) { public static String deleteRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE id = ?"; return "DELETE FROM " + tableName + " WHERE id = ?";
} }
public static String deleteCommittedDeleteRecordsForTxSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE id IN (SELECT id FROM " + tableName + " WHERE txID=?)";
}
public static String deleteCommittedTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=? AND (recordType=" + PREPARE_RECORD + " OR recordType=" + COMMIT_RECORD + ")";
}
public static String deleteJournalTxRecordsSQL(String tableName) { public static String deleteJournalTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=?"; return "DELETE FROM " + tableName + " WHERE txId=?";
} }
public static String deleteRolledBackTxSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=?";
}
public void complete(boolean success) { public void complete(boolean success) {
if (ioCompletion != null) { if (ioCompletion != null) {
if (success) { if (success) {
@ -179,22 +172,17 @@ public class JDBCJournalRecord {
statement.setInt(8, txDataSize); statement.setInt(8, txDataSize);
statement.setBytes(9, txDataBytes); statement.setBytes(9, txDataBytes);
statement.setInt(10, txCheckNoRecords); statement.setInt(10, txCheckNoRecords);
statement.setLong(11, System.currentTimeMillis()); statement.setLong(11, seq);
statement.addBatch(); statement.addBatch();
} }
protected void writeDeleteTxRecord(PreparedStatement deleteTxStatement) throws SQLException {
deleteTxStatement.setLong(1, txId);
deleteTxStatement.addBatch();
}
protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException { protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
deleteStatement.setLong(1, id); deleteStatement.setLong(1, id);
deleteStatement.addBatch(); deleteStatement.addBatch();
} }
public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException { public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2)); JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2), rs.getLong(11));
record.setCompactCount((byte) rs.getShort(3)); record.setCompactCount((byte) rs.getShort(3));
record.setTxId(rs.getLong(4)); record.setTxId(rs.getLong(4));
record.setUserRecordType((byte) rs.getShort(5)); record.setUserRecordType((byte) rs.getShort(5));
@ -355,4 +343,8 @@ public class JDBCJournalRecord {
public boolean isTransactional() { public boolean isTransactional() {
return isTransactional; return isTransactional;
} }
public long getSeq() {
return seq;
}
} }

View File

@ -39,4 +39,6 @@ final class TransactionHolder {
public boolean invalid; public boolean invalid;
public byte[] extraData; public byte[] extraData;
public boolean committed;
} }