This closes #1212
This commit is contained in:
commit
ec085b8ea0
|
@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
return fileFactory.listFiles(extension).contains(filename);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void open() throws Exception {
|
||||
if (!isOpen) {
|
||||
synchronized (writeLock) {
|
||||
dbDriver.openFile(this);
|
||||
isCreated = true;
|
||||
isOpen = true;
|
||||
try {
|
||||
if (!isOpen) {
|
||||
synchronized (writeLock) {
|
||||
dbDriver.openFile(this);
|
||||
isCreated = true;
|
||||
isOpen = true;
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e);
|
||||
fileFactory.onIOError(e, "Error deleting JDBC file.", this);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized int internalWrite(byte[] data, IOCallback callback) {
|
||||
private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception {
|
||||
try {
|
||||
synchronized (writeLock) {
|
||||
int noBytes = dbDriver.writeToFile(this, data);
|
||||
seek(noBytes);
|
||||
System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
|
||||
if (callback != null)
|
||||
callback.done();
|
||||
return noBytes;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed to write to file", e.getMessage(), e);
|
||||
if (callback != null)
|
||||
callback.onError(-1, e.getMessage());
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||
fileFactory.onIOError(e, "Error writing to JDBC file.", this);
|
||||
}
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
|
||||
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception {
|
||||
byte[] data = new byte[buffer.readableBytes()];
|
||||
buffer.readBytes(data);
|
||||
return internalWrite(data, callback);
|
||||
}
|
||||
|
||||
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
|
||||
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception {
|
||||
return internalWrite(buffer.array(), callback);
|
||||
}
|
||||
|
||||
|
@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
internalWrite(bytes, callback);
|
||||
try {
|
||||
internalWrite(bytes, callback);
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
// internalWrite will notify the CriticalIOErrorListener
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
|
||||
final SequentialFile file = this;
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
internalWrite(bytes, callback);
|
||||
try {
|
||||
internalWrite(bytes, callback);
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
fileFactory.onIOError(e, "Error on JDBC file sync", file);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
scheduleWrite(bytes, waitIOCallback);
|
||||
waitIOCallback.waitCompletion();
|
||||
} catch (Exception e) {
|
||||
waitIOCallback.onError(-1, e.getMessage());
|
||||
waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file.");
|
||||
fileFactory.onIOError(e, "Failed to write to file.", this);
|
||||
}
|
||||
} else {
|
||||
scheduleWrite(bytes, callback);
|
||||
|
@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
if (callback != null)
|
||||
callback.done();
|
||||
return read;
|
||||
} catch (Exception e) {
|
||||
} catch (SQLException e) {
|
||||
if (callback != null)
|
||||
callback.onError(-1, e.getMessage());
|
||||
logger.warn("Failed to read from file", e.getMessage(), e);
|
||||
return 0;
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||
fileFactory.onIOError(e, "Error reading from JDBC file.", this);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
try {
|
||||
callback.waitCompletion();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync.");
|
||||
fileFactory.onIOError(e, "Error during JDBC file sync.", this);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void renameTo(String newFileName) throws Exception {
|
||||
synchronized (writeLock) {
|
||||
dbDriver.renameFile(this, newFileName);
|
||||
try {
|
||||
dbDriver.renameFile(this, newFileName);
|
||||
} catch (SQLException e) {
|
||||
fileFactory.onIOError(e, "Error renaming JDBC file.", this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
|
||||
return clone;
|
||||
} catch (Exception e) {
|
||||
logger.error("Error cloning file: " + filename, e);
|
||||
return null;
|
||||
fileFactory.onIOError(e, "Error cloning JDBC file.", this);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copyTo(SequentialFile cloneFile) throws Exception {
|
||||
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
|
||||
clone.open();
|
||||
|
||||
synchronized (writeLock) {
|
||||
dbDriver.copyFileData(this, clone);
|
||||
try {
|
||||
synchronized (writeLock) {
|
||||
clone.open();
|
||||
dbDriver.copyFileData(this, clone);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
fileFactory.onIOError(e, "Error copying JDBC file.", this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,15 +27,19 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
|
||||
|
||||
private boolean started;
|
||||
|
||||
private final List<JDBCSequentialFile> files = new ArrayList<>();
|
||||
|
@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
private final Map<String, Object> fileLocks = new HashMap<>();
|
||||
|
||||
private final JDBCSequentialFileFactoryDriver dbDriver;
|
||||
private JDBCSequentialFileFactoryDriver dbDriver;
|
||||
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
|
||||
public JDBCSequentialFileFactory(final DataSource dataSource,
|
||||
final SQLProvider sqlProvider,
|
||||
Executor executor) throws Exception {
|
||||
Executor executor,
|
||||
IOCriticalErrorListener criticalErrorListener) throws Exception {
|
||||
|
||||
this.executor = executor;
|
||||
dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
|
||||
try {
|
||||
this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
|
||||
} catch (SQLException e) {
|
||||
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactory(final String connectionUrl,
|
||||
final String className,
|
||||
final SQLProvider sqlProvider,
|
||||
Executor executor) throws Exception {
|
||||
Executor executor,
|
||||
IOCriticalErrorListener criticalErrorListener) throws Exception {
|
||||
this.executor = executor;
|
||||
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
try {
|
||||
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||
} catch (SQLException e) {
|
||||
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactory(final Connection connection,
|
||||
final SQLProvider sqlProvider,
|
||||
final Executor executor) throws Exception {
|
||||
final Executor executor,
|
||||
final IOCriticalErrorListener criticalErrorListener) throws Exception {
|
||||
this.executor = executor;
|
||||
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
|
||||
try {
|
||||
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
|
||||
} catch (SQLException e) {
|
||||
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
|
||||
}
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactoryDriver getDbDriver() {
|
||||
|
@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
|
||||
// noop
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
started = true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e);
|
||||
criticalErrorListener.onIOException(e, "Unable to start database driver", null);
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
|
@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
files.add(file);
|
||||
return file;
|
||||
} catch (Exception e) {
|
||||
ActiveMQJournalLogger.LOGGER.error("Could not create file", e);
|
||||
criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
@Override
|
||||
public List<String> listFiles(String extension) throws Exception {
|
||||
return dbDriver.listFiles(extension);
|
||||
try {
|
||||
return dbDriver.listFiles(extension);
|
||||
} catch (SQLException e) {
|
||||
criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
@Override
|
||||
public void onIOError(Exception exception, String message, SequentialFile file) {
|
||||
criticalErrorListener.onIOException(exception, message, file);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
@Override
|
||||
public void flush() {
|
||||
for (SequentialFile file : files) {
|
||||
try {
|
||||
file.sync();
|
||||
} catch (Exception e) {
|
||||
criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void destroy() throws SQLException {
|
||||
dbDriver.destroy();
|
||||
try {
|
||||
dbDriver.destroy();
|
||||
} catch (SQLException e) {
|
||||
logger.error("Error destroying file factory", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,10 +29,13 @@ import java.util.List;
|
|||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
|
||||
|
||||
protected PreparedStatement deleteFile;
|
||||
|
||||
protected PreparedStatement createFile;
|
||||
|
@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
Blob blob = rs.getBlob(1);
|
||||
if (blob != null) {
|
||||
file.setWritePosition((int) blob.length());
|
||||
} else {
|
||||
logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId());
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
|
@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
connection.commit();
|
||||
return readLength;
|
||||
} catch (Throwable e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
} finally {
|
||||
connection.rollback();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,8 +28,11 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||
|
@ -44,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||
|
@ -70,6 +72,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
|
||||
private boolean started;
|
||||
|
||||
private AtomicBoolean failed = new AtomicBoolean(false);
|
||||
|
||||
private JDBCJournalSync syncTimer;
|
||||
|
||||
private final Executor completeExecutor;
|
||||
|
@ -82,26 +86,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
// Sequence ID for journal records
|
||||
private final AtomicLong seq = new AtomicLong(0);
|
||||
|
||||
private final IOCriticalErrorListener criticalIOErrorListener;
|
||||
|
||||
public JDBCJournalImpl(DataSource dataSource,
|
||||
SQLProvider provider,
|
||||
String tableName,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
Executor completeExecutor) {
|
||||
Executor completeExecutor,
|
||||
IOCriticalErrorListener criticalIOErrorListener) {
|
||||
super(dataSource, provider);
|
||||
records = new ArrayList<>();
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
this.completeExecutor = completeExecutor;
|
||||
this.criticalIOErrorListener = criticalIOErrorListener;
|
||||
}
|
||||
|
||||
public JDBCJournalImpl(String jdbcUrl,
|
||||
String jdbcDriverClass,
|
||||
SQLProvider sqlProvider,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
Executor completeExecutor) {
|
||||
Executor completeExecutor,
|
||||
IOCriticalErrorListener criticalIOErrorListener) {
|
||||
super(sqlProvider, jdbcUrl, jdbcDriverClass);
|
||||
records = new ArrayList<>();
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
this.completeExecutor = completeExecutor;
|
||||
this.criticalIOErrorListener = criticalIOErrorListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,9 +141,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() throws SQLException {
|
||||
public void stop() throws SQLException {
|
||||
stop(true);
|
||||
}
|
||||
|
||||
public synchronized void stop(boolean sync) throws SQLException {
|
||||
if (started) {
|
||||
sync();
|
||||
if (sync)
|
||||
sync();
|
||||
started = false;
|
||||
super.stop();
|
||||
}
|
||||
|
@ -146,8 +161,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
public synchronized int sync() {
|
||||
if (!started)
|
||||
return 0;
|
||||
|
||||
List<JDBCJournalRecord> recordRef;
|
||||
synchronized (records) {
|
||||
|
@ -158,15 +171,29 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
records.clear();
|
||||
}
|
||||
|
||||
if (!started || failed.get()) {
|
||||
executeCallbacks(recordRef, false);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
// We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
|
||||
List<Long> deletedRecords = new ArrayList<>();
|
||||
List<Long> committedTransactions = new ArrayList<>();
|
||||
|
||||
TransactionHolder holder;
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
for (JDBCJournalRecord record : recordRef) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("sync::preparing JDBC statment for " + record);
|
||||
}
|
||||
|
||||
|
||||
|
||||
switch (record.getRecordType()) {
|
||||
case JDBCJournalRecord.DELETE_RECORD:
|
||||
// Standard SQL Delete Record, Non transactional delete
|
||||
|
@ -195,36 +222,52 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
break;
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
executeCallbacks(recordRef, success);
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
insertJournalRecords.executeBatch();
|
||||
deleteJournalRecords.executeBatch();
|
||||
deleteJournalTxRecords.executeBatch();
|
||||
|
||||
connection.commit();
|
||||
success = true;
|
||||
} catch (SQLException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
performRollback(recordRef);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("JDBC commit worked");
|
||||
}
|
||||
|
||||
cleanupTxRecords(deletedRecords, committedTransactions);
|
||||
executeCallbacks(recordRef, true);
|
||||
|
||||
return recordRef.size();
|
||||
|
||||
} catch (Exception e) {
|
||||
handleException(recordRef, e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/** public for tests only, not through API */
|
||||
public void handleException(List<JDBCJournalRecord> recordRef, Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
failed.set(true);
|
||||
criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Rolling back Transaction, just in case");
|
||||
}
|
||||
|
||||
try {
|
||||
if (success)
|
||||
cleanupTxRecords(deletedRecords, committedTransactions);
|
||||
} catch (SQLException e) {
|
||||
logger.warn("Failed to remove the Tx Records", e.getMessage(), e);
|
||||
} finally {
|
||||
executeCallbacks(recordRef, success);
|
||||
connection.rollback();
|
||||
} catch (Throwable rollback) {
|
||||
logger.warn(rollback);
|
||||
}
|
||||
|
||||
return recordRef.size();
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Throwable rollback) {
|
||||
logger.warn(rollback);
|
||||
}
|
||||
|
||||
if (recordRef != null) {
|
||||
executeCallbacks(recordRef, false);
|
||||
}
|
||||
}
|
||||
|
||||
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
||||
|
@ -258,46 +301,55 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
}
|
||||
|
||||
private void performRollback(List<JDBCJournalRecord> records) {
|
||||
try {
|
||||
connection.rollback();
|
||||
|
||||
for (JDBCJournalRecord record : records) {
|
||||
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
||||
removeTxRecord(record);
|
||||
}
|
||||
}
|
||||
|
||||
List<TransactionHolder> txHolders = new ArrayList<>();
|
||||
txHolders.addAll(transactions.values());
|
||||
|
||||
// On rollback we must update the tx map to remove all the tx entries
|
||||
for (TransactionHolder txH : txHolders) {
|
||||
if (!txH.prepared && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) {
|
||||
transactions.remove(txH.transactionID);
|
||||
}
|
||||
}
|
||||
} catch (Exception sqlE) {
|
||||
logger.warn(sqlE.getMessage(), sqlE);
|
||||
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Use an executor.
|
||||
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
|
||||
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean success) {
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (JDBCJournalRecord record : records) {
|
||||
record.complete(result);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Calling callback " + record + " with success = " + success);
|
||||
}
|
||||
record.complete(success);
|
||||
}
|
||||
}
|
||||
};
|
||||
completeExecutor.execute(r);
|
||||
}
|
||||
|
||||
|
||||
private void checkStatus() {
|
||||
checkStatus(null);
|
||||
}
|
||||
|
||||
private void checkStatus(IOCompletion callback) {
|
||||
if (!started) {
|
||||
if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
|
||||
throw new IllegalStateException("JDBCJournal is not loaded");
|
||||
}
|
||||
|
||||
if (failed.get()) {
|
||||
if (callback != null) callback.onError(-1, "JDBC Journal failed");
|
||||
throw new IllegalStateException("JDBCJournal Failed");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void appendRecord(JDBCJournalRecord record) throws Exception {
|
||||
|
||||
// extra measure I know, as all the callers are also checking for this..
|
||||
// better to be safe ;)
|
||||
checkStatus();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendRecord " + record);
|
||||
}
|
||||
|
||||
record.storeLineUp();
|
||||
if (!started) {
|
||||
if (record.getIoCompletion() != null) {
|
||||
record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started");
|
||||
}
|
||||
}
|
||||
|
||||
SimpleWaitIOCallback callback = null;
|
||||
if (record.isSync() && record.getIoCompletion() == null) {
|
||||
|
@ -316,13 +368,17 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
syncTimer.delay();
|
||||
|
||||
if (callback != null) {
|
||||
callback.waitCompletion();
|
||||
}
|
||||
if (callback != null) callback.waitCompletion();
|
||||
}
|
||||
|
||||
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);
|
||||
}
|
||||
|
||||
checkStatus();
|
||||
|
||||
TransactionHolder txHolder = transactions.get(record.getTxId());
|
||||
if (txHolder == null) {
|
||||
txHolder = new TransactionHolder(record.getTxId());
|
||||
|
@ -342,37 +398,39 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized void removeTxRecord(JDBCJournalRecord record) {
|
||||
TransactionHolder txHolder = transactions.get(record.getTxId());
|
||||
|
||||
// We actually only need the record ID in this instance.
|
||||
if (record.isTransactional()) {
|
||||
RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
|
||||
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
|
||||
txHolder.recordsToDelete.remove(info);
|
||||
} else {
|
||||
txHolder.recordInfos.remove(info);
|
||||
}
|
||||
} else {
|
||||
txHolder.prepared = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setSync(sync);
|
||||
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendAddRecord bytes[] " + r);
|
||||
}
|
||||
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendAddRecord (encoding) " + r + " with record = " + record);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
@ -382,29 +440,53 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
EncodingSupport record,
|
||||
boolean sync,
|
||||
IOCompletion completionCallback) throws Exception {
|
||||
checkStatus(completionCallback);
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(completionCallback);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendAddRecord (completionCallback & encoding) " + r + " with record = " + record);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendUpdateRecord (bytes)) " + r);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendUpdateRecord (encoding)) " + r + " with record " + record);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
@ -414,36 +496,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
EncodingSupport record,
|
||||
boolean sync,
|
||||
IOCompletion completionCallback) throws Exception {
|
||||
checkStatus(completionCallback);
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(completionCallback);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendUpdateRecord (encoding & completioncallback)) " + r + " with record " + record);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendDeleteRecord(long id, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendDeleteRecord id=" + id + " sync=" + sync);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
|
||||
checkStatus(completionCallback);
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(completionCallback);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendDeleteRecord id=" + id + " sync=" + sync + " with completionCallback");
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setTxId(txID);
|
||||
appendRecord(r);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using bytes[] r=" + r);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -451,19 +564,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
long id,
|
||||
byte recordType,
|
||||
EncodingSupport record) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setTxId(txID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setTxId(txID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
@ -472,50 +601,94 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
long id,
|
||||
byte recordType,
|
||||
EncodingSupport record) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet());
|
||||
r.setUserRecordType(recordType);
|
||||
r.setRecord(record);
|
||||
r.setTxId(txID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
||||
r.setRecord(record);
|
||||
r.setTxId(txID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
||||
r.setRecord(record);
|
||||
r.setTxId(txID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendCommitRecord(long txID, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendCommitRecord txID=" + txID + " sync=" + sync);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(callback);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendCommitRecord txID=" + txID + " callback=" + callback);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
@ -524,20 +697,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
boolean sync,
|
||||
IOCompletion callback,
|
||||
boolean lineUpContext) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setStoreLineUp(lineUpContext);
|
||||
r.setIoCompletion(callback);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendCommitRecord txID=" + txID + " using callback, lineup=" + lineUpContext);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setTxData(transactionData);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendPrepareRecord txID=" + txID + " using sync=" + sync);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
|
@ -546,43 +734,74 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
EncodingSupport transactionData,
|
||||
boolean sync,
|
||||
IOCompletion callback) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setTxData(transactionData);
|
||||
r.setTxData(transactionData);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(callback);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendPrepareRecord txID=" + txID + " using callback, sync=" + sync);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setTxData(transactionData);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendPrepareRecord txID=" + txID + " transactionData, sync=" + sync);
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendRollbackRecord(long txID, boolean sync) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync);
|
||||
}
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
|
||||
checkStatus();
|
||||
|
||||
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
|
||||
r.setTxId(txID);
|
||||
r.setSync(sync);
|
||||
r.setIoCompletion(callback);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync + " using callback");
|
||||
}
|
||||
|
||||
|
||||
appendRecord(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
|
||||
public synchronized JournalLoadInformation load(LoaderCallback reloadManager) {
|
||||
JournalLoadInformation jli = new JournalLoadInformation();
|
||||
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
|
||||
JDBCJournalRecord r;
|
||||
|
@ -632,6 +851,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
|
||||
jli.setNumberOfRecords(noRecords);
|
||||
transactions = jrc.getTransactions();
|
||||
} catch (Throwable e) {
|
||||
handleException(null, e);
|
||||
}
|
||||
return jli;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.sql.SQLException;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
|
@ -115,7 +116,7 @@ class JDBCJournalRecord {
|
|||
if (success) {
|
||||
ioCompletion.done();
|
||||
} else {
|
||||
ioCompletion.onError(1, "DATABASE TRANSACTION FAILED");
|
||||
ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,7 +127,7 @@ class JDBCJournalRecord {
|
|||
}
|
||||
}
|
||||
|
||||
void writeRecord(PreparedStatement statement) throws SQLException {
|
||||
void writeRecord(PreparedStatement statement) throws Exception {
|
||||
|
||||
byte[] recordBytes = new byte[variableSize];
|
||||
byte[] txDataBytes = new byte[txDataSize];
|
||||
|
@ -136,6 +137,7 @@ class JDBCJournalRecord {
|
|||
txData.read(txDataBytes);
|
||||
} catch (IOException e) {
|
||||
ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
statement.setLong(1, id);
|
||||
|
@ -303,4 +305,23 @@ class JDBCJournalRecord {
|
|||
long getSeq() {
|
||||
return seq;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JDBCJournalRecord{" +
|
||||
"compactCount=" + compactCount +
|
||||
", id=" + id +
|
||||
", isTransactional=" + isTransactional +
|
||||
", isUpdate=" + isUpdate +
|
||||
", recordType=" + recordType +
|
||||
", seq=" + seq +
|
||||
", storeLineUp=" + storeLineUp +
|
||||
", sync=" + sync +
|
||||
", txCheckNoRecords=" + txCheckNoRecords +
|
||||
", txDataSize=" + txDataSize +
|
||||
", txId=" + txId +
|
||||
", userRecordType=" + userRecordType +
|
||||
", variableSize=" + variableSize +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||
|
@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest {
|
|||
|
||||
String connectionUrl = "jdbc:derby:target/data;create=true";
|
||||
String tableName = "FILES";
|
||||
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor);
|
||||
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
}
|
||||
});
|
||||
factory.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -81,6 +81,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
|
||||
private boolean started = false;
|
||||
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
|
||||
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
||||
final StorageManager storageManager,
|
||||
final long syncTimeout,
|
||||
|
@ -94,6 +96,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
this.scheduledExecutor = scheduledExecutor;
|
||||
this.syncTimeout = syncTimeout;
|
||||
this.dbConf = dbConf;
|
||||
this.criticalErrorListener = critialErrorListener;
|
||||
start();
|
||||
}
|
||||
|
||||
|
@ -109,10 +112,10 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
if (sqlProviderFactory == null) {
|
||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||
} else {
|
||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||
}
|
||||
pagingFactoryFileFactory.start();
|
||||
started = true;
|
||||
|
@ -222,7 +225,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
||||
}
|
||||
|
||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor());
|
||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
|
||||
}
|
||||
|
||||
private String getTableNameForGUID(String guid) {
|
||||
|
|
|
@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
|||
if (sqlProviderFactory == null) {
|
||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
|
||||
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
|
||||
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
|
||||
} else {
|
||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
|
||||
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
|
||||
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
|
||||
}
|
||||
largeMessagesFactory.start();
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -541,13 +541,17 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ServerSessionPacketHandler::response sent::" + response);
|
||||
logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ServerSessionPacketHandler::regular response sent::" + response);
|
||||
}
|
||||
|
||||
doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
|
@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase {
|
|||
executorService = Executors.newSingleThreadExecutor();
|
||||
jdbcUrl = "jdbc:derby:target/data;create=true";
|
||||
SQLProvider.Factory factory = new DerbySQLProvider.Factory();
|
||||
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService);
|
||||
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
|
||||
}
|
||||
});
|
||||
journal.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ShutdownServerTest extends ActiveMQTestBase {
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
||||
private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
|
||||
|
||||
private ServerLocator locator;
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
server = createServer(true, createDefaultJDBCConfig(false), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
|
||||
server.start();
|
||||
|
||||
locator = createFactory(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownServer() throws Throwable {
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true, false);
|
||||
|
||||
session.createQueue(QUEUE, QUEUE, null, true);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(QUEUE);
|
||||
|
||||
ClientProducer producer = session.createProducer(QUEUE);
|
||||
ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4);
|
||||
message.getBodyBuffer().writeString("hi");
|
||||
message.putStringProperty("hello", "elo");
|
||||
producer.send(message);
|
||||
|
||||
ActiveMQServerImpl impl = (ActiveMQServerImpl) server;
|
||||
JournalStorageManager journal = (JournalStorageManager) impl.getStorageManager();
|
||||
JDBCJournalImpl journalimpl = (JDBCJournalImpl) journal.getMessageJournal();
|
||||
journalimpl.handleException(null, new Exception("failure"));
|
||||
|
||||
Wait.waitFor(() -> !server.isStarted());
|
||||
|
||||
Assert.assertFalse(server.isStarted());
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -94,7 +94,7 @@ public class BasicXaTest extends ActiveMQTestBase {
|
|||
configuration = createDefaultNettyConfig();
|
||||
}
|
||||
|
||||
messagingService = createServer(false, configuration, -1, -1, addressSettings);
|
||||
messagingService = createServer(true, configuration, -1, -1, addressSettings);
|
||||
|
||||
// start the server
|
||||
messagingService.start();
|
||||
|
|
Loading…
Reference in New Issue