ARTEMIS-1115 Call CriticalIOListener on JDBC Error

This commit is contained in:
Martyn Taylor 2017-04-17 10:40:26 +01:00 committed by Clebert Suconic
parent fc4d5edefa
commit 7b68b0a49a
9 changed files with 187 additions and 87 deletions

View File

@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile {
return fileFactory.listFiles(extension).contains(filename); return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
return false; return false;
} }
} }
@Override @Override
public synchronized void open() throws Exception { public synchronized void open() throws Exception {
if (!isOpen) { try {
synchronized (writeLock) { if (!isOpen) {
dbDriver.openFile(this); synchronized (writeLock) {
isCreated = true; dbDriver.openFile(this);
isOpen = true; isCreated = true;
isOpen = true;
}
} }
} catch (SQLException e) {
fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
} }
} }
@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile {
} }
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e); fileFactory.onIOError(e, "Error deleting JDBC file.", this);
} }
} }
private synchronized int internalWrite(byte[] data, IOCallback callback) { private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception {
try { try {
synchronized (writeLock) { synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data); int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes); seek(noBytes);
System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
if (callback != null) if (callback != null)
callback.done(); callback.done();
return noBytes; return noBytes;
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to write to file", e.getMessage(), e);
if (callback != null) if (callback != null)
callback.onError(-1, e.getMessage()); callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
fileFactory.onIOError(e, "Error writing to JDBC file.", this);
} }
return -1; return 0;
} }
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception {
byte[] data = new byte[buffer.readableBytes()]; byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data); buffer.readBytes(data);
return internalWrite(data, callback); return internalWrite(data, callback);
} }
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception {
return internalWrite(buffer.array(), callback); return internalWrite(buffer.array(), callback);
} }
@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
internalWrite(bytes, callback); try {
internalWrite(bytes, callback);
} catch (Exception e) {
logger.error(e);
// internalWrite will notify the CriticalIOErrorListener
}
} }
}); });
} }
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
final SequentialFile file = this;
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
internalWrite(bytes, callback); try {
internalWrite(bytes, callback);
} catch (Exception e) {
logger.error(e);
fileFactory.onIOError(e, "Error on JDBC file sync", file);
}
} }
}); });
} }
@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile {
scheduleWrite(bytes, waitIOCallback); scheduleWrite(bytes, waitIOCallback);
waitIOCallback.waitCompletion(); waitIOCallback.waitCompletion();
} catch (Exception e) { } catch (Exception e) {
waitIOCallback.onError(-1, e.getMessage()); waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file.");
fileFactory.onIOError(e, "Failed to write to file.", this);
} }
} else { } else {
scheduleWrite(bytes, callback); scheduleWrite(bytes, callback);
@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile {
if (callback != null) if (callback != null)
callback.done(); callback.done();
return read; return read;
} catch (Exception e) { } catch (SQLException e) {
if (callback != null) if (callback != null)
callback.onError(-1, e.getMessage()); callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
logger.warn("Failed to read from file", e.getMessage(), e); fileFactory.onIOError(e, "Error reading from JDBC file.", this);
return 0;
} }
return 0;
} }
} }
@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile {
try { try {
callback.waitCompletion(); callback.waitCompletion();
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync.");
fileFactory.onIOError(e, "Error during JDBC file sync.", this);
} }
} }
@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile {
@Override @Override
public void renameTo(String newFileName) throws Exception { public void renameTo(String newFileName) throws Exception {
synchronized (writeLock) { synchronized (writeLock) {
dbDriver.renameFile(this, newFileName); try {
dbDriver.renameFile(this, newFileName);
} catch (SQLException e) {
fileFactory.onIOError(e, "Error renaming JDBC file.", this);
}
} }
} }
@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile {
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock); JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
return clone; return clone;
} catch (Exception e) { } catch (Exception e) {
logger.error("Error cloning file: " + filename, e); fileFactory.onIOError(e, "Error cloning JDBC file.", this);
return null;
} }
return null;
} }
@Override @Override
public void copyTo(SequentialFile cloneFile) throws Exception { public void copyTo(SequentialFile cloneFile) throws Exception {
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
clone.open(); try {
synchronized (writeLock) {
synchronized (writeLock) { clone.open();
dbDriver.copyFileData(this, clone); dbDriver.copyFileData(this, clone);
}
} catch (Exception e) {
fileFactory.onIOError(e, "Error copying JDBC file.", this);
} }
} }

