This closes #352
This commit is contained in:
commit
67b1245ed8
|
@ -70,5 +70,11 @@
|
|||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.derby.jdbc.AutoloadedDriver;
|
||||
|
||||
public class JDBCUtils {
|
||||
|
||||
public static Driver getDriver() throws Exception {
|
||||
Driver dbDriver = null;
|
||||
// Load Database driver, sets Derby Autoloaded Driver as lowest priority.
|
||||
List<Driver> drivers = Collections.list(DriverManager.getDrivers());
|
||||
if (drivers.size() <= 2 && drivers.size() > 0) {
|
||||
dbDriver = drivers.get(0);
|
||||
boolean isDerby = dbDriver instanceof AutoloadedDriver;
|
||||
|
||||
if (drivers.size() > 1 && isDerby) {
|
||||
dbDriver = drivers.get(1);
|
||||
}
|
||||
|
||||
if (isDerby) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
DriverManager.getConnection("jdbc:derby:;shutdown=true");
|
||||
}
|
||||
catch (Exception e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
else {
|
||||
String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use";
|
||||
throw new RuntimeException(error);
|
||||
}
|
||||
return dbDriver;
|
||||
}
|
||||
|
||||
public static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException {
|
||||
ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null);
|
||||
if (!rs.next()) {
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate(sql);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,17 +19,16 @@ package org.apache.activemq.artemis.jdbc.store.journal;
|
|||
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Timer;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
@ -43,7 +42,8 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
|||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
import org.apache.derby.jdbc.AutoloadedDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
public class JDBCJournalImpl implements Journal {
|
||||
|
||||
|
@ -66,7 +66,7 @@ public class JDBCJournalImpl implements Journal {
|
|||
|
||||
private PreparedStatement deleteJournalRecords;
|
||||
|
||||
private PreparedStatement deleteTxJournalRecords;
|
||||
private PreparedStatement deleteJournalTxRecords;
|
||||
|
||||
private boolean started;
|
||||
|
||||
|
@ -78,61 +78,35 @@ public class JDBCJournalImpl implements Journal {
|
|||
|
||||
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
|
||||
|
||||
private boolean isDerby = false;
|
||||
private final String timerThread;
|
||||
|
||||
// Track Tx Records
|
||||
private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
|
||||
|
||||
private boolean isLoaded = false;
|
||||
|
||||
public JDBCJournalImpl(String jdbcUrl, String tableName) {
|
||||
this.tableName = tableName;
|
||||
this.jdbcUrl = jdbcUrl;
|
||||
timerThread = "Timer JDBC Journal(" + tableName + ")";
|
||||
|
||||
records = new ArrayList<JDBCJournalRecord>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
// Load Database driver, sets Derby Autoloaded Driver as lowest priority.
|
||||
List<Driver> drivers = Collections.list(DriverManager.getDrivers());
|
||||
if (drivers.size() <= 2 && drivers.size() > 0) {
|
||||
dbDriver = drivers.get(0);
|
||||
isDerby = dbDriver instanceof AutoloadedDriver;
|
||||
|
||||
if (drivers.size() > 1 && isDerby) {
|
||||
dbDriver = drivers.get(1);
|
||||
}
|
||||
|
||||
if (isDerby) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
DriverManager.getConnection("jdbc:derby:;shutdown=true");
|
||||
}
|
||||
catch (Exception e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
else {
|
||||
String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use";
|
||||
throw new RuntimeException(error);
|
||||
}
|
||||
|
||||
dbDriver = JDBCUtils.getDriver();
|
||||
connection = dbDriver.connect(jdbcUrl, new Properties());
|
||||
|
||||
// If JOURNAL table doesn't exist then create it
|
||||
ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null);
|
||||
if (!rs.next()) {
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate(JDBCJournalRecord.createTableSQL(tableName));
|
||||
}
|
||||
JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName));
|
||||
|
||||
insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
|
||||
selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
|
||||
countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
|
||||
deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
|
||||
deleteTxJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteTxRecordsSQL(tableName));
|
||||
deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName));
|
||||
|
||||
syncTimer = new Timer();
|
||||
syncTimer = new Timer(timerThread, true);
|
||||
syncTimer.scheduleAtFixedRate(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
|
||||
|
||||
started = true;
|
||||
|
@ -145,12 +119,17 @@ public class JDBCJournalImpl implements Journal {
|
|||
|
||||
public synchronized void stop(boolean shutdownConnection) throws Exception {
|
||||
if (started) {
|
||||
journalLock.writeLock().lock();
|
||||
|
||||
syncTimer.cancel();
|
||||
|
||||
sync();
|
||||
if (shutdownConnection) {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
started = false;
|
||||
journalLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,47 +137,134 @@ public class JDBCJournalImpl implements Journal {
|
|||
connection.setAutoCommit(false);
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate("DROP TABLE " + tableName);
|
||||
statement.close();
|
||||
connection.commit();
|
||||
stop();
|
||||
}
|
||||
|
||||
public int sync() throws SQLException {
|
||||
public synchronized int sync() {
|
||||
if (!started)
|
||||
return 0;
|
||||
|
||||
List<JDBCJournalRecord> recordRef = records;
|
||||
records = new ArrayList<JDBCJournalRecord>();
|
||||
|
||||
for (JDBCJournalRecord record : recordRef) {
|
||||
record.storeLineUp();
|
||||
// We keep a list of deleted records (used for cleaning up old transaction data).
|
||||
List<Long> deletedRecords = new ArrayList<>();
|
||||
|
||||
switch (record.getRecordType()) {
|
||||
case JDBCJournalRecord.DELETE_RECORD:
|
||||
record.writeDeleteRecord(deleteJournalRecords);
|
||||
break;
|
||||
case JDBCJournalRecord.DELETE_RECORD_TX:
|
||||
record.writeDeleteTxRecord(deleteTxJournalRecords);
|
||||
break;
|
||||
default:
|
||||
record.writeRecord(insertJournalRecords);
|
||||
break;
|
||||
}
|
||||
}
|
||||
TransactionHolder holder;
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
for (JDBCJournalRecord record : recordRef) {
|
||||
record.storeLineUp();
|
||||
|
||||
switch (record.getRecordType()) {
|
||||
case JDBCJournalRecord.DELETE_RECORD:
|
||||
// Standard SQL Delete Record, Non transactional delete
|
||||
deletedRecords.add(record.getId());
|
||||
record.writeDeleteRecord(deleteJournalRecords);
|
||||
break;
|
||||
case JDBCJournalRecord.ROLLBACK_RECORD:
|
||||
// Roll back we remove all records associated with this TX ID. This query is always performed last.
|
||||
holder = transactions.get(record.getTxId());
|
||||
deleteJournalTxRecords.setLong(1, record.getTxId());
|
||||
deleteJournalTxRecords.addBatch();
|
||||
break;
|
||||
case JDBCJournalRecord.COMMIT_RECORD:
|
||||
// We perform all the deletes and add the commit record in the same Database TX
|
||||
holder = transactions.get(record.getTxId());
|
||||
for (RecordInfo info : holder.recordsToDelete) {
|
||||
deletedRecords.add(record.getId());
|
||||
deleteJournalRecords.setLong(1, info.id);
|
||||
deleteJournalRecords.addBatch();
|
||||
}
|
||||
record.writeRecord(insertJournalRecords);
|
||||
break;
|
||||
default:
|
||||
// Default we add a new record to the DB
|
||||
record.writeRecord(insertJournalRecords);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SQLException e) {
|
||||
executeCallbacks(recordRef, success);
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
insertJournalRecords.executeBatch();
|
||||
deleteJournalRecords.executeBatch();
|
||||
deleteTxJournalRecords.executeBatch();
|
||||
deleteJournalTxRecords.executeBatch();
|
||||
|
||||
connection.commit();
|
||||
|
||||
cleanupTxRecords(deletedRecords);
|
||||
success = true;
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
e.printStackTrace();
|
||||
performRollback(connection, recordRef);
|
||||
}
|
||||
|
||||
executeCallbacks(recordRef, success);
|
||||
return recordRef.size();
|
||||
}
|
||||
|
||||
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
||||
we remove the Tx Records (i.e. PREPARE, COMMIT). */
|
||||
private void cleanupTxRecords(List<Long> deletedRecords) throws SQLException {
|
||||
|
||||
List<RecordInfo> iterableCopy;
|
||||
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
|
||||
iterableCopyTx.addAll(transactions.values());
|
||||
|
||||
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
|
||||
for (TransactionHolder h : iterableCopyTx) {
|
||||
|
||||
iterableCopy = new ArrayList<>();
|
||||
iterableCopy.addAll(h.recordInfos);
|
||||
|
||||
for (RecordInfo info : iterableCopy) {
|
||||
if (deletedRecords.contains(info.id)) {
|
||||
h.recordInfos.remove(info);
|
||||
}
|
||||
}
|
||||
|
||||
if (h.recordInfos.isEmpty()) {
|
||||
deleteJournalTxRecords.setLong(1, h.transactionID);
|
||||
deleteJournalTxRecords.addBatch();
|
||||
transactions.remove(h.transactionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void performRollback(Connection connection, List<JDBCJournalRecord> records) {
|
||||
try {
|
||||
connection.rollback();
|
||||
for (JDBCJournalRecord record : records) {
|
||||
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
||||
removeTxRecord(record);
|
||||
}
|
||||
}
|
||||
|
||||
List<TransactionHolder> txHolders = new ArrayList<>();
|
||||
txHolders.addAll(transactions.values());
|
||||
|
||||
// On rollback we must update the tx map to remove all the tx entries
|
||||
for (TransactionHolder txH : txHolders) {
|
||||
if (txH.prepared == false && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) {
|
||||
transactions.remove(txH.transactionID);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception sqlE) {
|
||||
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Use an executor.
|
||||
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
|
||||
Runnable r = new Runnable() {
|
||||
|
@ -213,9 +279,12 @@ public class JDBCJournalImpl implements Journal {
|
|||
t.start();
|
||||
}
|
||||
|
||||
private void appendRecord(JDBCJournalRecord record) throws SQLException {
|
||||
private synchronized void appendRecord(JDBCJournalRecord record) {
|
||||
try {
|
||||
journalLock.writeLock().lock();
|
||||
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
||||
addTxRecord(record);
|
||||
}
|
||||
records.add(record);
|
||||
}
|
||||
finally {
|
||||
|
@ -223,6 +292,46 @@ public class JDBCJournalImpl implements Journal {
|
|||
}
|
||||
}
|
||||
|
||||
private void addTxRecord(JDBCJournalRecord record) {
|
||||
TransactionHolder txHolder = transactions.get(record.getTxId());
|
||||
if (txHolder == null) {
|
||||
txHolder = new TransactionHolder(record.getTxId());
|
||||
transactions.put(record.getTxId(), txHolder);
|
||||
}
|
||||
|
||||
// We actually only need the record ID in this instance.
|
||||
if (record.isTransactional()) {
|
||||
RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
|
||||
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
|
||||
txHolder.recordsToDelete.add(info);
|
||||
}
|
||||
else {
|
||||
txHolder.recordInfos.add(info);
|
||||
}
|
||||
}
|
||||
else {
|
||||
txHolder.prepared = true;
|
||||
}
|
||||
}
|
||||
|
||||
private void removeTxRecord(JDBCJournalRecord record) {
|
||||
TransactionHolder txHolder = transactions.get(record.getTxId());
|
||||
|
||||
// We actually only need the record ID in this instance.
|
||||
if (record.isTransactional()) {
|
||||
RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
|
||||
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
|
||||
txHolder.recordsToDelete.remove(info);
|
||||
}
|
||||
else {
|
||||
txHolder.recordInfos.remove(info);
|
||||
}
|
||||
}
|
||||
else {
|
||||
txHolder.prepared = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
|
||||
|
@ -369,14 +478,14 @@ public class JDBCJournalImpl implements Journal {
|
|||
|
||||
@Override
|
||||
public void appendCommitRecord(long txID, boolean sync) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
|
||||
r.setTxId(txID);
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
|
||||
r.setTxId(txID);
|
||||
r.setIoCompletion(callback);
|
||||
appendRecord(r);
|
||||
|
@ -387,7 +496,7 @@ public class JDBCJournalImpl implements Journal {
|
|||
boolean sync,
|
||||
IOCompletion callback,
|
||||
boolean lineUpContext) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
|
||||
r.setTxId(txID);
|
||||
r.setStoreLineUp(lineUpContext);
|
||||
r.setIoCompletion(callback);
|
||||
|
@ -396,7 +505,7 @@ public class JDBCJournalImpl implements Journal {
|
|||
|
||||
@Override
|
||||
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD);
|
||||
r.setTxId(txID);
|
||||
r.setTxData(transactionData);
|
||||
r.setSync(sync);
|
||||
|
@ -411,6 +520,7 @@ public class JDBCJournalImpl implements Journal {
|
|||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
|
||||
r.setTxId(txID);
|
||||
r.setTxData(transactionData);
|
||||
r.setTxData(transactionData);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(callback);
|
||||
appendRecord(r);
|
||||
|
@ -435,7 +545,7 @@ public class JDBCJournalImpl implements Journal {
|
|||
|
||||
@Override
|
||||
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(callback);
|
||||
|
@ -443,7 +553,7 @@ public class JDBCJournalImpl implements Journal {
|
|||
}
|
||||
|
||||
@Override
|
||||
public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
|
||||
public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
|
||||
JournalLoadInformation jli = new JournalLoadInformation();
|
||||
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
|
||||
JDBCJournalRecord r;
|
||||
|
@ -485,8 +595,12 @@ public class JDBCJournalImpl implements Journal {
|
|||
}
|
||||
noRecords++;
|
||||
}
|
||||
jrc.checkPreparedTx();
|
||||
|
||||
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
|
||||
jli.setNumberOfRecords(noRecords);
|
||||
transactions = jrc.getTransactions();
|
||||
isLoaded = true;
|
||||
}
|
||||
return jli;
|
||||
}
|
||||
|
|
|
@ -81,7 +81,6 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
|
|||
public synchronized void updateRecord(final RecordInfo info) {
|
||||
int index = committedRecords.size();
|
||||
committedRecords.add(index, info);
|
||||
deleteReferences.get(info.id).add(index);
|
||||
}
|
||||
|
||||
public synchronized void deleteRecord(final long id) {
|
||||
|
@ -106,5 +105,4 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
|
|||
public long getMaxId() {
|
||||
return maxId;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,14 +19,13 @@ package org.apache.activemq.artemis.jdbc.store.journal;
|
|||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.activemq.artemis.core.journal.LoaderCallback;
|
||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalTransaction;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
||||
|
||||
|
@ -34,9 +33,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
|||
|
||||
private final LoaderCallback loadManager;
|
||||
|
||||
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
|
||||
|
||||
public JDBCJournalReaderCallback(LoaderCallback loadManager) {
|
||||
public JDBCJournalReaderCallback(final LoaderCallback loadManager) {
|
||||
this.loadManager = loadManager;
|
||||
}
|
||||
|
||||
|
@ -53,7 +50,12 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
|||
}
|
||||
|
||||
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
|
||||
onReadAddRecordTX(transactionID, info);
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
if (tx == null) {
|
||||
tx = new TransactionHolder(transactionID);
|
||||
loadTransactions.put(transactionID, tx);
|
||||
}
|
||||
tx.recordInfos.add(info);
|
||||
}
|
||||
|
||||
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
|
||||
|
@ -87,14 +89,9 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
|||
}
|
||||
|
||||
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
|
||||
// It is possible that the TX could be null, since deletes could have happened in the journal.
|
||||
TransactionHolder tx = loadTransactions.remove(transactionID);
|
||||
if (tx != null) {
|
||||
// JournalTransaction journalTransaction = transactions.remove(transactionID);
|
||||
// if (journalTransaction == null)
|
||||
// {
|
||||
// throw new IllegalStateException("Cannot Commit, tx not found with ID: " + transactionID);
|
||||
// }
|
||||
|
||||
for (RecordInfo txRecord : tx.recordInfos) {
|
||||
if (txRecord.isUpdate) {
|
||||
loadManager.updateRecord(txRecord);
|
||||
|
@ -103,10 +100,6 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
|||
loadManager.addRecord(txRecord);
|
||||
}
|
||||
}
|
||||
|
||||
for (RecordInfo deleteValue : tx.recordsToDelete) {
|
||||
loadManager.deleteRecord(deleteValue.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,4 +114,23 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
|||
public void markAsDataFile(JournalFile file) {
|
||||
// Not needed for JDBC journal impl
|
||||
}
|
||||
|
||||
public void checkPreparedTx() {
|
||||
for (TransactionHolder transaction : loadTransactions.values()) {
|
||||
if (!transaction.prepared || transaction.invalid) {
|
||||
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
|
||||
loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
|
||||
}
|
||||
else {
|
||||
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
|
||||
info.getRecords().addAll(transaction.recordInfos);
|
||||
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
|
||||
loadManager.addPreparedTransaction(info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Long, TransactionHolder> getTransactions() {
|
||||
return loadTransactions;
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
|||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
|
||||
|
||||
public class JDBCJournalRecord {
|
||||
|
@ -85,10 +86,14 @@ public class JDBCJournalRecord {
|
|||
|
||||
private boolean isUpdate;
|
||||
|
||||
private boolean isTransactional;
|
||||
|
||||
public JDBCJournalRecord(long id, byte recordType) {
|
||||
this.id = id;
|
||||
this.recordType = recordType;
|
||||
this.isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX;
|
||||
|
||||
isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX;
|
||||
isTransactional = recordType == UPDATE_RECORD_TX || recordType == ADD_RECORD_TX || recordType == DELETE_RECORD_TX;
|
||||
|
||||
// set defaults
|
||||
compactCount = 0;
|
||||
|
@ -102,11 +107,12 @@ public class JDBCJournalRecord {
|
|||
}
|
||||
|
||||
public static String createTableSQL(String tableName) {
|
||||
return "CREATE TABLE " + tableName + "(id BIGINT, " + "recordType SMALLINT, " + "compactCount SMALLINT, " + "txId BIGINT, " + "userRecordType SMALLINT, " + "variableSize INTEGER, " + "record BLOB, " + "txDataSize INTEGER, " + "txData BLOB, " + "txCheckNoRecords INTEGER)";
|
||||
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)";
|
||||
}
|
||||
|
||||
public static String insertRecordsSQL(String tableName) {
|
||||
return "INSERT INTO " + tableName + "(id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords) " + "VALUES (?,?,?,?,?,?,?,?,?,?)";
|
||||
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp) "
|
||||
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
|
||||
}
|
||||
|
||||
public static String selectRecordsSQL(String tableName) {
|
||||
|
@ -117,8 +123,20 @@ public class JDBCJournalRecord {
|
|||
return "DELETE FROM " + tableName + " WHERE id = ?";
|
||||
}
|
||||
|
||||
public static String deleteTxRecordsSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE txId = ?";
|
||||
public static String deleteCommittedDeleteRecordsForTxSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE id IN (SELECT id FROM " + tableName + " WHERE txID=?)";
|
||||
}
|
||||
|
||||
public static String deleteCommittedTxRecordsSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE txId=? AND (recordType=" + PREPARE_RECORD + " OR recordType=" + COMMIT_RECORD + ")";
|
||||
}
|
||||
|
||||
public static String deleteJournalTxRecordsSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE txId=?";
|
||||
}
|
||||
|
||||
public static String deleteRolledBackTxSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE txId=?";
|
||||
}
|
||||
|
||||
public void complete(boolean success) {
|
||||
|
@ -127,7 +145,7 @@ public class JDBCJournalRecord {
|
|||
ioCompletion.done();
|
||||
}
|
||||
else {
|
||||
ioCompletion.onError(1, "DATABASE INSERT FAILED");
|
||||
ioCompletion.onError(1, "DATABASE TRANSACTION FAILED");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -139,16 +157,29 @@ public class JDBCJournalRecord {
|
|||
}
|
||||
|
||||
protected void writeRecord(PreparedStatement statement) throws SQLException {
|
||||
|
||||
byte[] recordBytes = new byte[variableSize];
|
||||
byte[] txDataBytes = new byte[txDataSize];
|
||||
|
||||
try {
|
||||
record.read(recordBytes);
|
||||
txData.read(txDataBytes);
|
||||
}
|
||||
catch (IOException e) {
|
||||
ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
|
||||
}
|
||||
|
||||
statement.setLong(1, id);
|
||||
statement.setByte(2, recordType);
|
||||
statement.setByte(3, compactCount);
|
||||
statement.setLong(4, txId);
|
||||
statement.setByte(5, userRecordType);
|
||||
statement.setInt(6, variableSize);
|
||||
statement.setBlob(7, record);
|
||||
statement.setBytes(7, recordBytes);
|
||||
statement.setInt(8, txDataSize);
|
||||
statement.setBlob(9, txData);
|
||||
statement.setBytes(9, txDataBytes);
|
||||
statement.setInt(10, txCheckNoRecords);
|
||||
statement.setLong(11, System.currentTimeMillis());
|
||||
statement.addBatch();
|
||||
}
|
||||
|
||||
|
@ -240,8 +271,10 @@ public class JDBCJournalRecord {
|
|||
}
|
||||
|
||||
public void setRecord(byte[] record) {
|
||||
this.variableSize = record.length;
|
||||
this.record = new ByteArrayInputStream(record);
|
||||
if (record != null) {
|
||||
this.variableSize = record.length;
|
||||
this.record = new ByteArrayInputStream(record);
|
||||
}
|
||||
}
|
||||
|
||||
public void setRecord(InputStream record) {
|
||||
|
@ -287,14 +320,16 @@ public class JDBCJournalRecord {
|
|||
public void setTxData(EncodingSupport txData) {
|
||||
this.txDataSize = txData.getEncodeSize();
|
||||
|
||||
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
|
||||
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(txDataSize);
|
||||
txData.encode(encodedBuffer);
|
||||
this.txData = new ActiveMQBufferInputStream(encodedBuffer);
|
||||
}
|
||||
|
||||
public void setTxData(byte[] txData) {
|
||||
this.txDataSize = txData.length;
|
||||
this.txData = new ByteArrayInputStream(txData);
|
||||
if (txData != null) {
|
||||
this.txDataSize = txData.length;
|
||||
this.txData = new ByteArrayInputStream(txData);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isUpdate() {
|
||||
|
@ -316,4 +351,8 @@ public class JDBCJournalRecord {
|
|||
public RecordInfo toRecordInfo() throws IOException {
|
||||
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
|
||||
}
|
||||
|
||||
public boolean isTransactional() {
|
||||
return isTransactional;
|
||||
}
|
||||
}
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.activemq.artemis.jdbc.store.journal;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.TimerTask;
|
||||
|
||||
public class JDBCJournalSync extends TimerTask {
|
||||
|
@ -30,11 +29,8 @@ public class JDBCJournalSync extends TimerTask {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (journal.isStarted()) {
|
||||
journal.sync();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,6 +171,14 @@ public class ThreadLeakCheckRule extends ExternalResource {
|
|||
//another netty thread
|
||||
return true;
|
||||
}
|
||||
else if (threadName.contains("derby")) {
|
||||
// The derby engine is initialized once, and lasts the lifetime of the VM
|
||||
return true;
|
||||
}
|
||||
else if (threadName.contains("Timer")) {
|
||||
// The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
|
||||
return true;
|
||||
}
|
||||
else if (threadName.contains("hawtdispatch")) {
|
||||
// Static workers used by MQTT client.
|
||||
return true;
|
||||
|
|
|
@ -53,12 +53,11 @@ public class JDBCJournalTest {
|
|||
public void testInsertRecords() throws Exception {
|
||||
int noRecords = 10;
|
||||
for (int i = 0; i < noRecords; i++) {
|
||||
journal.appendAddRecord(1, (byte) 1, new byte[0], true);
|
||||
journal.appendAddRecord(i, (byte) 1, new byte[0], true);
|
||||
}
|
||||
|
||||
Thread.sleep(3000);
|
||||
assertEquals(noRecords, journal.getNumberOfRecords());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
|
@ -38,15 +39,19 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerFactory;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class BasicXaRecoveryTest extends ActiveMQTestBase {
|
||||
|
||||
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
@ -71,13 +76,32 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
|
|||
|
||||
private MBeanServer mbeanServer;
|
||||
|
||||
protected StoreConfiguration.StoreType storeType;
|
||||
|
||||
public BasicXaRecoveryTest(StoreConfiguration.StoreType storeType) {
|
||||
this.storeType = storeType;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "storeType")
|
||||
public static Collection<Object[]> data() {
|
||||
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
addressSettings.clear();
|
||||
configuration = createDefaultInVMConfig().setJMXManagementEnabled(true);
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig().setJMXManagementEnabled(true);
|
||||
}
|
||||
else {
|
||||
configuration = createDefaultInVMConfig().setJMXManagementEnabled(true);
|
||||
}
|
||||
|
||||
|
||||
mbeanServer = MBeanServerFactory.createMBeanServer();
|
||||
|
||||
|
@ -211,15 +235,18 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testPagingServerRestarted() throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
verifyPaging(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPaging() throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
verifyPaging(false);
|
||||
}
|
||||
|
||||
public void verifyPaging(final boolean restartServer) throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
|
||||
|
||||
SimpleString pageQueue = new SimpleString("pagequeue");
|
||||
|
@ -285,11 +312,13 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testRollbackPaging() throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
testRollbackPaging(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackPagingServerRestarted() throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
testRollbackPaging(true);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.xa;
|
|||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -34,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
|
@ -44,7 +47,10 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class BasicXaTest extends ActiveMQTestBase {
|
||||
|
||||
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
@ -63,13 +69,31 @@ public class BasicXaTest extends ActiveMQTestBase {
|
|||
|
||||
private ServerLocator locator;
|
||||
|
||||
private StoreConfiguration.StoreType storeType;
|
||||
|
||||
public BasicXaTest(StoreConfiguration.StoreType storeType) {
|
||||
this.storeType = storeType;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "storeType")
|
||||
public static Collection<Object[]> data() {
|
||||
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
addressSettings.clear();
|
||||
configuration = createDefaultNettyConfig();
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig();
|
||||
}
|
||||
else {
|
||||
configuration = createDefaultNettyConfig();
|
||||
}
|
||||
|
||||
messagingService = createServer(false, configuration, -1, -1, addressSettings);
|
||||
|
||||
|
|
|
@ -16,6 +16,18 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.xa;
|
||||
|
||||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -27,7 +39,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -42,17 +55,10 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class XaTimeoutTest extends ActiveMQTestBase {
|
||||
|
||||
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
|
||||
|
@ -67,19 +73,39 @@ public class XaTimeoutTest extends ActiveMQTestBase {
|
|||
|
||||
private ClientSessionFactory sessionFactory;
|
||||
|
||||
private ConfigurationImpl configuration;
|
||||
private Configuration configuration;
|
||||
|
||||
private final SimpleString atestq = new SimpleString("atestq");
|
||||
|
||||
private ServerLocator locator;
|
||||
|
||||
private StoreConfiguration.StoreType storeType;
|
||||
|
||||
public XaTimeoutTest(StoreConfiguration.StoreType storeType) {
|
||||
this.storeType = storeType;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "storeType")
|
||||
public static Collection<Object[]> data() {
|
||||
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
addressSettings.clear();
|
||||
configuration = createBasicConfig().setTransactionTimeoutScanPeriod(500).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.INVM_ACCEPTOR_FACTORY));
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig();
|
||||
}
|
||||
else {
|
||||
configuration = createBasicConfig();
|
||||
}
|
||||
configuration.setTransactionTimeoutScanPeriod(500).addAcceptorConfiguration(new TransportConfiguration(ActiveMQTestBase.INVM_ACCEPTOR_FACTORY));
|
||||
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
|
||||
// start the server
|
||||
server.start();
|
||||
|
|
Loading…
Reference in New Issue