diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 08068b5112..8fce7ead40 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -432,6 +432,9 @@ public final class ActiveMQDefaultConfiguration { // Default large messages table name, used with Database storage type private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES"; + // Default large messages table name, used with Database storage type + private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE"; + // Default period to wait between connection TTL checks public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; @@ -1190,6 +1193,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_LARGE_MESSAGES_TABLE_NAME; } + public static String getDefaultPageStoreTableName() { + return DEFAULT_PAGE_STORE_TABLE_NAME; + } + public static long getDefaultConnectionTtlCheckInterval() { return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java index d75ea21207..18289117c8 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java @@ -36,6 +36,7 @@ import org.jboss.logging.Logger; /** * Class to hold common database functionality such as drivers and connections */ +@SuppressWarnings("SynchronizeOnNonFinalField") public abstract class AbstractJDBCDriver { private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class); @@ -66,17 +67,26 @@ public abstract class AbstractJDBCDriver { public void start() throws SQLException { connect(); - createSchema(); - prepareStatements(); + synchronized (connection) { + createSchema(); + prepareStatements(); + } + } + + public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) { + this.connection = connection; + this.sqlProvider = sqlProvider; } public void stop() throws SQLException { - if (sqlProvider.closeConnectionOnShutdown()) { - try { - connection.close(); - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - throw e; + synchronized (connection) { + if (sqlProvider.closeConnectionOnShutdown()) { + try { + connection.close(); + } catch (SQLException e) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + throw e; + } } } } @@ -90,30 +100,32 @@ public abstract class AbstractJDBCDriver { } private void connect() throws SQLException { - if (dataSource != null) { - try { - connection = dataSource.getConnection(); - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - throw e; - } - } else { - try { - if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) { - throw new IllegalStateException("jdbcDriverClass is null or empty!"); + if (connection == null) { + if (dataSource != null) { + try { + connection = dataSource.getConnection(); + } catch (SQLException e) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + throw e; } - if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) { - throw new IllegalStateException("jdbcConnectionUrl is null or empty!"); + } else { + try { + if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) { + throw new IllegalStateException("jdbcDriverClass is null or empty!"); + } + if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) { + throw new IllegalStateException("jdbcConnectionUrl is null or empty!"); + } + final Driver dbDriver = getDriver(jdbcDriverClass); + connection = dbDriver.connect(jdbcConnectionUrl, new Properties()); + if (connection == null) { + throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl); + } + } catch (SQLException e) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl); + throw e; } - final Driver dbDriver = getDriver(jdbcDriverClass); - connection = dbDriver.connect(jdbcConnectionUrl, new Properties()); - if (connection == null) { - throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl); - } - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl); - throw e; } } } @@ -206,8 +218,10 @@ public abstract class AbstractJDBCDriver { return connection; } - public void setConnection(Connection connection) { - this.connection = connection; + public final void setConnection(Connection connection) { + if (connection == null) { + this.connection = connection; + } } public void setSqlProvider(SQLProvider sqlProvider) { @@ -225,4 +239,5 @@ public abstract class AbstractJDBCDriver { public void setDataSource(DataSource dataSource) { this.dataSource = dataSource; } + } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java index 121c6f721e..281ea88a3c 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java @@ -29,7 +29,7 @@ public class DerbySQLProvider extends GenericSQLProvider { private final String appendToFileSQL; private DerbySQLProvider(String tableName) { - super(tableName); + super(tableName.toUpperCase()); createFileTableSQL = "CREATE TABLE " + tableName + "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java index 02b1128d26..58494b0b64 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jdbc.store.file; import javax.sql.DataSource; +import java.sql.Connection; import java.sql.SQLException; import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider; @@ -45,4 +46,15 @@ class JDBCFileUtils { } return dbDriver; } + + static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException { + JDBCSequentialFileFactoryDriver dbDriver; + if (provider instanceof PostgresSQLProvider) { + dbDriver = new PostgresSequentialSequentialFileDriver(); + dbDriver.setConnection(connection); + } else { + dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider); + } + return dbDriver; + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 34b6a4feab..3f078c2a66 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -88,7 +88,12 @@ public class JDBCSequentialFile implements SequentialFile { @Override public boolean exists() { - return isCreated; + if (isCreated) return true; + try { + return fileFactory.listFiles(extension).contains(filename); + } catch (Exception e) { + return false; + } } @Override diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index 008e00052a..4b92c7100e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file; import javax.sql.DataSource; import java.io.File; import java.nio.ByteBuffer; +import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -60,6 +61,17 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); } + public JDBCSequentialFileFactory(final Connection connection, + final SQLProvider sqlProvider, + final Executor executor) throws Exception { + this.executor = executor; + this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + } + + public JDBCSequentialFileFactoryDriver getDbDriver() { + return dbDriver; + } + @Override public SequentialFileFactory setDatasync(boolean enabled) { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index 7b9eaf1268..f9f206ac62 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file; import javax.sql.DataSource; import java.nio.ByteBuffer; import java.sql.Blob; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -29,6 +30,7 @@ import java.util.List; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +@SuppressWarnings("SynchronizeOnNonFinalField") public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { protected PreparedStatement deleteFile; @@ -55,6 +57,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { super(dataSource, provider); } + JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) { + super(connection, sqlProvider); + } + @Override protected void createSchema() throws SQLException { createTable(sqlProvider.getCreateFileTableSQL()); @@ -72,22 +78,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); } - public synchronized List listFiles(String extension) throws Exception { - List fileNames = new ArrayList<>(); - try { - connection.setAutoCommit(false); - selectFileNamesByExtension.setString(1, extension); - try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { - while (rs.next()) { - fileNames.add(rs.getString(1)); + public List listFiles(String extension) throws Exception { + synchronized (connection) { + List fileNames = new ArrayList<>(); + try { + connection.setAutoCommit(false); + selectFileNamesByExtension.setString(1, extension); + try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { + while (rs.next()) { + fileNames.add(rs.getString(1)); + } } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; + return fileNames; } - return fileNames; } /** @@ -113,16 +121,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @return * @throws SQLException */ - public synchronized int fileExists(JDBCSequentialFile file) throws SQLException { - connection.setAutoCommit(false); - selectFileByFileName.setString(1, file.getFileName()); - try (ResultSet rs = selectFileByFileName.executeQuery()) { - int id = rs.next() ? rs.getInt(1) : -1; - connection.commit(); - return id; - } catch (Exception e) { - connection.rollback(); - throw e; + public int fileExists(JDBCSequentialFile file) throws SQLException { + try { + synchronized (connection) { + connection.setAutoCommit(false); + selectFileByFileName.setString(1, file.getFileName()); + try (ResultSet rs = selectFileByFileName.executeQuery()) { + int id = rs.next() ? rs.getInt(1) : -1; + connection.commit(); + return id; + } catch (Exception e) { + connection.rollback(); + throw e; + } + } + } catch (NullPointerException npe) { + npe.printStackTrace(); + throw npe; } } @@ -132,18 +147,20 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @param file * @throws SQLException */ - public synchronized void loadFile(JDBCSequentialFile file) throws SQLException { - connection.setAutoCommit(false); - readLargeObject.setInt(1, file.getId()); + public void loadFile(JDBCSequentialFile file) throws SQLException { + synchronized (connection) { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - file.setWritePosition((int) rs.getBlob(1).length()); + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.setWritePosition((int) rs.getBlob(1).length()); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; } } @@ -153,21 +170,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @param file * @throws SQLException */ - public synchronized void createFile(JDBCSequentialFile file) throws SQLException { - try { - connection.setAutoCommit(false); - createFile.setString(1, file.getFileName()); - createFile.setString(2, file.getExtension()); - createFile.setBytes(3, new byte[0]); - createFile.executeUpdate(); - try (ResultSet keys = createFile.getGeneratedKeys()) { - keys.next(); - file.setId(keys.getInt(1)); + public void createFile(JDBCSequentialFile file) throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); + createFile.setString(1, file.getFileName()); + createFile.setString(2, file.getExtension()); + createFile.setBytes(3, new byte[0]); + createFile.executeUpdate(); + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + file.setId(keys.getInt(1)); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; } } @@ -178,16 +197,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @param newFileName * @throws SQLException */ - public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException { - try { - connection.setAutoCommit(false); - renameFile.setString(1, newFileName); - renameFile.setInt(2, file.getId()); - renameFile.executeUpdate(); - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; + public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); + renameFile.setString(1, newFileName); + renameFile.setInt(2, file.getId()); + renameFile.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; + } } } @@ -197,15 +218,17 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @param file * @throws SQLException */ - public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException { - try { - connection.setAutoCommit(false); - deleteFile.setInt(1, file.getId()); - deleteFile.executeUpdate(); - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; + public void deleteFile(JDBCSequentialFile file) throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); + deleteFile.setInt(1, file.getId()); + deleteFile.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; + } } } @@ -217,17 +240,19 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @return * @throws SQLException */ - public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { - try { - connection.setAutoCommit(false); - appendToLargeObject.setBytes(1, data); - appendToLargeObject.setInt(2, file.getId()); - appendToLargeObject.executeUpdate(); - connection.commit(); - return data.length; - } catch (SQLException e) { - connection.rollback(); - throw e; + public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); + appendToLargeObject.setBytes(1, data); + appendToLargeObject.setInt(2, file.getId()); + appendToLargeObject.executeUpdate(); + connection.commit(); + return data.length; + } catch (SQLException e) { + connection.rollback(); + throw e; + } } } @@ -239,22 +264,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @return * @throws SQLException */ - public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { - connection.setAutoCommit(false); - readLargeObject.setInt(1, file.getId()); - int readLength = 0; - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - Blob blob = rs.getBlob(1); - readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position()); - byte[] data = blob.getBytes(file.position() + 1, readLength); - bytes.put(data); + public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { + synchronized (connection) { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + int readLength = 0; + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + Blob blob = rs.getBlob(1); + readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position()); + byte[] data = blob.getBytes(file.position() + 1, readLength); + bytes.put(data); + } + connection.commit(); + return readLength; + } catch (Throwable e) { + connection.rollback(); + throw e; } - connection.commit(); - return readLength; - } catch (Throwable e) { - connection.rollback(); - throw e; } } @@ -265,16 +292,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @param fileTo * @throws SQLException */ - public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException { - try { - connection.setAutoCommit(false); - copyFileRecord.setInt(1, fileFrom.getId()); - copyFileRecord.setInt(2, fileTo.getId()); - copyFileRecord.executeUpdate(); - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; + public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); + copyFileRecord.setInt(1, fileFrom.getId()); + copyFileRecord.setInt(2, fileTo.getId()); + copyFileRecord.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; + } } } @@ -282,16 +311,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * Drop all tables and data */ @Override - public synchronized void destroy() throws SQLException { - try { - connection.setAutoCommit(false); - try (Statement statement = connection.createStatement()) { - statement.executeUpdate(sqlProvider.getDropFileTableSQL()); + public void destroy() throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); + try (Statement statement = connection.createStatement()) { + statement.executeUpdate(sqlProvider.getDropFileTableSQL()); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java index c7411a6571..8c0f9759a0 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java @@ -24,6 +24,7 @@ import org.postgresql.PGConnection; import org.postgresql.largeobject.LargeObject; import org.postgresql.largeobject.LargeObjectManager; +@SuppressWarnings("SynchronizeOnNonFinalField") public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver { private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY"; @@ -33,100 +34,22 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa } @Override - public synchronized void createFile(JDBCSequentialFile file) throws SQLException { - try { - connection.setAutoCommit(false); + public void createFile(JDBCSequentialFile file) throws SQLException { + synchronized (connection) { + try { + connection.setAutoCommit(false); - LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); - long oid = lobjManager.createLO(); + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + long oid = lobjManager.createLO(); - createFile.setString(1, file.getFileName()); - createFile.setString(2, file.getExtension()); - createFile.setLong(3, oid); - createFile.executeUpdate(); + createFile.setString(1, file.getFileName()); + createFile.setString(2, file.getExtension()); + createFile.setLong(3, oid); + createFile.executeUpdate(); - try (ResultSet keys = createFile.getGeneratedKeys()) { - keys.next(); - file.setId(keys.getInt(1)); - } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; - } - } - - @Override - public synchronized void loadFile(JDBCSequentialFile file) throws SQLException { - connection.setAutoCommit(false); - readLargeObject.setInt(1, file.getId()); - - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - file.setWritePosition(getPostGresLargeObjectSize(file)); - } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; - } - } - - @Override - public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { - LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); - LargeObject largeObject = null; - - Long oid = getOID(file); - try { - connection.setAutoCommit(false); - largeObject = lobjManager.open(oid, LargeObjectManager.WRITE); - largeObject.seek(largeObject.size()); - largeObject.write(data); - largeObject.close(); - connection.commit(); - } catch (Exception e) { - connection.rollback(); - throw e; - } - return data.length; - } - - @Override - public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { - LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); - LargeObject largeObject = null; - long oid = getOID(file); - try { - connection.setAutoCommit(false); - largeObject = lobjManager.open(oid, LargeObjectManager.READ); - int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position()); - - if (readLength > 0) { - if (file.position() > 0) - largeObject.seek((int) file.position()); - byte[] data = largeObject.read(readLength); - bytes.put(data); - } - - largeObject.close(); - connection.commit(); - - return readLength; - } catch (SQLException e) { - connection.rollback(); - throw e; - } - } - - private synchronized Long getOID(JDBCSequentialFile file) throws SQLException { - Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY); - if (oid == null) { - connection.setAutoCommit(false); - readLargeObject.setInt(1, file.getId()); - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1)); + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + file.setId(keys.getInt(1)); } connection.commit(); } catch (SQLException e) { @@ -134,27 +57,117 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa throw e; } } + } + + @Override + public void loadFile(JDBCSequentialFile file) throws SQLException { + synchronized (connection) { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.setWritePosition(getPostGresLargeObjectSize(file)); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; + } + } + } + + @Override + public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { + synchronized (connection) { + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + LargeObject largeObject = null; + + Long oid = getOID(file); + try { + connection.setAutoCommit(false); + largeObject = lobjManager.open(oid, LargeObjectManager.WRITE); + largeObject.seek(largeObject.size()); + largeObject.write(data); + largeObject.close(); + connection.commit(); + } catch (Exception e) { + connection.rollback(); + throw e; + } + return data.length; + } + } + + @Override + public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + LargeObject largeObject = null; + long oid = getOID(file); + synchronized (connection) { + try { + connection.setAutoCommit(false); + largeObject = lobjManager.open(oid, LargeObjectManager.READ); + int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position()); + + if (readLength > 0) { + if (file.position() > 0) + largeObject.seek((int) file.position()); + byte[] data = largeObject.read(readLength); + bytes.put(data); + } + + largeObject.close(); + connection.commit(); + + return readLength; + } catch (SQLException e) { + connection.rollback(); + throw e; + } + } + } + + private Long getOID(JDBCSequentialFile file) throws SQLException { + Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY); + if (oid == null) { + synchronized (connection) { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1)); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; + } + } + } if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) { System.out.println("FD"); } return (Long) file.getMetaData(POSTGRES_OID_KEY); } - private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException { + private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException { LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); int size = 0; Long oid = getOID(file); if (oid != null) { - try { - connection.setAutoCommit(false); - LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ); - size = largeObject.size(); - largeObject.close(); - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; + synchronized (connection) { + try { + connection.setAutoCommit(false); + LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ); + size = largeObject.size(); + largeObject.close(); + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; + } } } return size; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 8b20770b64..eb8b435ad4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -30,6 +30,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName(); + private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName(); + private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); @@ -67,6 +69,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { this.largeMessagesTableName = largeMessagesTableName; } + public String getPageStoreTableName() { + return pageStoreTableName; + } + + public void setPageStoreTableName(String pageStoreTableName) { + this.pageStoreTableName = pageStoreTableName; + } + public void setJdbcConnectionUrl(String jdbcConnectionUrl) { this.jdbcConnectionUrl = jdbcConnectionUrl; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index dff6f1b61e..2721214130 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1286,6 +1286,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK)); conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK)); conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); + conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK)); conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK)); return conf; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java new file mode 100644 index 0000000000..ee9d7bb557 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.paging.impl; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver; +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.UUIDGenerator; + +/** + * Integration point between Paging and JDBC + */ +public class PagingStoreFactoryDatabase implements PagingStoreFactory { + + // Constants ----------------------------------------------------- + + private static final String ADDRESS_FILE = "address.txt"; + + private static final String DIRECTORY_NAME = "directory.txt"; + + // Attributes ---------------------------------------------------- + + protected final boolean syncNonTransactional; + + private PagingManager pagingManager; + + private final ScheduledExecutorService scheduledExecutor; + + private final long syncTimeout; + + protected final StorageManager storageManager; + + private JDBCSequentialFileFactoryDriver dbDriver; + + private DatabaseStorageConfiguration dbConf; + + private ExecutorFactory executorFactory; + + private JDBCSequentialFileFactory pagingFactoryFileFactory; + + private JDBCSequentialFile directoryList; + + public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, + final StorageManager storageManager, + final long syncTimeout, + final ScheduledExecutorService scheduledExecutor, + final ExecutorFactory executorFactory, + final boolean syncNonTransactional, + final IOCriticalErrorListener critialErrorListener) throws Exception { + this.storageManager = storageManager; + this.executorFactory = executorFactory; + this.syncNonTransactional = syncNonTransactional; + this.scheduledExecutor = scheduledExecutor; + this.syncTimeout = syncTimeout; + this.dbConf = dbConf; + + if (dbConf.getDataSource() != null) { + SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); + if (sqlProviderFactory == null) { + sqlProviderFactory = new GenericSQLProvider.Factory(); + } + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor()); + } else { + String driverClassName = dbConf.getJdbcDriverClassName(); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor()); + } + pagingFactoryFileFactory.start(); + directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME); + directoryList.open(); + } + + // Public -------------------------------------------------------- + + @Override + public void stop() { + pagingFactoryFileFactory.stop(); + } + + @Override + public void injectMonitor(FileStoreMonitor monitor) throws Exception { + } + + @Override + public PageCursorProvider newCursorProvider(PagingStore store, + StorageManager storageManager, + AddressSettings addressSettings, + Executor executor) { + return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); + } + + @Override + public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) { + + return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional); + } + + @Override + public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception { + String guid = UUIDGenerator.getInstance().generateStringUUID(); + SequentialFileFactory factory = newFileFactory(guid, true); + factory.start(); + + SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE); + file.open(); + + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address)); + buffer.writeSimpleString(address); + file.write(buffer, true); + return factory; + } + + @Override + public void setPagingManager(final PagingManager pagingManager) { + this.pagingManager = pagingManager; + } + + @Override + public synchronized List reloadStores(final HierarchicalRepository addressSettingsRepository) throws Exception { + // We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses). + int size = ((Long) directoryList.size()).intValue(); + ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size); + + ArrayList storesReturn = new ArrayList<>(); + + while (buffer.readableBytes() > 0) { + SimpleString guid = buffer.readSimpleString(); + + JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false); + factory.start(); + + JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE); + addressFile.open(); + + size = ((Long) addressFile.size()).intValue(); + if (size == 0) { + continue; + } + + ActiveMQBuffer addrBuffer = readActiveMQBuffer(addressFile, size); + SimpleString address = addrBuffer.readSimpleString(); + + AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); + + PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional); + + storesReturn.add(store); + } + return storesReturn; + } + + private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception { + SimpleString simpleString = SimpleString.toSimpleString(directoryName); + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof()); + buffer.writeSimpleString(simpleString); + if (writeToDirectory) directoryList.write(buffer, true); + return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor()); + } + + private String getTableNameForGUID(String guid) { + return dbConf.getPageStoreTableName() + guid.replace("-", ""); + } + + private ActiveMQBuffer readActiveMQBuffer(SequentialFile file, int size) throws Exception { + ByteBuffer byteBuffer = ByteBuffer.allocate(size); + byteBuffer.mark(); + file.read(byteBuffer); + byteBuffer.reset(); + + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(byteBuffer); + buffer.writerIndex(size); + return buffer; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index e0f3a2226c..823baf8861 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -70,7 +70,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { private final long syncTimeout; - private final StorageManager storageManager; + protected final StorageManager storageManager; private final IOCriticalErrorListener critialErrorListener; @@ -187,6 +187,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { } private SequentialFileFactory newFileFactory(final String directoryName) { + return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 4e5c447caf..416da0bfb8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import java.nio.ByteBuffer; +import java.sql.Connection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -34,6 +35,8 @@ import org.apache.activemq.artemis.utils.ExecutorFactory; public class JDBCJournalStorageManager extends JournalStorageManager { + private Connection connection; + public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory, ExecutorFactory ioExecutorFactory, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 373e210442..5c392ccfc9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; @@ -70,8 +71,10 @@ import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStoreFactory; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; @@ -1899,11 +1902,15 @@ public class ActiveMQServerImpl implements ActiveMQServer { this.queueFactory = factory; } - protected PagingManager createPagingManager() { + protected PagingManager createPagingManager() throws Exception { return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize()); } - protected PagingStoreFactoryNIO getPagingStoreFactory() { + protected PagingStoreFactory getPagingStoreFactory() throws Exception { + if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { + DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); + return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, false, shutdownOnCriticalIO); + } return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO); } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 4ba268ffc1..c23a2ea9d6 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1719,6 +1719,13 @@ + + + + The table name used to large message files + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index e6d68b1072..2f12b0585a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -455,12 +455,13 @@ public abstract class ActiveMQTestBase extends Assert { return configuration; } - private void setDBStoreType(Configuration configuration) { + protected void setDBStoreType(Configuration configuration) { DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); dbStorageConfiguration.setBindingsTableName("BINDINGS"); dbStorageConfiguration.setMessageTableName("MESSAGE"); dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); + dbStorageConfiguration.setPageStoreTableName("PAGE_STORE"); dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); configuration.setStoreConfiguration(dbStorageConfiguration); diff --git a/artemis-server/src/test/resources/database-store-config.xml b/artemis-server/src/test/resources/database-store-config.xml index 1fa3bd6534..69f9da70f0 100644 --- a/artemis-server/src/test/resources/database-store-config.xml +++ b/artemis-server/src/test/resources/database-store-config.xml @@ -25,6 +25,7 @@ BINDINGS_TABLE MESSAGE_TABLE LARGE_MESSAGE_TABLE + PAGE_STORE_TABLE org.apache.derby.jdbc.EmbeddedDriver diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index c94f54ab30..3960b49667 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; @@ -36,9 +37,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class GlobalPagingTest extends PagingTest { + public GlobalPagingTest(StoreConfiguration.StoreType storeType) { + super(storeType); + } + @Override @Before public void setUp() throws Exception { @@ -69,6 +77,8 @@ public class GlobalPagingTest extends PagingTest { @Test public void testPagingOverFullDisk() throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; + clearDataRecreateServerDirs(); Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 3daaa0ed9f..bc614ea57a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -24,6 +24,8 @@ import java.io.OutputStream; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -51,6 +53,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; @@ -86,7 +89,10 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class PagingTest extends ActiveMQTestBase { private static final Logger logger = Logger.getLogger(PagingTest.class); @@ -104,8 +110,19 @@ public class PagingTest extends ActiveMQTestBase { protected static final int PAGE_SIZE = 10 * 1024; + protected final StoreConfiguration.StoreType storeType; + static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); + public PagingTest(StoreConfiguration.StoreType storeType) { + this.storeType = storeType; + } + + @Parameterized.Parameters(name = "storeType={0}") + public static Collection data() { + Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; + return Arrays.asList(params); + } @Before public void checkLoggerStart() throws Exception { @@ -123,8 +140,6 @@ public class PagingTest extends ActiveMQTestBase { } } - - @Override @Before public void setUp() throws Exception { @@ -1446,6 +1461,8 @@ public class PagingTest extends ActiveMQTestBase { @Test public void testMissingTXEverythingAcked() throws Exception { + if (storeType == StoreConfiguration.StoreType.DATABASE) return; + clearDataRecreateServerDirs(); Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); @@ -5633,7 +5650,11 @@ public class PagingTest extends ActiveMQTestBase { @Override protected Configuration createDefaultInVMConfig() throws Exception { - return super.createDefaultInVMConfig().setJournalSyncNonTransactional(false); + Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false); + if (storeType == StoreConfiguration.StoreType.DATABASE) { + setDBStoreType(configuration); + } + return configuration; } private static final class DummyOperationContext implements OperationContext {