Handful of JDBC Journal Fixes

This patch fixes a number of bugs with the JDBC Journal implementation.
Mainly around how it was handling transactions.  The XA transactions
tests are now enabled to test both the File and Database store.
This commit is contained in:
Martyn Taylor 2016-01-26 11:13:03 +00:00 committed by Clebert Suconic
parent bd26115b6c
commit 5383a0c409
12 changed files with 442 additions and 120 deletions

View File

@ -70,5 +70,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import org.apache.derby.jdbc.AutoloadedDriver;
public class JDBCUtils {
public static Driver getDriver() throws Exception {
Driver dbDriver = null;
// Load Database driver, sets Derby Autoloaded Driver as lowest priority.
List<Driver> drivers = Collections.list(DriverManager.getDrivers());
if (drivers.size() <= 2 && drivers.size() > 0) {
dbDriver = drivers.get(0);
boolean isDerby = dbDriver instanceof AutoloadedDriver;
if (drivers.size() > 1 && isDerby) {
dbDriver = drivers.get(1);
}
if (isDerby) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception e) {
}
}
});
}
}
else {
String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use";
throw new RuntimeException(error);
}
return dbDriver;
}
public static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException {
ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null);
if (!rs.next()) {
Statement statement = connection.createStatement();
statement.executeUpdate(sql);
}
}
}

View File

