ARTEMIS-1115 Call CriticalIOListener on JDBC Error
This commit is contained in:
parent
c35960f6a4
commit
2ccc4e14f1
|
@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
return fileFactory.listFiles(extension).contains(filename);
|
return fileFactory.listFiles(extension).contains(filename);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn(e.getMessage(), e);
|
logger.warn(e.getMessage(), e);
|
||||||
|
fileFactory.onIOError(e, "Error checking JDBC file exists.", this);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void open() throws Exception {
|
public synchronized void open() throws Exception {
|
||||||
if (!isOpen) {
|
try {
|
||||||
synchronized (writeLock) {
|
if (!isOpen) {
|
||||||
dbDriver.openFile(this);
|
synchronized (writeLock) {
|
||||||
isCreated = true;
|
dbDriver.openFile(this);
|
||||||
isOpen = true;
|
isCreated = true;
|
||||||
|
isOpen = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e);
|
fileFactory.onIOError(e, "Error deleting JDBC file.", this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized int internalWrite(byte[] data, IOCallback callback) {
|
private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception {
|
||||||
try {
|
try {
|
||||||
synchronized (writeLock) {
|
synchronized (writeLock) {
|
||||||
int noBytes = dbDriver.writeToFile(this, data);
|
int noBytes = dbDriver.writeToFile(this, data);
|
||||||
seek(noBytes);
|
seek(noBytes);
|
||||||
|
System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
|
||||||
if (callback != null)
|
if (callback != null)
|
||||||
callback.done();
|
callback.done();
|
||||||
return noBytes;
|
return noBytes;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("Failed to write to file", e.getMessage(), e);
|
|
||||||
if (callback != null)
|
if (callback != null)
|
||||||
callback.onError(-1, e.getMessage());
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||||
|
fileFactory.onIOError(e, "Error writing to JDBC file.", this);
|
||||||
}
|
}
|
||||||
return -1;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
|
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception {
|
||||||
byte[] data = new byte[buffer.readableBytes()];
|
byte[] data = new byte[buffer.readableBytes()];
|
||||||
buffer.readBytes(data);
|
buffer.readBytes(data);
|
||||||
return internalWrite(data, callback);
|
return internalWrite(data, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
|
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception {
|
||||||
return internalWrite(buffer.array(), callback);
|
return internalWrite(buffer.array(), callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
internalWrite(bytes, callback);
|
try {
|
||||||
|
internalWrite(bytes, callback);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e);
|
||||||
|
// internalWrite will notify the CriticalIOErrorListener
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
|
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
|
||||||
|
final SequentialFile file = this;
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
internalWrite(bytes, callback);
|
try {
|
||||||
|
internalWrite(bytes, callback);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e);
|
||||||
|
fileFactory.onIOError(e, "Error on JDBC file sync", file);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
scheduleWrite(bytes, waitIOCallback);
|
scheduleWrite(bytes, waitIOCallback);
|
||||||
waitIOCallback.waitCompletion();
|
waitIOCallback.waitCompletion();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
waitIOCallback.onError(-1, e.getMessage());
|
waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file.");
|
||||||
|
fileFactory.onIOError(e, "Failed to write to file.", this);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
scheduleWrite(bytes, callback);
|
scheduleWrite(bytes, callback);
|
||||||
|
@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
if (callback != null)
|
if (callback != null)
|
||||||
callback.done();
|
callback.done();
|
||||||
return read;
|
return read;
|
||||||
} catch (Exception e) {
|
} catch (SQLException e) {
|
||||||
if (callback != null)
|
if (callback != null)
|
||||||
callback.onError(-1, e.getMessage());
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||||
logger.warn("Failed to read from file", e.getMessage(), e);
|
fileFactory.onIOError(e, "Error reading from JDBC file.", this);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
try {
|
try {
|
||||||
callback.waitCompletion();
|
callback.waitCompletion();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException(e);
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync.");
|
||||||
|
fileFactory.onIOError(e, "Error during JDBC file sync.", this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
@Override
|
@Override
|
||||||
public void renameTo(String newFileName) throws Exception {
|
public void renameTo(String newFileName) throws Exception {
|
||||||
synchronized (writeLock) {
|
synchronized (writeLock) {
|
||||||
dbDriver.renameFile(this, newFileName);
|
try {
|
||||||
|
dbDriver.renameFile(this, newFileName);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
fileFactory.onIOError(e, "Error renaming JDBC file.", this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
|
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
|
||||||
return clone;
|
return clone;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error cloning file: " + filename, e);
|
fileFactory.onIOError(e, "Error cloning JDBC file.", this);
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void copyTo(SequentialFile cloneFile) throws Exception {
|
public void copyTo(SequentialFile cloneFile) throws Exception {
|
||||||
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
|
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
|
||||||
clone.open();
|
try {
|
||||||
|
synchronized (writeLock) {
|
||||||
synchronized (writeLock) {
|
clone.open();
|
||||||
dbDriver.copyFileData(this, clone);
|
dbDriver.copyFileData(this, clone);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
fileFactory.onIOError(e, "Error copying JDBC file.", this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,15 +27,19 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
|
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
|
||||||
|
|
||||||
private boolean started;
|
private boolean started;
|
||||||
|
|
||||||
private final List<JDBCSequentialFile> files = new ArrayList<>();
|
private final List<JDBCSequentialFile> files = new ArrayList<>();
|
||||||
|
@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
|
|
||||||
private final Map<String, Object> fileLocks = new HashMap<>();
|
private final Map<String, Object> fileLocks = new HashMap<>();
|
||||||
|
|
||||||
private final JDBCSequentialFileFactoryDriver dbDriver;
|
private JDBCSequentialFileFactoryDriver dbDriver;
|
||||||
|
|
||||||
|
private final IOCriticalErrorListener criticalErrorListener;
|
||||||
|
|
||||||
public JDBCSequentialFileFactory(final DataSource dataSource,
|
public JDBCSequentialFileFactory(final DataSource dataSource,
|
||||||
final SQLProvider sqlProvider,
|
final SQLProvider sqlProvider,
|
||||||
Executor executor) throws Exception {
|
Executor executor,
|
||||||
|
IOCriticalErrorListener criticalErrorListener) throws Exception {
|
||||||
|
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
|
this.criticalErrorListener = criticalErrorListener;
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public JDBCSequentialFileFactory(final String connectionUrl,
|
public JDBCSequentialFileFactory(final String connectionUrl,
|
||||||
final String className,
|
final String className,
|
||||||
final SQLProvider sqlProvider,
|
final SQLProvider sqlProvider,
|
||||||
Executor executor) throws Exception {
|
Executor executor,
|
||||||
|
IOCriticalErrorListener criticalErrorListener) throws Exception {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
this.criticalErrorListener = criticalErrorListener;
|
||||||
|
try {
|
||||||
|
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public JDBCSequentialFileFactory(final Connection connection,
|
public JDBCSequentialFileFactory(final Connection connection,
|
||||||
final SQLProvider sqlProvider,
|
final SQLProvider sqlProvider,
|
||||||
final Executor executor) throws Exception {
|
final Executor executor,
|
||||||
|
final IOCriticalErrorListener criticalErrorListener) throws Exception {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
|
this.criticalErrorListener = criticalErrorListener;
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public JDBCSequentialFileFactoryDriver getDbDriver() {
|
public JDBCSequentialFileFactoryDriver getDbDriver() {
|
||||||
|
@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||||
|
|
||||||
// noop
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
started = true;
|
started = true;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e);
|
criticalErrorListener.onIOException(e, "Unable to start database driver", null);
|
||||||
started = false;
|
started = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
files.add(file);
|
files.add(file);
|
||||||
return file;
|
return file;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQJournalLogger.LOGGER.error("Could not create file", e);
|
criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> listFiles(String extension) throws Exception {
|
public List<String> listFiles(String extension) throws Exception {
|
||||||
return dbDriver.listFiles(extension);
|
try {
|
||||||
|
return dbDriver.listFiles(extension);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
criticalErrorListener.onIOException(e, "Error listing JDBC files.", null);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onIOError(Exception exception, String message, SequentialFile file) {
|
public void onIOError(Exception exception, String message, SequentialFile file) {
|
||||||
|
criticalErrorListener.onIOException(exception, message, file);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush() {
|
public void flush() {
|
||||||
|
for (SequentialFile file : files) {
|
||||||
|
try {
|
||||||
|
file.sync();
|
||||||
|
} catch (Exception e) {
|
||||||
|
criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void destroy() throws SQLException {
|
public synchronized void destroy() throws SQLException {
|
||||||
dbDriver.destroy();
|
try {
|
||||||
|
dbDriver.destroy();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
logger.error("Error destroying file factory", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,13 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||||
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
|
||||||
|
|
||||||
protected PreparedStatement deleteFile;
|
protected PreparedStatement deleteFile;
|
||||||
|
|
||||||
protected PreparedStatement createFile;
|
protected PreparedStatement createFile;
|
||||||
|
@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
||||||
Blob blob = rs.getBlob(1);
|
Blob blob = rs.getBlob(1);
|
||||||
if (blob != null) {
|
if (blob != null) {
|
||||||
file.setWritePosition((int) blob.length());
|
file.setWritePosition((int) blob.length());
|
||||||
|
} else {
|
||||||
|
logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
connection.commit();
|
connection.commit();
|
||||||
|
@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
||||||
connection.commit();
|
connection.commit();
|
||||||
return readLength;
|
return readLength;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
connection.rollback();
|
|
||||||
throw e;
|
throw e;
|
||||||
|
} finally {
|
||||||
|
connection.rollback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||||
|
@ -82,26 +84,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
// Sequence ID for journal records
|
// Sequence ID for journal records
|
||||||
private final AtomicLong seq = new AtomicLong(0);
|
private final AtomicLong seq = new AtomicLong(0);
|
||||||
|
|
||||||
|
private final IOCriticalErrorListener criticalIOErrorListener;
|
||||||
|
|
||||||
public JDBCJournalImpl(DataSource dataSource,
|
public JDBCJournalImpl(DataSource dataSource,
|
||||||
SQLProvider provider,
|
SQLProvider provider,
|
||||||
String tableName,
|
String tableName,
|
||||||
ScheduledExecutorService scheduledExecutorService,
|
ScheduledExecutorService scheduledExecutorService,
|
||||||
Executor completeExecutor) {
|
Executor completeExecutor,
|
||||||
|
IOCriticalErrorListener criticalIOErrorListener) {
|
||||||
super(dataSource, provider);
|
super(dataSource, provider);
|
||||||
records = new ArrayList<>();
|
records = new ArrayList<>();
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
this.completeExecutor = completeExecutor;
|
this.completeExecutor = completeExecutor;
|
||||||
|
this.criticalIOErrorListener = criticalIOErrorListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JDBCJournalImpl(String jdbcUrl,
|
public JDBCJournalImpl(String jdbcUrl,
|
||||||
String jdbcDriverClass,
|
String jdbcDriverClass,
|
||||||
SQLProvider sqlProvider,
|
SQLProvider sqlProvider,
|
||||||
ScheduledExecutorService scheduledExecutorService,
|
ScheduledExecutorService scheduledExecutorService,
|
||||||
Executor completeExecutor) {
|
Executor completeExecutor,
|
||||||
|
IOCriticalErrorListener criticalIOErrorListener) {
|
||||||
super(sqlProvider, jdbcUrl, jdbcDriverClass);
|
super(sqlProvider, jdbcUrl, jdbcDriverClass);
|
||||||
records = new ArrayList<>();
|
records = new ArrayList<>();
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
this.completeExecutor = completeExecutor;
|
this.completeExecutor = completeExecutor;
|
||||||
|
this.criticalIOErrorListener = criticalIOErrorListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -131,9 +139,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void stop() throws SQLException {
|
public void stop() throws SQLException {
|
||||||
|
stop(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void stop(boolean sync) throws SQLException {
|
||||||
if (started) {
|
if (started) {
|
||||||
sync();
|
if (sync)
|
||||||
|
sync();
|
||||||
started = false;
|
started = false;
|
||||||
super.stop();
|
super.stop();
|
||||||
}
|
}
|
||||||
|
@ -164,8 +177,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
|
|
||||||
TransactionHolder holder;
|
TransactionHolder holder;
|
||||||
|
|
||||||
boolean success = false;
|
|
||||||
try {
|
try {
|
||||||
|
connection.setAutoCommit(false);
|
||||||
|
|
||||||
for (JDBCJournalRecord record : recordRef) {
|
for (JDBCJournalRecord record : recordRef) {
|
||||||
switch (record.getRecordType()) {
|
switch (record.getRecordType()) {
|
||||||
case JDBCJournalRecord.DELETE_RECORD:
|
case JDBCJournalRecord.DELETE_RECORD:
|
||||||
|
@ -195,36 +209,25 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
|
||||||
logger.warn(e.getMessage(), e);
|
|
||||||
executeCallbacks(recordRef, success);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
connection.setAutoCommit(false);
|
|
||||||
|
|
||||||
insertJournalRecords.executeBatch();
|
insertJournalRecords.executeBatch();
|
||||||
deleteJournalRecords.executeBatch();
|
deleteJournalRecords.executeBatch();
|
||||||
deleteJournalTxRecords.executeBatch();
|
deleteJournalTxRecords.executeBatch();
|
||||||
|
|
||||||
connection.commit();
|
connection.commit();
|
||||||
success = true;
|
|
||||||
} catch (SQLException e) {
|
cleanupTxRecords(deletedRecords, committedTransactions);
|
||||||
logger.warn(e.getMessage(), e);
|
executeCallbacks(recordRef, true);
|
||||||
|
|
||||||
|
return recordRef.size();
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null);
|
||||||
|
started = false;
|
||||||
|
executeCallbacks(recordRef, false);
|
||||||
performRollback(recordRef);
|
performRollback(recordRef);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
if (success)
|
|
||||||
cleanupTxRecords(deletedRecords, committedTransactions);
|
|
||||||
} catch (SQLException e) {
|
|
||||||
logger.warn("Failed to remove the Tx Records", e.getMessage(), e);
|
|
||||||
} finally {
|
|
||||||
executeCallbacks(recordRef, success);
|
|
||||||
}
|
|
||||||
|
|
||||||
return recordRef.size();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
|
||||||
|
@ -260,8 +263,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
|
|
||||||
private void performRollback(List<JDBCJournalRecord> records) {
|
private void performRollback(List<JDBCJournalRecord> records) {
|
||||||
try {
|
try {
|
||||||
connection.rollback();
|
|
||||||
|
|
||||||
for (JDBCJournalRecord record : records) {
|
for (JDBCJournalRecord record : records) {
|
||||||
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
|
||||||
removeTxRecord(record);
|
removeTxRecord(record);
|
||||||
|
@ -277,13 +278,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
transactions.remove(txH.transactionID);
|
transactions.remove(txH.transactionID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
connection.rollback();
|
||||||
} catch (Exception sqlE) {
|
} catch (Exception sqlE) {
|
||||||
logger.warn(sqlE.getMessage(), sqlE);
|
logger.error(sqlE.getMessage(), sqlE);
|
||||||
|
criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null);
|
||||||
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
|
ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO Use an executor.
|
|
||||||
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
|
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
|
||||||
Runnable r = new Runnable() {
|
Runnable r = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -297,7 +299,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void appendRecord(JDBCJournalRecord record) throws Exception {
|
private void appendRecord(JDBCJournalRecord record) throws Exception {
|
||||||
|
|
||||||
record.storeLineUp();
|
record.storeLineUp();
|
||||||
|
if (!started) {
|
||||||
|
if (record.getIoCompletion() != null) {
|
||||||
|
record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SimpleWaitIOCallback callback = null;
|
SimpleWaitIOCallback callback = null;
|
||||||
if (record.isSync() && record.getIoCompletion() == null) {
|
if (record.isSync() && record.getIoCompletion() == null) {
|
||||||
|
@ -316,10 +324,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
}
|
}
|
||||||
|
|
||||||
syncTimer.delay();
|
syncTimer.delay();
|
||||||
|
if (callback != null) callback.waitCompletion();
|
||||||
if (callback != null) {
|
|
||||||
callback.waitCompletion();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
private synchronized void addTxRecord(JDBCJournalRecord record) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.sql.SQLException;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
@ -115,7 +116,7 @@ class JDBCJournalRecord {
|
||||||
if (success) {
|
if (success) {
|
||||||
ioCompletion.done();
|
ioCompletion.done();
|
||||||
} else {
|
} else {
|
||||||
ioCompletion.onError(1, "DATABASE TRANSACTION FAILED");
|
ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,7 +127,7 @@ class JDBCJournalRecord {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void writeRecord(PreparedStatement statement) throws SQLException {
|
void writeRecord(PreparedStatement statement) throws Exception {
|
||||||
|
|
||||||
byte[] recordBytes = new byte[variableSize];
|
byte[] recordBytes = new byte[variableSize];
|
||||||
byte[] txDataBytes = new byte[txDataSize];
|
byte[] txDataBytes = new byte[txDataSize];
|
||||||
|
@ -136,6 +137,7 @@ class JDBCJournalRecord {
|
||||||
txData.read(txDataBytes);
|
txData.read(txDataBytes);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
|
ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
statement.setLong(1, id);
|
statement.setLong(1, id);
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||||
|
@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest {
|
||||||
|
|
||||||
String connectionUrl = "jdbc:derby:target/data;create=true";
|
String connectionUrl = "jdbc:derby:target/data;create=true";
|
||||||
String tableName = "FILES";
|
String tableName = "FILES";
|
||||||
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor);
|
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
|
||||||
|
@Override
|
||||||
|
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||||
|
}
|
||||||
|
});
|
||||||
factory.start();
|
factory.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,6 +81,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
|
|
||||||
private boolean started = false;
|
private boolean started = false;
|
||||||
|
|
||||||
|
private final IOCriticalErrorListener criticalErrorListener;
|
||||||
|
|
||||||
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final long syncTimeout,
|
final long syncTimeout,
|
||||||
|
@ -94,6 +96,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
this.scheduledExecutor = scheduledExecutor;
|
this.scheduledExecutor = scheduledExecutor;
|
||||||
this.syncTimeout = syncTimeout;
|
this.syncTimeout = syncTimeout;
|
||||||
this.dbConf = dbConf;
|
this.dbConf = dbConf;
|
||||||
|
this.criticalErrorListener = critialErrorListener;
|
||||||
start();
|
start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,9 +112,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||||
}
|
}
|
||||||
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
||||||
} else {
|
} else {
|
||||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||||
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
|
||||||
}
|
}
|
||||||
pagingFactoryFileFactory.start();
|
pagingFactoryFileFactory.start();
|
||||||
|
@ -222,7 +227,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor());
|
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getTableNameForGUID(String guid) {
|
private String getTableNameForGUID(String guid) {
|
||||||
|
|
|
@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||||
}
|
}
|
||||||
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
|
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
|
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
|
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
|
||||||
} else {
|
} else {
|
||||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||||
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
|
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
|
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
|
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener);
|
||||||
}
|
}
|
||||||
largeMessagesFactory.start();
|
largeMessagesFactory.start();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase {
|
||||||
executorService = Executors.newSingleThreadExecutor();
|
executorService = Executors.newSingleThreadExecutor();
|
||||||
jdbcUrl = "jdbc:derby:target/data;create=true";
|
jdbcUrl = "jdbc:derby:target/data;create=true";
|
||||||
SQLProvider.Factory factory = new DerbySQLProvider.Factory();
|
SQLProvider.Factory factory = new DerbySQLProvider.Factory();
|
||||||
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService);
|
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
|
||||||
|
@Override
|
||||||
|
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
journal.start();
|
journal.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue