ARTEMIS-1115 Traces and tests on JDBC Persistence
This commit is contained in:
parent
7b68b0a49a
commit
bfe2bdd7b2
|
@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
@ -48,7 +49,6 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
|
@ -74,6 +74,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
|
|
||||||
private boolean started;
|
private boolean started;
|
||||||
|
|
||||||
|
private AtomicBoolean failed = new AtomicBoolean(false);
|
||||||
|
|
||||||
private JDBCJournalSync syncTimer;
|
private JDBCJournalSync syncTimer;
|
||||||
|
|
||||||
private final Executor completeExecutor;
|
private final Executor completeExecutor;
|
||||||
|
@ -161,8 +163,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized int sync() {
|
public synchronized int sync() {
|
||||||
if (!started)
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
List<JDBCJournalRecord> recordRef;
|
List<JDBCJournalRecord> recordRef;
|
||||||
synchronized (records) {
|
synchronized (records) {
|
||||||
|
@ -173,6 +173,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
records.clear();
|
records.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!started || failed.get()) {
|
||||||
|
executeCallbacks(recordRef, false);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
|
// We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
|
||||||
List<Long> deletedRecords = new ArrayList<>();
|
List<Long> deletedRecords = new ArrayList<>();
|
||||||
List<Long> committedTransactions = new ArrayList<>();
|
List<Long> committedTransactions = new ArrayList<>();
|
||||||
|
@ -183,6 +189,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
connection.setAutoCommit(false);
|
connection.setAutoCommit(false);
|
||||||
|
|
||||||
for (JDBCJournalRecord record : recordRef) {
|
for (JDBCJournalRecord record : recordRef) {
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("sync::preparing JDBC statment for " + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
switch (record.getRecordType()) {
|
switch (record.getRecordType()) {
|
||||||
case JDBCJournalRecord.DELETE_RECORD:
|
case JDBCJournalRecord.DELETE_RECORD:
|
||||||
// Standard SQL Delete Record, Non transactional delete
|
// Standard SQL Delete Record, Non transactional delete
|
||||||
|
@ -217,6 +230,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
deleteJournalTxRecords.executeBatch();
|
deleteJournalTxRecords.executeBatch();
|
||||||
|
|
||||||
connection.commit();
|
connection.commit();
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("JDBC commit worked");
|
||||||
|
}
|
||||||
|
|
||||||
cleanupTxRecords(deletedRecords, committedTransactions);
|
cleanupTxRecords(deletedRecords, committedTransactions);
|
||||||
executeCallbacks(recordRef, true);
|
executeCallbacks(recordRef, true);
|
||||||
|
@ -224,14 +240,38 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
return recordRef.size();
|
return recordRef.size();
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null);
|
handleException(recordRef, e);
|
||||||
started = false;
|
|
||||||
executeCallbacks(recordRef, false);
|
|
||||||
performRollback(recordRef);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** public for tests only, not through API */
|
||||||
|
public void handleException(List<JDBCJournalRecord> recordRef, Throwable e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
failed.set(true);
|
||||||
|
criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Rolling back Transaction, just in case");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.rollback();
|
||||||
|
} catch (Throwable rollback) {
|
||||||
|
logger.warn(rollback);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
} catch (Throwable rollback) {
|
||||||
|
logger.warn(rollback);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (recordRef != null) {
|
||||||
|
executeCallbacks(recordRef, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
||||||
we remove the Tx Records (i.e. PREPARE, COMMIT). */
|
we remove the Tx Records (i.e. PREPARE, COMMIT). */
|
||||||
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
|
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
|
||||||
|
@ -263,45 +303,49 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performRollback(List<JDBCJournalRecord> records) {
|
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean success) {
|
||||||
try {
|
|
||||||
for (JDBCJournalRecord record : records) {
|
|
||||||
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
|
||||||
removeTxRecord(record);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
List<TransactionHolder> txHolders = new ArrayList<>();
|
|
||||||
txHolders.addAll(transactions.values());
|
|
||||||
|
|
||||||
// On rollback we must update the tx map to remove all the tx entries
|
|
||||||
for (TransactionHolder txH : txHolders) {
|
|
||||||
if (!txH.prepared && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) {
|
|
||||||
transactions.remove(txH.transactionID);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
connection.rollback();
|
|
||||||
} catch (Exception sqlE) {
|
|
||||||
logger.error(sqlE.getMessage(), sqlE);
|
|
||||||
criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null);
|
|
||||||
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
|
|
||||||
Runnable r = new Runnable() {
|
Runnable r = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
for (JDBCJournalRecord record : records) {
|
for (JDBCJournalRecord record : records) {
|
||||||
record.complete(result);
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Calling callback " + record + " with success = " + success);
|
||||||
|
}
|
||||||
|
record.complete(success);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
completeExecutor.execute(r);
|
completeExecutor.execute(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void checkStatus() {
|
||||||
|
checkStatus(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkStatus(IOCompletion callback) {
|
||||||
|
if (!started) {
|
||||||
|
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
|
||||||
|
throw new IllegalStateException("JDBCJournal is not loaded");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failed.get()) {
|
||||||
|
if (callback != null) callback.onError(-1, "JDBC Journal failed");
|
||||||
|
throw new IllegalStateException("JDBCJournal Failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void appendRecord(JDBCJournalRecord record) throws Exception {
|
private void appendRecord(JDBCJournalRecord record) throws Exception {
|
||||||
|
|
||||||
|
// extra measure I know, as all the callers are also checking for this..
|
||||||
|
// better to be safe ;)
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendRecord " + record);
|
||||||
|
}
|
||||||
|
|
||||||
record.storeLineUp();
|
record.storeLineUp();
|
||||||
if (!started) {
|
if (!started) {
|
||||||
if (record.getIoCompletion() != null) {
|
if (record.getIoCompletion() != null) {
|
||||||
|
@ -330,6 +374,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);
|
||||||
|
}
|
||||||
|
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
TransactionHolder txHolder = transactions.get(record.getTxId());
|
TransactionHolder txHolder = transactions.get(record.getTxId());
|
||||||
if (txHolder == null) {
|
if (txHolder == null) {
|
||||||
txHolder = new TransactionHolder(record.getTxId());
|
txHolder = new TransactionHolder(record.getTxId());
|
||||||
|
@ -349,28 +400,22 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void removeTxRecord(JDBCJournalRecord record) {
|
|
||||||
TransactionHolder txHolder = transactions.get(record.getTxId());
|
|
||||||
|
|
||||||
// We actually only need the record ID in this instance.
|
|
||||||
if (record.isTransactional()) {
|
|
||||||
RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
|
|
||||||
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
|
|
||||||
txHolder.recordsToDelete.remove(info);
|
|
||||||
} else {
|
|
||||||
txHolder.recordInfos.remove(info);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
txHolder.prepared = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(record);
|
r.setRecord(record);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendAddRecord bytes[] " + r);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,6 +425,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(persister, record);
|
r.setRecord(persister, record);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendAddRecord (encoding) " + r + " with record = " + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,20 +441,36 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
Object record,
|
Object record,
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion completionCallback) throws Exception {
|
IOCompletion completionCallback) throws Exception {
|
||||||
|
checkStatus(completionCallback);
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(persister, record);
|
r.setRecord(persister, record);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
r.setIoCompletion(completionCallback);
|
r.setIoCompletion(completionCallback);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendAddRecord (completionCallback & encoding) " + r + " with record = " + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(record);
|
r.setRecord(record);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendUpdateRecord (bytes)) " + r);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -413,6 +480,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(persister, record);
|
r.setRecord(persister, record);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendUpdateRecord (encoding)) " + r + " with record " + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,36 +496,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
Object record,
|
Object record,
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion completionCallback) throws Exception {
|
IOCompletion completionCallback) throws Exception {
|
||||||
|
checkStatus(completionCallback);
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(persister, record);
|
r.setRecord(persister, record);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
r.setIoCompletion(completionCallback);
|
r.setIoCompletion(completionCallback);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendUpdateRecord (encoding & completioncallback)) " + r + " with record " + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecord(long id, boolean sync) throws Exception {
|
public void appendDeleteRecord(long id, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendDeleteRecord id=" + id + " sync=" + sync);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
||||||
|
checkStatus(completionCallback);
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
r.setIoCompletion(completionCallback);
|
r.setIoCompletion(completionCallback);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendDeleteRecord id=" + id + " sync=" + sync + " with completionCallback");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(record);
|
r.setRecord(record);
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using bytes[] r=" + r);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -465,15 +569,29 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(persister, record);
|
r.setRecord(persister, record);
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(record);
|
r.setRecord(record);
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,46 +605,88 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
r.setUserRecordType(recordType);
|
r.setUserRecordType(recordType);
|
||||||
r.setRecord(persister, record);
|
r.setRecord(persister, record);
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
|
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
||||||
r.setRecord(record);
|
r.setRecord(record);
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
||||||
r.setRecord(EncoderPersister.getInstance(), record);
|
r.setRecord(EncoderPersister.getInstance(), record);
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
|
public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendCommitRecord(long txID, boolean sync) throws Exception {
|
public void appendCommitRecord(long txID, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendCommitRecord txID=" + txID + " sync=" + sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
r.setIoCompletion(callback);
|
r.setIoCompletion(callback);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendCommitRecord txID=" + txID + " callback=" + callback);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -535,20 +695,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback,
|
IOCompletion callback,
|
||||||
boolean lineUpContext) throws Exception {
|
boolean lineUpContext) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setStoreLineUp(lineUpContext);
|
r.setStoreLineUp(lineUpContext);
|
||||||
r.setIoCompletion(callback);
|
r.setIoCompletion(callback);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendCommitRecord txID=" + txID + " using callback, lineup=" + lineUpContext);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
|
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setTxData(transactionData);
|
r.setTxData(transactionData);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendPrepareRecord txID=" + txID + " using sync=" + sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -557,43 +732,74 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
EncodingSupport transactionData,
|
EncodingSupport transactionData,
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback) throws Exception {
|
IOCompletion callback) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setTxData(transactionData);
|
r.setTxData(transactionData);
|
||||||
r.setTxData(transactionData);
|
r.setTxData(transactionData);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
r.setIoCompletion(callback);
|
r.setIoCompletion(callback);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendPrepareRecord txID=" + txID + " using callback, sync=" + sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
|
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setTxData(transactionData);
|
r.setTxData(transactionData);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendPrepareRecord txID=" + txID + " transactionData, sync=" + sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendRollbackRecord(long txID, boolean sync) throws Exception {
|
public void appendRollbackRecord(long txID, boolean sync) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync);
|
||||||
|
}
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||||
|
checkStatus();
|
||||||
|
|
||||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
|
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
|
||||||
r.setTxId(txID);
|
r.setTxId(txID);
|
||||||
r.setSync(sync);
|
r.setSync(sync);
|
||||||
r.setIoCompletion(callback);
|
r.setIoCompletion(callback);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync + " using callback");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
appendRecord(r);
|
appendRecord(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
|
public synchronized JournalLoadInformation load(LoaderCallback reloadManager) {
|
||||||
JournalLoadInformation jli = new JournalLoadInformation();
|
JournalLoadInformation jli = new JournalLoadInformation();
|
||||||
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
|
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
|
||||||
JDBCJournalRecord r;
|
JDBCJournalRecord r;
|
||||||
|
@ -643,6 +849,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
|
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
|
||||||
jli.setNumberOfRecords(noRecords);
|
jli.setNumberOfRecords(noRecords);
|
||||||
transactions = jrc.getTransactions();
|
transactions = jrc.getTransactions();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
handleException(null, e);
|
||||||
}
|
}
|
||||||
return jli;
|
return jli;
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,4 +306,23 @@ class JDBCJournalRecord {
|
||||||
long getSeq() {
|
long getSeq() {
|
||||||
return seq;
|
return seq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "JDBCJournalRecord{" +
|
||||||
|
"compactCount=" + compactCount +
|
||||||
|
", id=" + id +
|
||||||
|
", isTransactional=" + isTransactional +
|
||||||
|
", isUpdate=" + isUpdate +
|
||||||
|
", recordType=" + recordType +
|
||||||
|
", seq=" + seq +
|
||||||
|
", storeLineUp=" + storeLineUp +
|
||||||
|
", sync=" + sync +
|
||||||
|
", txCheckNoRecords=" + txCheckNoRecords +
|
||||||
|
", txDataSize=" + txDataSize +
|
||||||
|
", txId=" + txId +
|
||||||
|
", userRecordType=" + userRecordType +
|
||||||
|
", variableSize=" + variableSize +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,12 +122,10 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||||
}
|
}
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
|
||||||
} else {
|
} else {
|
||||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
|
||||||
}
|
}
|
||||||
pagingFactoryFileFactory.start();
|
pagingFactoryFileFactory.start();
|
||||||
started = true;
|
started = true;
|
||||||
|
|
|
@ -614,13 +614,17 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
|
doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("ServerSessionPacketHandler::response sent::" + response);
|
logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void done() {
|
public void done() {
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("ServerSessionPacketHandler::regular response sent::" + response);
|
||||||
|
}
|
||||||
|
|
||||||
doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
|
doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ShutdownServerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private ActiveMQServer server;
|
||||||
|
|
||||||
|
private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
|
||||||
|
|
||||||
|
private ServerLocator locator;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
server = createServer(true, createDefaultJDBCConfig(false), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
locator = createFactory(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShutdownServer() throws Throwable {
|
||||||
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
|
||||||
|
ClientSession session = sf.createSession(false, true, true, false);
|
||||||
|
|
||||||
|
session.createQueue(QUEUE, QUEUE, null, true);
|
||||||
|
|
||||||
|
ClientConsumer consumer = session.createConsumer(QUEUE);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(QUEUE);
|
||||||
|
ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
|
||||||
|
message.getBodyBuffer().writeString("hi");
|
||||||
|
message.putStringProperty("hello", "elo");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
ActiveMQServerImpl impl = (ActiveMQServerImpl) server;
|
||||||
|
JournalStorageManager journal = (JournalStorageManager) impl.getStorageManager();
|
||||||
|
JDBCJournalImpl journalimpl = (JDBCJournalImpl) journal.getMessageJournal();
|
||||||
|
journalimpl.handleException(null, new Exception("failure"));
|
||||||
|
|
||||||
|
Wait.waitFor(() -> !server.isStarted());
|
||||||
|
|
||||||
|
Assert.assertFalse(server.isStarted());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -95,7 +95,7 @@ public class BasicXaTest extends ActiveMQTestBase {
|
||||||
configuration = createDefaultNettyConfig();
|
configuration = createDefaultNettyConfig();
|
||||||
}
|
}
|
||||||
|
|
||||||
messagingService = createServer(false, configuration, -1, -1, addressSettings);
|
messagingService = createServer(true, configuration, -1, -1, addressSettings);
|
||||||
|
|
||||||
// start the server
|
// start the server
|
||||||
messagingService.start();
|
messagingService.start();
|
||||||
|
|
Loading…
Reference in New Issue