@ -19,17 +19,16 @@ package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -43,7 +42,8 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.derby.jdbc.AutoloadedDriver;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCJournalImpl implements Journal {
@ -66,7 +66,7 @@ public class JDBCJournalImpl implements Journal {
private PreparedStatement deleteJournalRecords;
private PreparedStatement deleteTxJournalRecords;
private PreparedStatement deleteJournalTxRecords;
private boolean started;
@ -78,61 +78,35 @@ public class JDBCJournalImpl implements Journal {
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private boolean isDerby = false;
private final String timerThread;
// Track Tx Records
private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
private boolean isLoaded = false;
public JDBCJournalImpl(String jdbcUrl, String tableName) {
this.tableName = tableName;
this.jdbcUrl = jdbcUrl;
timerThread = "Timer JDBC Journal(" + tableName + ")";
records = new ArrayList<JDBCJournalRecord>();
}
@Override
public void start() throws Exception {
// Load Database driver, sets Derby Autoloaded Driver as lowest priority.
List<Driver> drivers = Collections.list(DriverManager.getDrivers());
if (drivers.size() <= 2 && drivers.size() > 0) {
dbDriver = drivers.get(0);
isDerby = dbDriver instanceof AutoloadedDriver;
if (drivers.size() > 1 && isDerby) {
dbDriver = drivers.get(1);
}
if (isDerby) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception e) {
}
}
});
}
}
else {
String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use";
throw new RuntimeException(error);
}
dbDriver = JDBCUtils.getDriver();
connection = dbDriver.connect(jdbcUrl, new Properties());
// If JOURNAL table doesn't exist then create it
ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null);
if (!rs.next()) {
Statement statement = connection.createStatement();
statement.executeUpdate(JDBCJournalRecord.createTableSQL(tableName));
}
JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName));
insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
deleteTxJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteTxRecordsSQL(tableName));
deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName));
syncTimer = new Timer();
syncTimer = new Timer(timerThread, true);
syncTimer.scheduleAtFixedRate(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
started = true;
@ -145,12 +119,17 @@ public class JDBCJournalImpl implements Journal {
public synchronized void stop(boolean shutdownConnection) throws Exception {
if (started) {
journalLock.writeLock().lock();
syncTimer.cancel();
sync();
if (shutdownConnection) {
connection.close();
}
started = false;
journalLock.writeLock().unlock();
}
}
@ -158,47 +137,134 @@ public class JDBCJournalImpl implements Journal {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE " + tableName);
statement.close();
connection.commit();
stop();
}
public int sync() throws SQLException {
public synchronized int sync() {
if (!started)
return 0;
List<JDBCJournalRecord> recordRef = records;
records = new ArrayList<JDBCJournalRecord>();
for (JDBCJournalRecord record : recordRef) {
record.storeLineUp();
// We keep a list of deleted records (used for cleaning up old transaction data).
List<Long> deletedRecords = new ArrayList<>();
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
record.writeDeleteTxRecord(deleteTxJournalRecords);
break;
default:
record.writeRecord(insertJournalRecords);
break;
}
}
TransactionHolder holder;
boolean success = false;
try {
for (JDBCJournalRecord record : recordRef) {
record.storeLineUp();
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
// Standard SQL Delete Record, Non transactional delete
deletedRecords.add(record.getId());
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last.
holder = transactions.get(record.getTxId());
deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch();
break;
case JDBCJournalRecord.COMMIT_RECORD:
// We perform all the deletes and add the commit record in the same Database TX
holder = transactions.get(record.getTxId());
for (RecordInfo info : holder.recordsToDelete) {
deletedRecords.add(record.getId());
deleteJournalRecords.setLong(1, info.id);
deleteJournalRecords.addBatch();
}
record.writeRecord(insertJournalRecords);
break;
default:
// Default we add a new record to the DB
record.writeRecord(insertJournalRecords);
break;
}
}
}
catch (SQLException e) {
executeCallbacks(recordRef, success);
return 0;
}
try {
connection.setAutoCommit(false);
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteTxJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch();
connection.commit();
cleanupTxRecords(deletedRecords);
success = true;
}
catch (SQLException e) {
connection.rollback();
e.printStackTrace();
performRollback(connection, recordRef);
}
executeCallbacks(recordRef, success);
return recordRef.size();
}
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
we remove the Tx Records (i.e. PREPARE, COMMIT). */
private void cleanupTxRecords(List<Long> deletedRecords) throws SQLException {
List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values());
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
iterableCopy = new ArrayList<>();
iterableCopy.addAll(h.recordInfos);
for (RecordInfo info : iterableCopy) {
if (deletedRecords.contains(info.id)) {
h.recordInfos.remove(info);
}
}
if (h.recordInfos.isEmpty()) {
deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch();
transactions.remove(h.transactionID);
}
}
}
private void performRollback(Connection connection, List<JDBCJournalRecord> records) {
try {
connection.rollback();
for (JDBCJournalRecord record : records) {
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
removeTxRecord(record);
}
}
List<TransactionHolder> txHolders = new ArrayList<>();
txHolders.addAll(transactions.values());
// On rollback we must update the tx map to remove all the tx entries
for (TransactionHolder txH : txHolders) {
if (txH.prepared == false && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) {
transactions.remove(txH.transactionID);
}
}
}
catch (Exception sqlE) {
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
}
}
// TODO Use an executor.
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
Runnable r = new Runnable() {
@ -213,9 +279,12 @@ public class JDBCJournalImpl implements Journal {
t.start();
}
private void appendRecord(JDBCJournalRecord record) throws SQLException {
private synchronized void appendRecord(JDBCJournalRecord record) {
try {
journalLock.writeLock().lock();
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
addTxRecord(record);
}
records.add(record);
}
finally {
@ -223,6 +292,46 @@ public class JDBCJournalImpl implements Journal {
}
}
private void addTxRecord(JDBCJournalRecord record) {
TransactionHolder txHolder = transactions.get(record.getTxId());
if (txHolder == null) {
txHolder = new TransactionHolder(record.getTxId());
transactions.put(record.getTxId(), txHolder);
}
// We actually only need the record ID in this instance.
if (record.isTransactional()) {
RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
txHolder.recordsToDelete.add(info);
}
else {
txHolder.recordInfos.add(info);
}
}
else {
txHolder.prepared = true;
}
}
private void removeTxRecord(JDBCJournalRecord record) {
TransactionHolder txHolder = transactions.get(record.getTxId());
// We actually only need the record ID in this instance.
if (record.isTransactional()) {
RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
txHolder.recordsToDelete.remove(info);
}
else {
txHolder.recordInfos.remove(info);
}
}
else {
txHolder.prepared = false;
}
}
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
@ -369,14 +478,14 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendCommitRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
r.setIoCompletion(callback);
appendRecord(r);
@ -387,7 +496,7 @@ public class JDBCJournalImpl implements Journal {
boolean sync,
IOCompletion callback,
boolean lineUpContext) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
r.setStoreLineUp(lineUpContext);
r.setIoCompletion(callback);
@ -396,7 +505,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
@ -411,6 +520,7 @@ public class JDBCJournalImpl implements Journal {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setTxData(transactionData);
r.setSync(sync);
r.setIoCompletion(callback);
appendRecord(r);
@ -435,7 +545,7 @@ public class JDBCJournalImpl implements Journal {
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
r.setTxId(txID);
r.setSync(sync);
r.setIoCompletion(callback);
@ -443,7 +553,7 @@ public class JDBCJournalImpl implements Journal {
}
@Override
public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
JournalLoadInformation jli = new JournalLoadInformation();
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
JDBCJournalRecord r;
@ -485,8 +595,12 @@ public class JDBCJournalImpl implements Journal {
}
noRecords++;
}
jrc.checkPreparedTx();
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords);
transactions = jrc.getTransactions();
isLoaded = true;
}
return jli;
}

View File

@ -81,7 +81,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
public synchronized void updateRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
deleteReferences.get(info.id).add(index);
}
public synchronized void deleteRecord(final long id) {
@ -106,5 +105,4 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
public long getMaxId() {
return maxId;
}
}

View File

