diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 82053d437c..cc7462ba58 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Properties; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -42,13 +43,14 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public class JDBCJournalImpl implements Journal { // Sync Delay in ms - public static final int SYNC_DELAY = 500; + public static final int SYNC_DELAY = 5; private static int USER_VERSION = 1; @@ -83,14 +85,15 @@ public class JDBCJournalImpl implements Journal { // Track Tx Records private Map transactions = new ConcurrentHashMap<>(); - private boolean isLoaded = false; + // Sequence ID for journal records + private AtomicLong seq = new AtomicLong(0); public JDBCJournalImpl(String jdbcUrl, String tableName) { this.tableName = tableName; this.jdbcUrl = jdbcUrl; timerThread = "Timer JDBC Journal(" + tableName + ")"; - records = new ArrayList(); + records = new ArrayList<>(); } @Override @@ -149,8 +152,9 @@ public class JDBCJournalImpl implements Journal { List recordRef = records; records = new ArrayList(); - // We keep a list of deleted records (used for cleaning up old transaction data). + // We keep a list of deleted records and committed tx (used for cleaning up old transaction data). List deletedRecords = new ArrayList<>(); + List committedTransactions = new ArrayList<>(); TransactionHolder holder; @@ -167,7 +171,6 @@ public class JDBCJournalImpl implements Journal { break; case JDBCJournalRecord.ROLLBACK_RECORD: // Roll back we remove all records associated with this TX ID. This query is always performed last. - holder = transactions.get(record.getTxId()); deleteJournalTxRecords.setLong(1, record.getTxId()); deleteJournalTxRecords.addBatch(); break; @@ -180,6 +183,7 @@ public class JDBCJournalImpl implements Journal { deleteJournalRecords.addBatch(); } record.writeRecord(insertJournalRecords); + committedTransactions.add(record.getTxId()); break; default: // Default we add a new record to the DB @@ -202,7 +206,7 @@ public class JDBCJournalImpl implements Journal { connection.commit(); - cleanupTxRecords(deletedRecords); + cleanupTxRecords(deletedRecords, committedTransactions); success = true; } catch (SQLException e) { @@ -215,12 +219,16 @@ public class JDBCJournalImpl implements Journal { /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ - private void cleanupTxRecords(List deletedRecords) throws SQLException { + private void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { List iterableCopy; List iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); + for (Long txId : committedTx) { + transactions.get(txId).committed = true; + } + // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop for (TransactionHolder h : iterableCopyTx) { @@ -233,7 +241,7 @@ public class JDBCJournalImpl implements Journal { } } - if (h.recordInfos.isEmpty()) { + if (h.recordInfos.isEmpty() && h.committed) { deleteJournalTxRecords.setLong(1, h.transactionID); deleteJournalTxRecords.addBatch(); transactions.remove(h.transactionID); @@ -255,7 +263,7 @@ public class JDBCJournalImpl implements Journal { // On rollback we must update the tx map to remove all the tx entries for (TransactionHolder txH : txHolders) { - if (txH.prepared == false && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) { + if (!txH.prepared && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) { transactions.remove(txH.transactionID); } } @@ -279,7 +287,14 @@ public class JDBCJournalImpl implements Journal { t.start(); } - private synchronized void appendRecord(JDBCJournalRecord record) { + private void appendRecord(JDBCJournalRecord record) throws Exception { + + SimpleWaitIOCallback callback = null; + if (record.isSync() && record.getIoCompletion() == null) { + callback = new SimpleWaitIOCallback(); + record.setIoCompletion(callback); + } + try { journalLock.writeLock().lock(); if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { @@ -290,6 +305,8 @@ public class JDBCJournalImpl implements Journal { finally { journalLock.writeLock().unlock(); } + + if (callback != null) callback.waitCompletion(); } private void addTxRecord(JDBCJournalRecord record) { @@ -334,7 +351,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -343,7 +360,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -356,7 +373,7 @@ public class JDBCJournalImpl implements Journal { EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -366,7 +383,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -375,7 +392,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -388,7 +405,7 @@ public class JDBCJournalImpl implements Journal { EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -398,14 +415,14 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecord(long id, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); appendRecord(r); } @Override public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); r.setIoCompletion(completionCallback); appendRecord(r); @@ -413,7 +430,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -425,7 +442,7 @@ public class JDBCJournalImpl implements Journal { long id, byte recordType, EncodingSupport record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -434,7 +451,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -446,7 +463,7 @@ public class JDBCJournalImpl implements Journal { long id, byte recordType, EncodingSupport record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -455,7 +472,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); appendRecord(r); @@ -463,7 +480,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); appendRecord(r); @@ -471,21 +488,21 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setTxId(txID); appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setIoCompletion(callback); appendRecord(r); @@ -496,7 +513,7 @@ public class JDBCJournalImpl implements Journal { boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); @@ -505,7 +522,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); @@ -517,7 +534,7 @@ public class JDBCJournalImpl implements Journal { EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setTxData(transactionData); @@ -528,7 +545,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); @@ -537,7 +554,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendRollbackRecord(long txID, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); appendRecord(r); @@ -545,7 +562,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); @@ -594,13 +611,15 @@ public class JDBCJournalImpl implements Journal { throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType()); } noRecords++; + if (r.getSeq() > seq.longValue()) { + seq.set(r.getSeq()); + } } jrc.checkPreparedTx(); jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); jli.setNumberOfRecords(noRecords); transactions = jrc.getTransactions(); - isLoaded = true; } return jli; } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java index 12b8671abe..0627c54a15 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallback.java @@ -29,19 +29,13 @@ import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; public class JDBCJournalLoaderCallback implements LoaderCallback { - private static final int DELETE_FLUSH = 20000; - private final List preparedTransactions; private final TransactionFailureCallback failureCallback; - private final boolean fixBadTX; - /* We keep track of list entries for each ID. This preserves order and allows multiple record insertions with the same ID. We use this for deleting records */ - private final Map> deleteReferences = new HashMap>(); - - private Runtime runtime = Runtime.getRuntime(); + private final Map> deleteReferences = new HashMap<>(); private final List committedRecords; @@ -54,7 +48,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback { this.committedRecords = committedRecords; this.preparedTransactions = preparedTransactions; this.failureCallback = failureCallback; - this.fixBadTX = fixBadTX; } public synchronized void checkMaxId(long id) { @@ -71,7 +64,7 @@ public class JDBCJournalLoaderCallback implements LoaderCallback { int index = committedRecords.size(); committedRecords.add(index, info); - ArrayList indexes = new ArrayList(); + ArrayList indexes = new ArrayList<>(); indexes.add(index); deleteReferences.put(info.id, indexes); @@ -89,10 +82,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback { } } - public int getNoRecords() { - return committedRecords.size(); - } - @Override public void failedTransaction(final long transactionID, final List records, diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java index 844fd4a26f..445d303f4e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalReaderCallback.java @@ -29,7 +29,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public class JDBCJournalReaderCallback implements JournalReaderCallback { - private final Map loadTransactions = new LinkedHashMap(); + private final Map loadTransactions = new LinkedHashMap<>(); private final LoaderCallback loadManager; @@ -90,8 +90,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { // It is possible that the TX could be null, since deletes could have happened in the journal. - TransactionHolder tx = loadTransactions.remove(transactionID); + TransactionHolder tx = loadTransactions.get(transactionID); + + // We can remove local Tx without associated records if (tx != null) { + tx.committed = true; for (RecordInfo txRecord : tx.recordInfos) { if (txRecord.isUpdate) { loadManager.updateRecord(txRecord); @@ -117,11 +120,11 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback { public void checkPreparedTx() { for (TransactionHolder transaction : loadTransactions.values()) { - if (!transaction.prepared || transaction.invalid) { + if ((!transaction.prepared && !transaction.committed) || transaction.invalid) { ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID); loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete); } - else { + else if (!transaction.committed) { PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData); info.getRecords().addAll(transaction.recordInfos); info.getRecordsToDelete().addAll(transaction.recordsToDelete); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index bf52a17abf..2d31a8f177 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -88,7 +88,9 @@ public class JDBCJournalRecord { private boolean isTransactional; - public JDBCJournalRecord(long id, byte recordType) { + private long seq; + + public JDBCJournalRecord(long id, byte recordType, long seq) { this.id = id; this.recordType = recordType; @@ -104,41 +106,32 @@ public class JDBCJournalRecord { txDataSize = 0; txData = new ByteArrayInputStream(new byte[0]); txCheckNoRecords = 0; + + this.seq = seq; } public static String createTableSQL(String tableName) { - return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)"; + return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)"; } public static String insertRecordsSQL(String tableName) { - return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) " + return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; } public static String selectRecordsSQL(String tableName) { - return "SELECT id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords " + "FROM " + tableName; + return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + + "FROM " + tableName + " ORDER BY seq ASC"; } public static String deleteRecordsSQL(String tableName) { return "DELETE FROM " + tableName + " WHERE id = ?"; } - public static String deleteCommittedDeleteRecordsForTxSQL(String tableName) { - return "DELETE FROM " + tableName + " WHERE id IN (SELECT id FROM " + tableName + " WHERE txID=?)"; - } - - public static String deleteCommittedTxRecordsSQL(String tableName) { - return "DELETE FROM " + tableName + " WHERE txId=? AND (recordType=" + PREPARE_RECORD + " OR recordType=" + COMMIT_RECORD + ")"; - } - public static String deleteJournalTxRecordsSQL(String tableName) { return "DELETE FROM " + tableName + " WHERE txId=?"; } - public static String deleteRolledBackTxSQL(String tableName) { - return "DELETE FROM " + tableName + " WHERE txId=?"; - } - public void complete(boolean success) { if (ioCompletion != null) { if (success) { @@ -179,22 +172,17 @@ public class JDBCJournalRecord { statement.setInt(8, txDataSize); statement.setBytes(9, txDataBytes); statement.setInt(10, txCheckNoRecords); - statement.setLong(11, System.currentTimeMillis()); + statement.setLong(11, seq); statement.addBatch(); } - protected void writeDeleteTxRecord(PreparedStatement deleteTxStatement) throws SQLException { - deleteTxStatement.setLong(1, txId); - deleteTxStatement.addBatch(); - } - protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException { deleteStatement.setLong(1, id); deleteStatement.addBatch(); } public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException { - JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2)); + JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2), rs.getLong(11)); record.setCompactCount((byte) rs.getShort(3)); record.setTxId(rs.getLong(4)); record.setUserRecordType((byte) rs.getShort(5)); @@ -355,4 +343,8 @@ public class JDBCJournalRecord { public boolean isTransactional() { return isTransactional; } + + public long getSeq() { + return seq; + } } \ No newline at end of file diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java index 4c3cd846d3..c03e7479d4 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/TransactionHolder.java @@ -39,4 +39,6 @@ final class TransactionHolder { public boolean invalid; public byte[] extraData; + + public boolean committed; }