From 2ccc4e14f1ed55f98a30c051dbb057ad35a005a2 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 17 Apr 2017 10:40:26 +0100 Subject: [PATCH 1/2] ARTEMIS-1115 Call CriticalIOListener on JDBC Error --- .../jdbc/store/file/JDBCSequentialFile.java | 80 ++++++++++++------- .../store/file/JDBCSequentialFileFactory.java | 70 +++++++++++++--- .../file/JDBCSequentialFileFactoryDriver.java | 8 +- .../jdbc/store/journal/JDBCJournalImpl.java | 75 +++++++++-------- .../jdbc/store/journal/JDBCJournalRecord.java | 6 +- .../file/JDBCSequentialFileFactoryTest.java | 7 +- .../impl/PagingStoreFactoryDatabase.java | 7 +- .../journal/JDBCJournalStorageManager.java | 12 +-- .../jdbc/store/journal/JDBCJournalTest.java | 9 ++- 9 files changed, 187 insertions(+), 87 deletions(-) diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 7e72785dec..e2da151739 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile { return fileFactory.listFiles(extension).contains(filename); } catch (Exception e) { logger.warn(e.getMessage(), e); + fileFactory.onIOError(e, "Error checking JDBC file exists.", this); return false; } } @Override public synchronized void open() throws Exception { - if (!isOpen) { - synchronized (writeLock) { - dbDriver.openFile(this); - isCreated = true; - isOpen = true; + try { + if (!isOpen) { + synchronized (writeLock) { + dbDriver.openFile(this); + isCreated = true; + isOpen = true; + } } + } catch (SQLException e) { + fileFactory.onIOError(e, "Error attempting to open JDBC file.", this); } } @@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile { } } } catch (SQLException e) { - throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e); + fileFactory.onIOError(e, "Error deleting JDBC file.", this); } } - private synchronized int internalWrite(byte[] data, IOCallback callback) { + private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception { try { synchronized (writeLock) { int noBytes = dbDriver.writeToFile(this, data); seek(noBytes); + System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size()); if (callback != null) callback.done(); return noBytes; } } catch (Exception e) { - logger.warn("Failed to write to file", e.getMessage(), e); if (callback != null) - callback.onError(-1, e.getMessage()); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + fileFactory.onIOError(e, "Error writing to JDBC file.", this); } - return -1; + return 0; } - public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { + public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception { byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); return internalWrite(data, callback); } - private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { + private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception { return internalWrite(buffer.array(), callback); } @@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile { executor.execute(new Runnable() { @Override public void run() { - internalWrite(bytes, callback); + try { + internalWrite(bytes, callback); + } catch (Exception e) { + logger.error(e); + // internalWrite will notify the CriticalIOErrorListener + } } }); } private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { + final SequentialFile file = this; executor.execute(new Runnable() { @Override public void run() { - internalWrite(bytes, callback); + try { + internalWrite(bytes, callback); + } catch (Exception e) { + logger.error(e); + fileFactory.onIOError(e, "Error on JDBC file sync", file); + } } }); } @@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile { scheduleWrite(bytes, waitIOCallback); waitIOCallback.waitCompletion(); } catch (Exception e) { - waitIOCallback.onError(-1, e.getMessage()); + waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file."); + fileFactory.onIOError(e, "Failed to write to file.", this); } } else { scheduleWrite(bytes, callback); @@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile { if (callback != null) callback.done(); return read; - } catch (Exception e) { + } catch (SQLException e) { if (callback != null) - callback.onError(-1, e.getMessage()); - logger.warn("Failed to read from file", e.getMessage(), e); - return 0; + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + fileFactory.onIOError(e, "Error reading from JDBC file.", this); } + return 0; } } @@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile { try { callback.waitCompletion(); } catch (Exception e) { - throw new IOException(e); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync."); + fileFactory.onIOError(e, "Error during JDBC file sync.", this); } } @@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile { @Override public void renameTo(String newFileName) throws Exception { synchronized (writeLock) { - dbDriver.renameFile(this, newFileName); + try { + dbDriver.renameFile(this, newFileName); + } catch (SQLException e) { + fileFactory.onIOError(e, "Error renaming JDBC file.", this); + } } } @@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile { JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock); return clone; } catch (Exception e) { - logger.error("Error cloning file: " + filename, e); - return null; + fileFactory.onIOError(e, "Error cloning JDBC file.", this); } + return null; } @Override public void copyTo(SequentialFile cloneFile) throws Exception { JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; - clone.open(); - - synchronized (writeLock) { - dbDriver.copyFileData(this, clone); + try { + synchronized (writeLock) { + clone.open(); + dbDriver.copyFileData(this, clone); + } + } catch (Exception e) { + fileFactory.onIOError(e, "Error copying JDBC file.", this); } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index fa88a85623..d5a92a2329 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -27,15 +27,19 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { + private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class); + private boolean started; private final List files = new ArrayList<>(); @@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM private final Map fileLocks = new HashMap<>(); - private final JDBCSequentialFileFactoryDriver dbDriver; + private JDBCSequentialFileFactoryDriver dbDriver; + + private final IOCriticalErrorListener criticalErrorListener; public JDBCSequentialFileFactory(final DataSource dataSource, final SQLProvider sqlProvider, - Executor executor) throws Exception { + Executor executor, + IOCriticalErrorListener criticalErrorListener) throws Exception { + this.executor = executor; - dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } + } public JDBCSequentialFileFactory(final String connectionUrl, final String className, final SQLProvider sqlProvider, - Executor executor) throws Exception { + Executor executor, + IOCriticalErrorListener criticalErrorListener) throws Exception { this.executor = executor; - dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } + } public JDBCSequentialFileFactory(final Connection connection, final SQLProvider sqlProvider, - final Executor executor) throws Exception { + final Executor executor, + final IOCriticalErrorListener criticalErrorListener) throws Exception { this.executor = executor; - this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } } public JDBCSequentialFileFactoryDriver getDbDriver() { @@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public SequentialFileFactory setDatasync(boolean enabled) { - - // noop return this; } @@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM started = true; } } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e); + criticalErrorListener.onIOException(e, "Unable to start database driver", null); started = false; } } @@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM files.add(file); return file; } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error("Could not create file", e); + criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null); } return null; } @@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public List listFiles(String extension) throws Exception { - return dbDriver.listFiles(extension); + try { + return dbDriver.listFiles(extension); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Error listing JDBC files.", null); + throw e; + } } @Override @@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void onIOError(Exception exception, String message, SequentialFile file) { + criticalErrorListener.onIOException(exception, message, file); } @Override @@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void flush() { + for (SequentialFile file : files) { + try { + file.sync(); + } catch (Exception e) { + criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file); + } + } } public synchronized void destroy() throws SQLException { - dbDriver.destroy(); + try { + dbDriver.destroy(); + } catch (SQLException e) { + logger.error("Error destroying file factory", e); + } } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index cf8d39d137..a901f6a3b0 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -29,10 +29,13 @@ import java.util.List; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.jboss.logging.Logger; @SuppressWarnings("SynchronizeOnNonFinalField") public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { + private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class); + protected PreparedStatement deleteFile; protected PreparedStatement createFile; @@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { Blob blob = rs.getBlob(1); if (blob != null) { file.setWritePosition((int) blob.length()); + } else { + logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId()); } } connection.commit(); @@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { connection.commit(); return readLength; } catch (Throwable e) { - connection.rollback(); throw e; + } finally { + connection.rollback(); } } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index a548157471..64f20a8a29 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; @@ -82,26 +84,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sequence ID for journal records private final AtomicLong seq = new AtomicLong(0); + private final IOCriticalErrorListener criticalIOErrorListener; + public JDBCJournalImpl(DataSource dataSource, SQLProvider provider, String tableName, ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor) { + Executor completeExecutor, + IOCriticalErrorListener criticalIOErrorListener) { super(dataSource, provider); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; + this.criticalIOErrorListener = criticalIOErrorListener; } public JDBCJournalImpl(String jdbcUrl, String jdbcDriverClass, SQLProvider sqlProvider, ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor) { + Executor completeExecutor, + IOCriticalErrorListener criticalIOErrorListener) { super(sqlProvider, jdbcUrl, jdbcDriverClass); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; + this.criticalIOErrorListener = criticalIOErrorListener; } @Override @@ -131,9 +139,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public synchronized void stop() throws SQLException { + public void stop() throws SQLException { + stop(true); + } + + public synchronized void stop(boolean sync) throws SQLException { if (started) { - sync(); + if (sync) + sync(); started = false; super.stop(); } @@ -164,8 +177,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { TransactionHolder holder; - boolean success = false; try { + connection.setAutoCommit(false); + for (JDBCJournalRecord record : recordRef) { switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: @@ -195,36 +209,25 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { break; } } - } catch (SQLException e) { - logger.warn(e.getMessage(), e); - executeCallbacks(recordRef, success); - return 0; - } - - try { - connection.setAutoCommit(false); insertJournalRecords.executeBatch(); deleteJournalRecords.executeBatch(); deleteJournalTxRecords.executeBatch(); connection.commit(); - success = true; - } catch (SQLException e) { - logger.warn(e.getMessage(), e); + + cleanupTxRecords(deletedRecords, committedTransactions); + executeCallbacks(recordRef, true); + + return recordRef.size(); + + } catch (Exception e) { + criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); + started = false; + executeCallbacks(recordRef, false); performRollback(recordRef); + return 0; } - - try { - if (success) - cleanupTxRecords(deletedRecords, committedTransactions); - } catch (SQLException e) { - logger.warn("Failed to remove the Tx Records", e.getMessage(), e); - } finally { - executeCallbacks(recordRef, success); - } - - return recordRef.size(); } /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, @@ -260,8 +263,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private void performRollback(List records) { try { - connection.rollback(); - for (JDBCJournalRecord record : records) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { removeTxRecord(record); @@ -277,13 +278,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { transactions.remove(txH.transactionID); } } + connection.rollback(); } catch (Exception sqlE) { - logger.warn(sqlE.getMessage(), sqlE); + logger.error(sqlE.getMessage(), sqlE); + criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null); ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); } } - // TODO Use an executor. private void executeCallbacks(final List records, final boolean result) { Runnable r = new Runnable() { @Override @@ -297,7 +299,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private void appendRecord(JDBCJournalRecord record) throws Exception { + record.storeLineUp(); + if (!started) { + if (record.getIoCompletion() != null) { + record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started"); + } + } SimpleWaitIOCallback callback = null; if (record.isSync() && record.getIoCompletion() == null) { @@ -316,10 +324,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } syncTimer.delay(); - - if (callback != null) { - callback.waitCompletion(); - } + if (callback != null) callback.waitCompletion(); } private synchronized void addTxRecord(JDBCJournalRecord record) { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 9691d3ea49..8888e0208e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -115,7 +116,7 @@ class JDBCJournalRecord { if (success) { ioCompletion.done(); } else { - ioCompletion.onError(1, "DATABASE TRANSACTION FAILED"); + ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed."); } } } @@ -126,7 +127,7 @@ class JDBCJournalRecord { } } - void writeRecord(PreparedStatement statement) throws SQLException { + void writeRecord(PreparedStatement statement) throws Exception { byte[] recordBytes = new byte[variableSize]; byte[] txDataBytes = new byte[txDataSize]; @@ -136,6 +137,7 @@ class JDBCJournalRecord { txData.read(txDataBytes); } catch (IOException e) { ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e); + throw e; } statement.setLong(1, id); diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index b7d0c9d02f..b04b74ff0f 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; @@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest { String connectionUrl = "jdbc:derby:target/data;create=true"; String tableName = "FILES"; - factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor); + factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + } + }); factory.start(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 4591c8ba81..b2a3eb5a9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -81,6 +81,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { private boolean started = false; + private final IOCriticalErrorListener criticalErrorListener; + public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, final StorageManager storageManager, final long syncTimeout, @@ -94,6 +96,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { this.scheduledExecutor = scheduledExecutor; this.syncTimeout = syncTimeout; this.dbConf = dbConf; + this.criticalErrorListener = critialErrorListener; start(); } @@ -109,9 +112,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); } else { String driverClassName = dbConf.getJdbcDriverClassName(); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); } pagingFactoryFileFactory.start(); @@ -222,7 +227,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); } - return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor()); + return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener); } private String getTableNameForGUID(String guid) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 8634638794..5592c9e2a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } - bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); } else { String driverClassName = dbConf.getJdbcDriverClassName(); - bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); } largeMessagesFactory.start(); } catch (Exception e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index ebb5c0eb9f..8f1d25a99c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase { executorService = Executors.newSingleThreadExecutor(); jdbcUrl = "jdbc:derby:target/data;create=true"; SQLProvider.Factory factory = new DerbySQLProvider.Factory(); - journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService); + journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + + } + }); journal.start(); } From d8ff3f500f90bcb342e9093bbe16077a709fcf4d Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 18 Apr 2017 14:36:25 -0400 Subject: [PATCH 2/2] ARTEMIS-1115 Traces and tests on JDBC Persistence --- .../jdbc/store/journal/JDBCJournalImpl.java | 318 +++++++++++++++--- .../jdbc/store/journal/JDBCJournalRecord.java | 19 ++ .../impl/PagingStoreFactoryDatabase.java | 6 +- .../core/ServerSessionPacketHandler.java | 6 +- .../store/journal/ShutdownServerTest.java | 84 +++++ .../tests/integration/xa/BasicXaTest.java | 2 +- 6 files changed, 378 insertions(+), 57 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 64f20a8a29..db8789100f 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -46,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.jboss.logging.Logger; public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @@ -72,6 +72,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private boolean started; + private AtomicBoolean failed = new AtomicBoolean(false); + private JDBCJournalSync syncTimer; private final Executor completeExecutor; @@ -159,8 +161,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } public synchronized int sync() { - if (!started) - return 0; List recordRef; synchronized (records) { @@ -171,6 +171,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { records.clear(); } + if (!started || failed.get()) { + executeCallbacks(recordRef, false); + return 0; + } + + // We keep a list of deleted records and committed tx (used for cleaning up old transaction data). List deletedRecords = new ArrayList<>(); List committedTransactions = new ArrayList<>(); @@ -181,6 +187,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { connection.setAutoCommit(false); for (JDBCJournalRecord record : recordRef) { + + if (logger.isTraceEnabled()) { + logger.trace("sync::preparing JDBC statment for " + record); + } + + + switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: // Standard SQL Delete Record, Non transactional delete @@ -215,6 +228,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { deleteJournalTxRecords.executeBatch(); connection.commit(); + if (logger.isTraceEnabled()) { + logger.trace("JDBC commit worked"); + } cleanupTxRecords(deletedRecords, committedTransactions); executeCallbacks(recordRef, true); @@ -222,14 +238,38 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { return recordRef.size(); } catch (Exception e) { - criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); - started = false; - executeCallbacks(recordRef, false); - performRollback(recordRef); + handleException(recordRef, e); return 0; } } + /** public for tests only, not through API */ + public void handleException(List recordRef, Throwable e) { + logger.warn(e.getMessage(), e); + failed.set(true); + criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); + + if (logger.isTraceEnabled()) { + logger.trace("Rolling back Transaction, just in case"); + } + + try { + connection.rollback(); + } catch (Throwable rollback) { + logger.warn(rollback); + } + + try { + connection.close(); + } catch (Throwable rollback) { + logger.warn(rollback); + } + + if (recordRef != null) { + executeCallbacks(recordRef, false); + } + } + /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ private synchronized void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { @@ -261,45 +301,49 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } } - private void performRollback(List records) { - try { - for (JDBCJournalRecord record : records) { - if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { - removeTxRecord(record); - } - } - - List txHolders = new ArrayList<>(); - txHolders.addAll(transactions.values()); - - // On rollback we must update the tx map to remove all the tx entries - for (TransactionHolder txH : txHolders) { - if (!txH.prepared && txH.recordInfos.isEmpty() && txH.recordsToDelete.isEmpty()) { - transactions.remove(txH.transactionID); - } - } - connection.rollback(); - } catch (Exception sqlE) { - logger.error(sqlE.getMessage(), sqlE); - criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null); - ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); - } - } - - private void executeCallbacks(final List records, final boolean result) { + private void executeCallbacks(final List records, final boolean success) { Runnable r = new Runnable() { @Override public void run() { for (JDBCJournalRecord record : records) { - record.complete(result); + if (logger.isTraceEnabled()) { + logger.trace("Calling callback " + record + " with success = " + success); + } + record.complete(success); } } }; completeExecutor.execute(r); } + + private void checkStatus() { + checkStatus(null); + } + + private void checkStatus(IOCompletion callback) { + if (!started) { + if (callback != null) callback.onError(-1, "JDBC Journal is not loaded"); + throw new IllegalStateException("JDBCJournal is not loaded"); + } + + if (failed.get()) { + if (callback != null) callback.onError(-1, "JDBC Journal failed"); + throw new IllegalStateException("JDBCJournal Failed"); + } + } + + private void appendRecord(JDBCJournalRecord record) throws Exception { + // extra measure I know, as all the callers are also checking for this.. + // better to be safe ;) + checkStatus(); + + if (logger.isTraceEnabled()) { + logger.trace("appendRecord " + record); + } + record.storeLineUp(); if (!started) { if (record.getIoCompletion() != null) { @@ -328,6 +372,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private synchronized void addTxRecord(JDBCJournalRecord record) { + + if (logger.isTraceEnabled()) { + logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed); + } + + checkStatus(); + TransactionHolder txHolder = transactions.get(record.getTxId()); if (txHolder == null) { txHolder = new TransactionHolder(record.getTxId()); @@ -347,37 +398,39 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } } - private synchronized void removeTxRecord(JDBCJournalRecord record) { - TransactionHolder txHolder = transactions.get(record.getTxId()); - - // We actually only need the record ID in this instance. - if (record.isTransactional()) { - RecordInfo info = new RecordInfo(record.getTxId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount()); - if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) { - txHolder.recordsToDelete.remove(info); - } else { - txHolder.recordInfos.remove(info); - } - } else { - txHolder.prepared = false; - } - } - @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); + + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord bytes[] " + r); + } + + + appendRecord(r); } @Override public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord (encoding) " + r + " with record = " + record); + } + + appendRecord(r); } @@ -387,29 +440,53 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception { + checkStatus(completionCallback); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); r.setIoCompletion(completionCallback); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord (completionCallback & encoding) " + r + " with record = " + record); + } + + appendRecord(r); } @Override public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord (bytes)) " + r); + } + + appendRecord(r); } @Override public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord (encoding)) " + r + " with record " + record); + } + + appendRecord(r); } @@ -419,36 +496,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception { + checkStatus(completionCallback); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setSync(sync); r.setIoCompletion(completionCallback); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord (encoding & completioncallback)) " + r + " with record " + record); + } + + appendRecord(r); } @Override public void appendDeleteRecord(long id, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecord id=" + id + " sync=" + sync); + } + appendRecord(r); } @Override public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception { + checkStatus(completionCallback); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet()); r.setSync(sync); r.setIoCompletion(completionCallback); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecord id=" + id + " sync=" + sync + " with completionCallback"); + } + + appendRecord(r); } @Override public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); appendRecord(r); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using bytes[] r=" + r); + } + + } @Override @@ -456,19 +564,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { long id, byte recordType, EncodingSupport record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r); + } + + appendRecord(r); } @Override public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r); + } + + appendRecord(r); } @@ -477,50 +601,94 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { long id, byte recordType, EncodingSupport record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r); + } + appendRecord(r); } @Override public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using bytes and r=" + r); + } + appendRecord(r); } @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setRecord(record); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id + " using encoding=" + record + " and r=" + r); + } + + appendRecord(r); } @Override public void appendDeleteRecordTransactional(long txID, long id) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); r.setTxId(txID); + + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional txID=" + txID + " id=" + id); + } + appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord txID=" + txID + " sync=" + sync); + } + + appendRecord(r); } @Override public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord txID=" + txID + " callback=" + callback); + } + appendRecord(r); } @@ -529,20 +697,35 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setStoreLineUp(lineUpContext); r.setIoCompletion(callback); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord txID=" + txID + " using callback, lineup=" + lineUpContext); + } + appendRecord(r); } @Override public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord txID=" + txID + " using sync=" + sync); + } + + appendRecord(r); } @@ -551,43 +734,74 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setTxData(transactionData); r.setSync(sync); r.setIoCompletion(callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord txID=" + txID + " using callback, sync=" + sync); + } + + appendRecord(r); } @Override public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setTxData(transactionData); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord txID=" + txID + " transactionData, sync=" + sync); + } + + appendRecord(r); } @Override public void appendRollbackRecord(long txID, boolean sync) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); + + if (logger.isTraceEnabled()) { + logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync); + } + appendRecord(r); } @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { + checkStatus(); + JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet()); r.setTxId(txID); r.setSync(sync); r.setIoCompletion(callback); + + if (logger.isTraceEnabled()) { + logger.trace("appendRollbackRecord txID=" + txID + " sync=" + sync + " using callback"); + } + + appendRecord(r); } @Override - public synchronized JournalLoadInformation load(LoaderCallback reloadManager) throws Exception { + public synchronized JournalLoadInformation load(LoaderCallback reloadManager) { JournalLoadInformation jli = new JournalLoadInformation(); JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager); JDBCJournalRecord r; @@ -637,6 +851,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); jli.setNumberOfRecords(noRecords); transactions = jrc.getTransactions(); + } catch (Throwable e) { + handleException(null, e); } return jli; } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 8888e0208e..7f40b06a1e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -305,4 +305,23 @@ class JDBCJournalRecord { long getSeq() { return seq; } + + @Override + public String toString() { + return "JDBCJournalRecord{" + + "compactCount=" + compactCount + + ", id=" + id + + ", isTransactional=" + isTransactional + + ", isUpdate=" + isUpdate + + ", recordType=" + recordType + + ", seq=" + seq + + ", storeLineUp=" + storeLineUp + + ", sync=" + sync + + ", txCheckNoRecords=" + txCheckNoRecords + + ", txDataSize=" + txDataSize + + ", txId=" + txId + + ", userRecordType=" + userRecordType + + ", variableSize=" + variableSize + + '}'; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index b2a3eb5a9b..afe97089e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -112,12 +112,10 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); } else { String driverClassName = dbConf.getJdbcDriverClassName(); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); } pagingFactoryFileFactory.start(); started = true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index b52534cb37..731d6ca357 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -541,13 +541,17 @@ public class ServerSessionPacketHandler implements ChannelHandler { doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); if (logger.isTraceEnabled()) { - logger.trace("ServerSessionPacketHandler::response sent::" + response); + logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage); } } @Override public void done() { + if (logger.isTraceEnabled()) { + logger.trace("ServerSessionPacketHandler::regular response sent::" + response); + } + doConfirmAndResponse(confirmPacket, response, flush, closeChannel); } }); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java new file mode 100644 index 0000000000..cf2815c677 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/ShutdownServerTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jdbc.store.journal; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ShutdownServerTest extends ActiveMQTestBase { + + private ActiveMQServer server; + + private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue"); + + private ServerLocator locator; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + server = createServer(true, createDefaultJDBCConfig(false), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES); + server.start(); + + locator = createFactory(false); + } + + @Test + public void testShutdownServer() throws Throwable { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true, false); + + session.createQueue(QUEUE, QUEUE, null, true); + + ClientConsumer consumer = session.createConsumer(QUEUE); + + ClientProducer producer = session.createProducer(QUEUE); + ClientMessage message = session.createMessage(Message.TEXT_TYPE, true, 0, System.currentTimeMillis(), (byte) 4); + message.getBodyBuffer().writeString("hi"); + message.putStringProperty("hello", "elo"); + producer.send(message); + + ActiveMQServerImpl impl = (ActiveMQServerImpl) server; + JournalStorageManager journal = (JournalStorageManager) impl.getStorageManager(); + JDBCJournalImpl journalimpl = (JDBCJournalImpl) journal.getMessageJournal(); + journalimpl.handleException(null, new Exception("failure")); + + Wait.waitFor(() -> !server.isStarted()); + + Assert.assertFalse(server.isStarted()); + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java index 5ee71b138a..ec184a0648 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java @@ -94,7 +94,7 @@ public class BasicXaTest extends ActiveMQTestBase { configuration = createDefaultNettyConfig(); } - messagingService = createServer(false, configuration, -1, -1, addressSettings); + messagingService = createServer(true, configuration, -1, -1, addressSettings); // start the server messagingService.start();