@ -19,14 +19,13 @@ package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalTransaction;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCJournalReaderCallback implements JournalReaderCallback {
@ -34,9 +33,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
private final LoaderCallback loadManager;
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
public JDBCJournalReaderCallback(LoaderCallback loadManager) {
public JDBCJournalReaderCallback(final LoaderCallback loadManager) {
this.loadManager = loadManager;
}
@ -53,7 +50,12 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
}
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
onReadAddRecordTX(transactionID, info);
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordInfos.add(info);
}
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
@ -87,14 +89,9 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
}
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
// It is possible that the TX could be null, since deletes could have happened in the journal.
TransactionHolder tx = loadTransactions.remove(transactionID);
if (tx != null) {
// JournalTransaction journalTransaction = transactions.remove(transactionID);
// if (journalTransaction == null)
// {
// throw new IllegalStateException("Cannot Commit, tx not found with ID: " + transactionID);
// }
for (RecordInfo txRecord : tx.recordInfos) {
if (txRecord.isUpdate) {
loadManager.updateRecord(txRecord);
@ -103,10 +100,6 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
loadManager.addRecord(txRecord);
}
}
for (RecordInfo deleteValue : tx.recordsToDelete) {
loadManager.deleteRecord(deleteValue.id);
}
}
}
@ -121,4 +114,23 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
public void markAsDataFile(JournalFile file) {
// Not needed for JDBC journal impl
}
public void checkPreparedTx() {
for (TransactionHolder transaction : loadTransactions.values()) {
if (!transaction.prepared || transaction.invalid) {
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
}
else {
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.getRecords().addAll(transaction.recordInfos);
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
loadManager.addPreparedTransaction(info);
}
}
}
public Map<Long, TransactionHolder> getTransactions() {
return loadTransactions;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
public class JDBCJournalRecord {
@ -85,10 +86,14 @@ public class JDBCJournalRecord {
private boolean isUpdate;
private boolean isTransactional;
public JDBCJournalRecord(long id, byte recordType) {
this.id = id;
this.recordType = recordType;
this.isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX;
isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX;
isTransactional = recordType == UPDATE_RECORD_TX || recordType == ADD_RECORD_TX || recordType == DELETE_RECORD_TX;
// set defaults
compactCount = 0;
@ -102,11 +107,12 @@ public class JDBCJournalRecord {
}
public static String createTableSQL(String tableName) {
return "CREATE TABLE " + tableName + "(id BIGINT, " + "recordType SMALLINT, " + "compactCount SMALLINT, " + "txId BIGINT, " + "userRecordType SMALLINT, " + "variableSize INTEGER, " + "record BLOB, " + "txDataSize INTEGER, " + "txData BLOB, " + "txCheckNoRecords INTEGER)";
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)";
}
public static String insertRecordsSQL(String tableName) {
return "INSERT INTO " + tableName + "(id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords) " + "VALUES (?,?,?,?,?,?,?,?,?,?)";
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) "
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
}
public static String selectRecordsSQL(String tableName) {
@ -117,8 +123,20 @@ public class JDBCJournalRecord {
return "DELETE FROM " + tableName + " WHERE id = ?";
}
public static String deleteTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId = ?";
public static String deleteCommittedDeleteRecordsForTxSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE id IN (SELECT id FROM " + tableName + " WHERE txID=?)";
}
public static String deleteCommittedTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=? AND (recordType=" + PREPARE_RECORD + " OR recordType=" + COMMIT_RECORD + ")";
}
public static String deleteJournalTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=?";
}
public static String deleteRolledBackTxSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId=?";
}
public void complete(boolean success) {
@ -127,7 +145,7 @@ public class JDBCJournalRecord {
ioCompletion.done();
}
else {
ioCompletion.onError(1, "DATABASE INSERT FAILED");
ioCompletion.onError(1, "DATABASE TRANSACTION FAILED");
}
}
}
@ -139,16 +157,29 @@ public class JDBCJournalRecord {
}
protected void writeRecord(PreparedStatement statement) throws SQLException {
byte[] recordBytes = new byte[variableSize];
byte[] txDataBytes = new byte[txDataSize];
try {
record.read(recordBytes);
txData.read(txDataBytes);
}
catch (IOException e) {
ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
}
statement.setLong(1, id);
statement.setByte(2, recordType);
statement.setByte(3, compactCount);
statement.setLong(4, txId);
statement.setByte(5, userRecordType);
statement.setInt(6, variableSize);
statement.setBlob(7, record);
statement.setBytes(7, recordBytes);
statement.setInt(8, txDataSize);
statement.setBlob(9, txData);
statement.setBytes(9, txDataBytes);
statement.setInt(10, txCheckNoRecords);
statement.setLong(11, System.currentTimeMillis());
statement.addBatch();
}
@ -240,8 +271,10 @@ public class JDBCJournalRecord {
}
public void setRecord(byte[] record) {
this.variableSize = record.length;
this.record = new ByteArrayInputStream(record);
if (record != null) {
this.variableSize = record.length;
this.record = new ByteArrayInputStream(record);
}
}
public void setRecord(InputStream record) {
@ -287,14 +320,16 @@ public class JDBCJournalRecord {
public void setTxData(EncodingSupport txData) {
this.txDataSize = txData.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(txDataSize);
txData.encode(encodedBuffer);
this.txData = new ActiveMQBufferInputStream(encodedBuffer);
}
public void setTxData(byte[] txData) {
this.txDataSize = txData.length;
this.txData = new ByteArrayInputStream(txData);
if (txData != null) {
this.txDataSize = txData.length;
this.txData = new ByteArrayInputStream(txData);
}
}
public boolean isUpdate() {
@ -316,4 +351,8 @@ public class JDBCJournalRecord {
public RecordInfo toRecordInfo() throws IOException {
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
}
public boolean isTransactional() {
return isTransactional;
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.SQLException;
import java.util.TimerTask;
public class JDBCJournalSync extends TimerTask {
@ -30,11 +29,8 @@ public class JDBCJournalSync extends TimerTask {
@Override
public void run() {
try {
if (journal.isStarted()) {
journal.sync();
}
catch (SQLException e) {
e.printStackTrace();
}
}
}

View File

@ -171,6 +171,14 @@ public class ThreadLeakCheckRule extends ExternalResource {
//another netty thread
return true;
}
else if (threadName.contains("derby")) {
// The derby engine is initialized once, and lasts the lifetime of the VM
return true;
}
else if (threadName.contains("Timer")) {
// The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
return true;
}
else if (threadName.contains("hawtdispatch")) {
// Static workers used by MQTT client.
return true;

View File

@ -53,12 +53,11 @@ public class JDBCJournalTest {
public void testInsertRecords() throws Exception {
int noRecords = 10;
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(1, (byte) 1, new byte[0], true);
journal.appendAddRecord(i, (byte) 1, new byte[0], true);
}
Thread.sleep(3000);
assertEquals(noRecords, journal.getNumberOfRecords());
}
@Test

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@ -38,15 +39,19 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@RunWith(Parameterized.class)
public class BasicXaRecoveryTest extends ActiveMQTestBase {
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@ -71,13 +76,32 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
private MBeanServer mbeanServer;
protected StoreConfiguration.StoreType storeType;
public BasicXaRecoveryTest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
addressSettings.clear();
configuration = createDefaultInVMConfig().setJMXManagementEnabled(true);
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig().setJMXManagementEnabled(true);
}
else {
configuration = createDefaultInVMConfig().setJMXManagementEnabled(true);
}
mbeanServer = MBeanServerFactory.createMBeanServer();
@ -211,15 +235,18 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
@Test
public void testPagingServerRestarted() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
verifyPaging(true);
}
@Test
public void testPaging() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
verifyPaging(false);
}
public void verifyPaging(final boolean restartServer) throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
SimpleString pageQueue = new SimpleString("pagequeue");
@ -285,11 +312,13 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
@Test
public void testRollbackPaging() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
testRollbackPaging(false);
}
@Test
public void testRollbackPagingServerRestarted() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
testRollbackPaging(true);
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.xa;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -34,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@ -44,7 +47,10 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class BasicXaTest extends ActiveMQTestBase {
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@ -63,13 +69,31 @@ public class BasicXaTest extends ActiveMQTestBase {
private ServerLocator locator;
private StoreConfiguration.StoreType storeType;
public BasicXaTest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
addressSettings.clear();
configuration = createDefaultNettyConfig();
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig();
}
else {
configuration = createDefaultNettyConfig();
}
messagingService = createServer(false, configuration, -1, -1, addressSettings);

View File

@ -16,6 +16,18 @@
*/
package org.apache.activemq.artemis.tests.integration.xa;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -27,7 +39,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -42,17 +55,10 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Parameterized.class)
public class XaTimeoutTest extends ActiveMQTestBase {
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
@ -67,19 +73,39 @@ public class XaTimeoutTest extends ActiveMQTestBase {
private ClientSessionFactory sessionFactory;
private ConfigurationImpl configuration;
private Configuration configuration;
private final SimpleString atestq = new SimpleString("atestq");
private ServerLocator locator;
private StoreConfiguration.StoreType storeType;
public XaTimeoutTest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
addressSettings.clear();
configuration = createBasicConfig().setTransactionTimeoutScanPeriod(500).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.INVM_ACCEPTOR_FACTORY));
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig();
}
else {
configuration = createBasicConfig();
}
configuration.setTransactionTimeoutScanPeriod(500).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.INVM_ACCEPTOR_FACTORY));
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
// start the server
server.start();