Use SeqId in JDBC Records vs timestamp

This commit is contained in:
Martyn Taylor 2016-02-04 16:22:23 +00:00
parent af89b93004
commit 0533a5f5fe
2 changed files with 45 additions and 30 deletions

View File

@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -84,6 +85,9 @@ public class JDBCJournalImpl implements Journal {
// Track Tx Records
private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
// Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0);
public JDBCJournalImpl(String jdbcUrl, String tableName) {
this.tableName = tableName;
this.jdbcUrl = jdbcUrl;
@ -347,7 +351,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
@ -356,7 +360,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
@ -369,7 +373,7 @@ public class JDBCJournalImpl implements Journal {
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
@ -379,7 +383,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
@ -388,7 +392,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
@ -401,7 +405,7 @@ public class JDBCJournalImpl implements Journal {
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
@ -411,14 +415,14 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
@ -426,7 +430,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
@ -438,7 +442,7 @@ public class JDBCJournalImpl implements Journal {
long id,
byte recordType,
EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
@ -447,7 +451,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
@ -459,7 +463,7 @@ public class JDBCJournalImpl implements Journal {
long id,
byte recordType,
EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
@ -468,7 +472,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
@ -476,7 +480,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
@ -484,21 +488,21 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setIoCompletion(callback);
appendRecord(r);
@ -509,7 +513,7 @@ public class JDBCJournalImpl implements Journal {
boolean sync,
IOCompletion callback,
boolean lineUpContext) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setStoreLineUp(lineUpContext);
r.setIoCompletion(callback);
@ -518,7 +522,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
@ -530,7 +534,7 @@ public class JDBCJournalImpl implements Journal {
EncodingSupport transactionData,
boolean sync,
IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setTxData(transactionData);
r.setTxData(transactionData);
@ -541,7 +545,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
@ -550,7 +554,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendRollbackRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setSync(sync);
appendRecord(r);
@ -558,7 +562,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
r.setTxId(txID);
r.setSync(sync);
r.setIoCompletion(callback);
@ -607,6 +611,9 @@ public class JDBCJournalImpl implements Journal {
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
}
noRecords++;
if (r.getSeq() > seq.longValue()) {
seq.set(r.getSeq());
}
}
jrc.checkPreparedTx();

View File

@ -88,7 +88,9 @@ public class JDBCJournalRecord {
private boolean isTransactional;
public JDBCJournalRecord(long id, byte recordType) {
private long seq;
public JDBCJournalRecord(long id, byte recordType, long seq) {
this.id = id;
this.recordType = recordType;
@ -104,20 +106,22 @@ public class JDBCJournalRecord {
txDataSize = 0;
txData = new ByteArrayInputStream(new byte[0]);
txCheckNoRecords = 0;
this.seq = seq;
}
public static String createTableSQL(String tableName) {
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)";
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
}
public static String insertRecordsSQL(String tableName) {
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) "
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) "
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
}
public static String selectRecordsSQL(String tableName) {
return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords "
+ "FROM " + tableName;
return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq "
+ "FROM " + tableName + " ORDER BY seq ASC";
}
public static String deleteRecordsSQL(String tableName) {
@ -168,7 +172,7 @@ public class JDBCJournalRecord {
statement.setInt(8, txDataSize);
statement.setBytes(9, txDataBytes);
statement.setInt(10, txCheckNoRecords);
statement.setLong(11, System.currentTimeMillis());
statement.setLong(11, seq);
statement.addBatch();
}
@ -178,7 +182,7 @@ public class JDBCJournalRecord {
}
public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2));
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2), rs.getLong(11));
record.setCompactCount((byte) rs.getShort(3));
record.setTxId(rs.getLong(4));
record.setUserRecordType((byte) rs.getShort(5));
@ -339,4 +343,8 @@ public class JDBCJournalRecord {
public boolean isTransactional() {
return isTransactional;
}
public long getSeq() {
return seq;
}
}