View File

@ -27,15 +27,19 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
private boolean started; private boolean started;
private final List<JDBCSequentialFile> files = new ArrayList<>(); private final List<JDBCSequentialFile> files = new ArrayList<>();
@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private final Map<String, Object> fileLocks = new HashMap<>(); private final Map<String, Object> fileLocks = new HashMap<>();
private final JDBCSequentialFileFactoryDriver dbDriver; private JDBCSequentialFileFactoryDriver dbDriver;
private final IOCriticalErrorListener criticalErrorListener;
public JDBCSequentialFileFactory(final DataSource dataSource, public JDBCSequentialFileFactory(final DataSource dataSource,
final SQLProvider sqlProvider, final SQLProvider sqlProvider,
Executor executor) throws Exception { Executor executor,
IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor; this.executor = executor;
dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
} }
public JDBCSequentialFileFactory(final String connectionUrl, public JDBCSequentialFileFactory(final String connectionUrl,
final String className, final String className,
final SQLProvider sqlProvider, final SQLProvider sqlProvider,
Executor executor) throws Exception { Executor executor,
IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor; this.executor = executor;
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
} }
public JDBCSequentialFileFactory(final Connection connection, public JDBCSequentialFileFactory(final Connection connection,
final SQLProvider sqlProvider, final SQLProvider sqlProvider,
final Executor executor) throws Exception { final Executor executor,
final IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor; this.executor = executor;
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
} }
public JDBCSequentialFileFactoryDriver getDbDriver() { public JDBCSequentialFileFactoryDriver getDbDriver() {
@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override @Override
public SequentialFileFactory setDatasync(boolean enabled) { public SequentialFileFactory setDatasync(boolean enabled) {
// noop
return this; return this;
} }
@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
started = true; started = true;
} }
} catch (Exception e) { } catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e); criticalErrorListener.onIOException(e, "Unable to start database driver", null);
started = false; started = false;
} }
} }
@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
files.add(file); files.add(file);
return file; return file;
} catch (Exception e) { } catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error("Could not create file", e); criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null);
} }
return null; return null;
} }
@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override @Override
public List<String> listFiles(String extension) throws Exception { public List<String> listFiles(String extension) throws Exception {
return dbDriver.listFiles(extension); try {
return dbDriver.listFiles(extension);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
throw e;
}
} }
@Override @Override
@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override @Override
public void onIOError(Exception exception, String message, SequentialFile file) { public void onIOError(Exception exception, String message, SequentialFile file) {
criticalErrorListener.onIOException(exception, message, file);
} }
@Override @Override
@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
@Override @Override
public void flush() { public void flush() {
for (SequentialFile file : files) {
try {
file.sync();
} catch (Exception e) {
criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file);
}
}
} }
public synchronized void destroy() throws SQLException { public synchronized void destroy() throws SQLException {
dbDriver.destroy(); try {
dbDriver.destroy();
} catch (SQLException e) {
logger.error("Error destroying file factory", e);
}
} }
} }

View File

@ -29,10 +29,13 @@ import java.util.List;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.jboss.logging.Logger;
@SuppressWarnings("SynchronizeOnNonFinalField") @SuppressWarnings("SynchronizeOnNonFinalField")
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
protected PreparedStatement deleteFile; protected PreparedStatement deleteFile;
protected PreparedStatement createFile; protected PreparedStatement createFile;
@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
Blob blob = rs.getBlob(1); Blob blob = rs.getBlob(1);
if (blob != null) { if (blob != null) {
file.setWritePosition((int) blob.length()); file.setWritePosition((int) blob.length());
} else {
logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId());
} }
} }
connection.commit(); connection.commit();
@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
connection.commit(); connection.commit();
return readLength; return readLength;
} catch (Throwable e) { } catch (Throwable e) {
connection.rollback();
throw e; throw e;
} finally {
connection.rollback();
} }
} }
} }

View File

@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
@ -84,26 +86,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
// Sequence ID for journal records // Sequence ID for journal records
private final AtomicLong seq = new AtomicLong(0); private final AtomicLong seq = new AtomicLong(0);
private final IOCriticalErrorListener criticalIOErrorListener;
public JDBCJournalImpl(DataSource dataSource, public JDBCJournalImpl(DataSource dataSource,
SQLProvider provider, SQLProvider provider,
String tableName, String tableName,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor) { Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener) {
super(dataSource, provider); super(dataSource, provider);
records = new ArrayList<>(); records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor; this.completeExecutor = completeExecutor;
this.criticalIOErrorListener = criticalIOErrorListener;
} }
public JDBCJournalImpl(String jdbcUrl, public JDBCJournalImpl(String jdbcUrl,
String jdbcDriverClass, String jdbcDriverClass,
SQLProvider sqlProvider, SQLProvider sqlProvider,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor) { Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener) {
super(sqlProvider, jdbcUrl, jdbcDriverClass); super(sqlProvider, jdbcUrl, jdbcDriverClass);
records = new ArrayList<>(); records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor; this.completeExecutor = completeExecutor;
this.criticalIOErrorListener = criticalIOErrorListener;
} }
@Override @Override
@ -133,9 +141,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
@Override @Override
public synchronized void stop() throws SQLException { public void stop() throws SQLException {
stop(true);
}
public synchronized void stop(boolean sync) throws SQLException {
if (started) { if (started) {
sync(); if (sync)
sync();
started = false; started = false;
super.stop(); super.stop();
} }
@ -166,8 +179,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
TransactionHolder holder; TransactionHolder holder;
boolean success = false;
try { try {
connection.setAutoCommit(false);
for (JDBCJournalRecord record : recordRef) { for (JDBCJournalRecord record : recordRef) {
switch (record.getRecordType()) { switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD: case JDBCJournalRecord.DELETE_RECORD:
@ -197,36 +211,25 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
break; break;
} }
} }
} catch (SQLException e) {
logger.warn(e.getMessage(), e);
executeCallbacks(recordRef, success);
return 0;
}
try {
connection.setAutoCommit(false);
insertJournalRecords.executeBatch(); insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch(); deleteJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch(); deleteJournalTxRecords.executeBatch();
connection.commit(); connection.commit();
success = true;
} catch (SQLException e) { cleanupTxRecords(deletedRecords, committedTransactions);
logger.warn(e.getMessage(), e); executeCallbacks(recordRef, true);
return recordRef.size();
} catch (Exception e) {
criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null);
started = false;
executeCallbacks(recordRef, false);
performRollback(recordRef); performRollback(recordRef);
return 0;
} }
try {
if (success)
cleanupTxRecords(deletedRecords, committedTransactions);
} catch (SQLException e) {
logger.warn("Failed to remove the Tx Records", e.getMessage(), e);
} finally {
executeCallbacks(recordRef, success);
}
return recordRef.size();
} }
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
@ -262,8 +265,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private void performRollback(List<JDBCJournalRecord> records) { private void performRollback(List<JDBCJournalRecord> records) {
try { try {
connection.rollback();
for (JDBCJournalRecord record : records) { for (JDBCJournalRecord record : records) {
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
removeTxRecord(record); removeTxRecord(record);
@ -279,13 +280,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
transactions.remove(txH.transactionID); transactions.remove(txH.transactionID);
} }
} }
connection.rollback();
} catch (Exception sqlE) { } catch (Exception sqlE) {
logger.warn(sqlE.getMessage(), sqlE); logger.error(sqlE.getMessage(), sqlE);
criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null);
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
} }
} }
// TODO Use an executor.
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) { private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
Runnable r = new Runnable() { Runnable r = new Runnable() {
@Override @Override
@ -299,7 +301,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
private void appendRecord(JDBCJournalRecord record) throws Exception { private void appendRecord(JDBCJournalRecord record) throws Exception {
record.storeLineUp(); record.storeLineUp();
if (!started) {
if (record.getIoCompletion() != null) {
record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started");
}
}
SimpleWaitIOCallback callback = null; SimpleWaitIOCallback callback = null;
if (record.isSync() && record.getIoCompletion() == null) { if (record.isSync() && record.getIoCompletion() == null) {
@ -318,10 +326,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
syncTimer.delay(); syncTimer.delay();
if (callback != null) callback.waitCompletion();
if (callback != null) {
callback.waitCompletion();
}
} }
private synchronized void addTxRecord(JDBCJournalRecord record) { private synchronized void addTxRecord(JDBCJournalRecord record) {

View File

@ -26,6 +26,7 @@ import java.sql.SQLException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
@ -116,7 +117,7 @@ class JDBCJournalRecord {
if (success) { if (success) {
ioCompletion.done(); ioCompletion.done();
} else { } else {
ioCompletion.onError(1, "DATABASE TRANSACTION FAILED"); ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed.");
} }
} }
} }
@ -127,7 +128,7 @@ class JDBCJournalRecord {
} }
} }
void writeRecord(PreparedStatement statement) throws SQLException { void writeRecord(PreparedStatement statement) throws Exception {
byte[] recordBytes = new byte[variableSize]; byte[] recordBytes = new byte[variableSize];
byte[] txDataBytes = new byte[txDataSize]; byte[] txDataBytes = new byte[txDataSize];
@ -137,6 +138,7 @@ class JDBCJournalRecord {
txData.read(txDataBytes); txData.read(txDataBytes);
} catch (IOException e) { } catch (IOException e) {
ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e); ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
throw e;
} }
statement.setLong(1, id); statement.setLong(1, id);

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest {
String connectionUrl = "jdbc:derby:target/data;create=true"; String connectionUrl = "jdbc:derby:target/data;create=true";
String tableName = "FILES"; String tableName = "FILES";
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor); factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
}
});
factory.start(); factory.start();
} }

