diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 55e9f89538..cc7462ba58 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Properties; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -84,6 +85,9 @@ public class JDBCJournalImpl implements Journal { // Track Tx Records private Map transactions = new ConcurrentHashMap<>(); + // Sequence ID for journal records + private AtomicLong seq = new AtomicLong(0); + public JDBCJournalImpl(String jdbcUrl, String tableName) { this.tableName = tableName; this.jdbcUrl = jdbcUrl; @@ -347,7 +351,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -356,7 +360,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -369,7 +373,7 @@ public class JDBCJournalImpl implements Journal { EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -379,7 +383,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -388,7 +392,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -401,7 +405,7 @@ public class JDBCJournalImpl implements Journal { EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); @@ -411,14 +415,14 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecord(long id, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); appendRecord(r); } @Override public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); r.setIoCompletion(completionCallback); appendRecord(r); @@ -426,7 +430,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -438,7 +442,7 @@ public class JDBCJournalImpl implements Journal { long id, byte recordType, EncodingSupport record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -447,7 +451,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -459,7 +463,7 @@ public class JDBCJournalImpl implements Journal { long id, byte recordType, EncodingSupport record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); @@ -468,7 +472,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); appendRecord(r); @@ -476,7 +480,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); appendRecord(r); @@ -484,21 +488,21 @@ public class JDBCJournalImpl implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setTxId(txID); appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setIoCompletion(callback); appendRecord(r); @@ -509,7 +513,7 @@ public class JDBCJournalImpl implements Journal { boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); @@ -518,7 +522,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); @@ -530,7 +534,7 @@ public class JDBCJournalImpl implements Journal { EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setTxData(transactionData); @@ -541,7 +545,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); @@ -550,7 +554,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendRollbackRecord(long txID, boolean sync) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); appendRecord(r); @@ -558,7 +562,7 @@ public class JDBCJournalImpl implements Journal { @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { - JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); @@ -607,6 +611,9 @@ public class JDBCJournalImpl implements Journal { throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType()); } noRecords++; + if (r.getSeq() > seq.longValue()) { + seq.set(r.getSeq()); + } } jrc.checkPreparedTx(); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 1948e5aa9c..2d31a8f177 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -88,7 +88,9 @@ public class JDBCJournalRecord { private boolean isTransactional; - public JDBCJournalRecord(long id, byte recordType) { + private long seq; + + public JDBCJournalRecord(long id, byte recordType, long seq) { this.id = id; this.recordType = recordType; @@ -104,20 +106,22 @@ public class JDBCJournalRecord { txDataSize = 0; txData = new ByteArrayInputStream(new byte[0]); txCheckNoRecords = 0; + + this.seq = seq; } public static String createTableSQL(String tableName) { - return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)"; + return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)"; } public static String insertRecordsSQL(String tableName) { - return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) " + return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; } public static String selectRecordsSQL(String tableName) { - return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords " - + "FROM " + tableName; + return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + + "FROM " + tableName + " ORDER BY seq ASC"; } public static String deleteRecordsSQL(String tableName) { @@ -168,7 +172,7 @@ public class JDBCJournalRecord { statement.setInt(8, txDataSize); statement.setBytes(9, txDataBytes); statement.setInt(10, txCheckNoRecords); - statement.setLong(11, System.currentTimeMillis()); + statement.setLong(11, seq); statement.addBatch(); } @@ -178,7 +182,7 @@ public class JDBCJournalRecord { } public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException { - JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2)); + JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2), rs.getLong(11)); record.setCompactCount((byte) rs.getShort(3)); record.setTxId(rs.getLong(4)); record.setUserRecordType((byte) rs.getShort(5)); @@ -339,4 +343,8 @@ public class JDBCJournalRecord { public boolean isTransactional() { return isTransactional; } + + public long getSeq() { + return seq; + } } \ No newline at end of file