diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java index f27efec4c4..43b94e08bb 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java @@ -23,6 +23,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import javax.sql.DataSource; + import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider; import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider; @@ -84,56 +86,48 @@ public class JDBCUtils { } public static SQLProvider getSQLProvider(String driverClass, String tableName) { + SQLProvider.Factory factory; if (driverClass.contains("derby")) { logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName); - return new DerbySQLProvider(tableName); + factory = new DerbySQLProvider.Factory(); } else if (driverClass.contains("postgres")) { logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName); - return new PostgresSQLProvider(tableName); + factory = new PostgresSQLProvider.Factory(); } else if (driverClass.contains("mysql")) { logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName); - return new MySQLSQLProvider(tableName); + factory = new MySQLSQLProvider.Factory(); } else { logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName); - return new GenericSQLProvider(tableName); + factory = new GenericSQLProvider.Factory(); } + return factory.create(tableName); } public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass, + String jdbcConnectionUrl, + SQLProvider provider) throws SQLException { + JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver(); + dbDriver.setSqlProvider(provider); + dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); + dbDriver.setJdbcDriverClass(driverClass); + return dbDriver; + } + + public static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, String tableName, - String jdbcConnectionUrl) throws SQLException { + SQLProvider provider) throws SQLException { JDBCSequentialFileFactoryDriver dbDriver; - if (driverClass.contains("derby")) { - logger.tracef("getDBFileDriver Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName); - dbDriver = new JDBCSequentialFileFactoryDriver(); - dbDriver.setSqlProvider(new DerbySQLProvider(tableName)); - dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); - dbDriver.setJdbcDriverClass(driverClass); - } - else if (driverClass.contains("postgres")) { - logger.tracef("getDBFileDriver Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName); + if (provider instanceof PostgresSQLProvider) { dbDriver = new PostgresSequentialSequentialFileDriver(); - dbDriver.setSqlProvider(new PostgresSQLProvider(tableName)); - dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); - dbDriver.setJdbcDriverClass(driverClass); - } - else if (driverClass.contains("mysql")) { - logger.tracef("getDBFileDriver Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName); - dbDriver = new JDBCSequentialFileFactoryDriver(); - dbDriver.setSqlProvider(new MySQLSQLProvider(tableName)); - dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); - dbDriver.setJdbcDriverClass(driverClass); + dbDriver.setDataSource(dataSource); } else { - logger.tracef("getDBFileDriver generic mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName); - dbDriver = new JDBCSequentialFileFactoryDriver(); - dbDriver.setSqlProvider(new GenericSQLProvider(tableName)); - dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); - dbDriver.setJdbcDriverClass(driverClass); + dbDriver = new JDBCSequentialFileFactoryDriver(tableName, dataSource, provider); } return dbDriver; } + } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java index beac03e27d..9ab28de301 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java @@ -22,6 +22,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; +import javax.sql.DataSource; + import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; @@ -41,13 +43,20 @@ public abstract class AbstractJDBCDriver { protected Driver dbDriver; + protected DataSource dataSource; + public AbstractJDBCDriver() { } - public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) { + public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String jdbcDriverClass) { this.jdbcConnectionUrl = jdbcConnectionUrl; this.jdbcDriverClass = jdbcDriverClass; - this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName); + this.sqlProvider = sqlProvider; + } + + public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) { + this.dataSource = dataSource; + this.sqlProvider = provider; } public void start() throws Exception { @@ -71,13 +80,18 @@ public abstract class AbstractJDBCDriver { } protected void connect() throws Exception { - try { - dbDriver = JDBCUtils.getDriver(jdbcDriverClass); - connection = dbDriver.connect(jdbcConnectionUrl, new Properties()); + if (dataSource != null) { + connection = dataSource.getConnection(); } - catch (SQLException e) { - ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl); - throw new RuntimeException("Error connecting to database", e); + else { + try { + dbDriver = JDBCUtils.getDriver(jdbcDriverClass); + connection = dbDriver.connect(jdbcConnectionUrl, new Properties()); + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl); + throw new RuntimeException("Error connecting to database", e); + } } } @@ -126,4 +140,12 @@ public abstract class AbstractJDBCDriver { public void setJdbcDriverClass(String jdbcDriverClass) { this.jdbcDriverClass = jdbcDriverClass; } + + public DataSource getDataSource() { + return dataSource; + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java index d9cbed4a7b..121c6f721e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jdbc.store.drivers.derby; import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; public class DerbySQLProvider extends GenericSQLProvider { @@ -27,7 +28,7 @@ public class DerbySQLProvider extends GenericSQLProvider { private final String appendToFileSQL; - public DerbySQLProvider(String tableName) { + private DerbySQLProvider(String tableName) { super(tableName); createFileTableSQL = "CREATE TABLE " + tableName + @@ -56,4 +57,12 @@ public class DerbySQLProvider extends GenericSQLProvider { public boolean closeConnectionOnShutdown() { return false; } + + public static class Factory implements SQLProvider.Factory { + + @Override + public SQLProvider create(String tableName) { + return new DerbySQLProvider(tableName); + } + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java index 1400382c0f..7a32fcf631 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jdbc.store.drivers.mysql; import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; public class MySQLSQLProvider extends GenericSQLProvider { @@ -28,7 +29,7 @@ public class MySQLSQLProvider extends GenericSQLProvider { private final String copyFileRecordByIdSQL; - public MySQLSQLProvider(String tName) { + private MySQLSQLProvider(String tName) { super(tName.toLowerCase()); createFileTableSQL = "CREATE TABLE " + tableName + @@ -61,4 +62,11 @@ public class MySQLSQLProvider extends GenericSQLProvider { public String getCopyFileRecordByIdSQL() { return copyFileRecordByIdSQL; } + + public static class Factory implements SQLProvider.Factory { + @Override + public SQLProvider create(String tableName) { + return new MySQLSQLProvider(tableName); + } + } } \ No newline at end of file diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java index 664202b606..d69cff9a65 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jdbc.store.drivers.postgres; import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; public class PostgresSQLProvider extends GenericSQLProvider { @@ -27,7 +28,7 @@ public class PostgresSQLProvider extends GenericSQLProvider { private final String createJournalTableSQL; - public PostgresSQLProvider(String tName) { + private PostgresSQLProvider(String tName) { super(tName.toLowerCase()); createFileTableSQL = "CREATE TABLE " + tableName + "(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))"; @@ -49,5 +50,14 @@ public class PostgresSQLProvider extends GenericSQLProvider { public int getMaxBlobSize() { return MAX_BLOB_SIZE; } + + public static class Factory implements SQLProvider.Factory { + + + @Override + public SQLProvider create(String tableName) { + return new PostgresSQLProvider(tableName); + } + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index cf45a8a5c5..b222fd97d3 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -25,11 +25,14 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import javax.sql.DataSource; + import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { @@ -44,13 +47,22 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM private final JDBCSequentialFileFactoryDriver dbDriver; - public JDBCSequentialFileFactory(final String connectionUrl, + public JDBCSequentialFileFactory(final DataSource dataSource, + final SQLProvider sqlProvider, final String tableName, - final String className, Executor executor) throws Exception { this.executor = executor; files = new ArrayList<>(); - dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl); + dbDriver = JDBCUtils.getDBFileDriver(dataSource, tableName, sqlProvider); + } + + public JDBCSequentialFileFactory(final String connectionUrl, + final String className, + final SQLProvider sqlProvider, + Executor executor) throws Exception { + this.executor = executor; + files = new ArrayList<>(); + dbDriver = JDBCUtils.getDBFileDriver(className, connectionUrl, sqlProvider); } @Override diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index 8937992430..0ae0335bcf 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -25,7 +25,10 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import javax.sql.DataSource; + import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { @@ -49,8 +52,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { super(); } - public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) { - super(tableName, jdbcConnectionUrl, jdbcDriverClass); + public JDBCSequentialFileFactoryDriver(String tableName, DataSource dataSource, SQLProvider provider) { + super(dataSource, provider); } @Override diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 51f3a3e25d..d472efa235 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -29,6 +29,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import javax.sql.DataSource; + import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; @@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.jboss.logging.Logger; @@ -82,8 +85,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sequence ID for journal records private AtomicLong seq = new AtomicLong(0); - public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) { - super(tableName, jdbcUrl, jdbcDriverClass); + public JDBCJournalImpl(DataSource dataSource, SQLProvider provider, String tableName, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) { + super(dataSource, provider); + records = new ArrayList<>(); + this.scheduledExecutorService = scheduledExecutorService; + this.completeExecutor = completeExecutor; + } + + public JDBCJournalImpl(String jdbcUrl, String jdbcDriverClass, SQLProvider sqlProvider, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) { + super(sqlProvider, jdbcUrl, jdbcDriverClass); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java index 6efa170b64..a246dd5ea8 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java @@ -57,7 +57,7 @@ public class GenericSQLProvider implements SQLProvider { private final String countJournalRecordsSQL; - public GenericSQLProvider(String tableName) { + protected GenericSQLProvider(String tableName) { this.tableName = tableName; createFileTableSQL = "CREATE TABLE " + tableName + @@ -198,4 +198,11 @@ public class GenericSQLProvider implements SQLProvider { public boolean closeConnectionOnShutdown() { return true; } + + public static class Factory implements SQLProvider.Factory { + + public SQLProvider create(String tableName) { + return new GenericSQLProvider(tableName); + } + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java index 5645ebc87e..0f354bc056 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java @@ -57,4 +57,8 @@ public interface SQLProvider { String getCountJournalRecordsSQL(); boolean closeConnectionOnShutdown(); + + interface Factory { + SQLProvider create(String tableName); + } } diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index 8157e6f9f4..0aa26ce77d 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; @@ -64,7 +65,7 @@ public class JDBCSequentialFileFactoryTest { public void setup() throws Exception { Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); - factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor); + factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName), executor); factory.start(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 49a2251065..ffeb708359 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -16,8 +16,11 @@ */ package org.apache.activemq.artemis.core.config.storage; +import javax.sql.DataSource; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; public class DatabaseStorageConfiguration implements StoreConfiguration { @@ -30,6 +33,11 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); + + private DataSource dataSource; + + private SQLProvider.Factory sqlProviderFactory; + @Override public StoreType getStoreType() { return StoreType.DATABASE; @@ -74,4 +82,37 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { public String getJdbcDriverClassName() { return jdbcDriverClassName; } + + /** + * The DataSource to use to store Artemis data in the data store (can be {@code null} if {@code jdbcConnectionUrl} and {@code jdbcDriverClassName} are used instead). + * + * @return the DataSource used to store Artemis data in the JDBC data store. + */ + public DataSource getDataSource() { + return dataSource; + } + + /** + * Configure the DataSource to use to store Artemis data in the data store. + * + * @param dataSource + */ + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } + + /** + * The {@link SQLProvider.Factory} used to communicate with the JDBC data store. + * It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider.Factory} will be user, + * else the type of the factory will be determined based on the {@code jdbcDriverClassName). + * + * @return the factory used to communicate with the JDBC data store. + */ + public SQLProvider.Factory getSqlProviderFactory() { + return sqlProviderFactory; + } + + public void setSqlProvider(SQLProvider.Factory sqlProviderFactory) { + this.sqlProviderFactory = sqlProviderFactory; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 70d824f204..3c313fea06 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -25,10 +25,11 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.Journal; -import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.ExecutorFactory; public class JDBCJournalStorageManager extends JournalStorageManager { @@ -51,16 +52,43 @@ public class JDBCJournalStorageManager extends JournalStorageManager { try { DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); - Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor()); - bindingsJournal = localBindings; - - Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = localMessage; - - bindingsJournal.start(); - messageJournal.start(); - - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor()); + if (dbConf.getDataSource() != null) { + SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); + if (sqlProviderFactory == null) { + sqlProviderFactory = new GenericSQLProvider.Factory(); + } + bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), + sqlProviderFactory.create(dbConf.getBindingsTableName()), + dbConf.getBindingsTableName(), + scheduledExecutorService, + executorFactory.getExecutor()); + messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), + sqlProviderFactory.create(dbConf.getMessageTableName()), + dbConf.getMessageTableName(), + scheduledExecutorService, + executorFactory.getExecutor()); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), + sqlProviderFactory.create(dbConf.getLargeMessageTableName()), + dbConf.getLargeMessageTableName(), + executor); + } + else { + String driverClassName = dbConf.getJdbcDriverClassName(); + bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), + driverClassName, + JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()), + scheduledExecutorService, + executorFactory.getExecutor()); + messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), + driverClassName, + JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName()), + scheduledExecutorService, + executorFactory.getExecutor()); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), + driverClassName, + JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName()), + executor); + } largeMessagesFactory.start(); } catch (Exception e) { @@ -112,8 +140,4 @@ public class JDBCJournalStorageManager extends JournalStorageManager { @Override public void freeDirectBuffer(ByteBuffer buffer) { } - - @Override - public void injectMonitor(FileStoreMonitor monitor) throws Exception { - } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index fc3d9ff612..c9ffb3814a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -29,7 +29,9 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.junit.After; @@ -75,7 +77,9 @@ public class JDBCJournalTest extends ActiveMQTestBase { scheduledExecutorService = new ScheduledThreadPoolExecutor(5); executorService = Executors.newSingleThreadExecutor(); jdbcUrl = "jdbc:derby:target/data;create=true"; - journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService); + SQLProvider.Factory factory = new DerbySQLProvider.Factory(); + journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME), + scheduledExecutorService, executorService); journal.start(); }