View File

@ -91,6 +91,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
private boolean started = false; private boolean started = false;
private final IOCriticalErrorListener criticalErrorListener;
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
final StorageManager storageManager, final StorageManager storageManager,
final long syncTimeout, final long syncTimeout,
@ -104,6 +106,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
this.scheduledExecutor = scheduledExecutor; this.scheduledExecutor = scheduledExecutor;
this.syncTimeout = syncTimeout; this.syncTimeout = syncTimeout;
this.dbConf = dbConf; this.dbConf = dbConf;
this.criticalErrorListener = critialErrorListener;
start(); start();
} }
@ -119,9 +122,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
if (sqlProviderFactory == null) { if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory(); sqlProviderFactory = new GenericSQLProvider.Factory();
} }
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
} else { } else {
String driverClassName = dbConf.getJdbcDriverClassName(); String driverClassName = dbConf.getJdbcDriverClassName();
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
} }
pagingFactoryFileFactory.start(); pagingFactoryFileFactory.start();
@ -232,7 +237,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
} }
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor()); return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
} }
private String getTableNameForGUID(String guid) { private String getTableNameForGUID(String guid) {

View File

@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
if (sqlProviderFactory == null) { if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory(); sqlProviderFactory = new GenericSQLProvider.Factory();
} }
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor()); bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor()); messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
} else { } else {
String driverClassName = dbConf.getJdbcDriverClassName(); String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
} }
largeMessagesFactory.start(); largeMessagesFactory.start();
} catch (Exception e) { } catch (Exception e) {

View File

@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase {
executorService = Executors.newSingleThreadExecutor(); executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true"; jdbcUrl = "jdbc:derby:target/data;create=true";
SQLProvider.Factory factory = new DerbySQLProvider.Factory(); SQLProvider.Factory factory = new DerbySQLProvider.Factory();
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService); journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
}
});
journal.start(); journal.start();
} }