ARTEMIS-714 Add suport for DataSource and SQLProvider

* add DataSource property to DatabaseStorageConfiguration to be able to
  communicate with the data store using this DataSource instance instead
  of relying on the creation the SQL connnection using the JDBC connection
  URL/driver class name tuple.
* add SQLProvider.Factory property to DatabaseStorageConfiguration to
  externalize the choice of the SQLProvider instead of relying on
  hard-coded choices. If the property is null, the current behaviour will
  be used (determing the SQLProvider based on the driver class name)
* bindingsJournal and messageJournal are already started in the start()
  method. Remove redundant calls that were creating unused JDBC
  connections that are never closed.

JIRA: https://issues.apache.org/jira/browse/ARTEMIS-714
This commit is contained in:
Jeff Mesnil 2016-09-08 17:46:09 +02:00 committed by Clebert Suconic
parent 4e444d53f9
commit c33f29631f
14 changed files with 215 additions and 66 deletions

View File

@ -23,6 +23,8 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import javax.sql.DataSource;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider; import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider; import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
@ -84,56 +86,48 @@ public class JDBCUtils {
} }
public static SQLProvider getSQLProvider(String driverClass, String tableName) { public static SQLProvider getSQLProvider(String driverClass, String tableName) {
SQLProvider.Factory factory;
if (driverClass.contains("derby")) { if (driverClass.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName); logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new DerbySQLProvider(tableName); factory = new DerbySQLProvider.Factory();
} }
else if (driverClass.contains("postgres")) { else if (driverClass.contains("postgres")) {
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName); logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new PostgresSQLProvider(tableName); factory = new PostgresSQLProvider.Factory();
} }
else if (driverClass.contains("mysql")) { else if (driverClass.contains("mysql")) {
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName); logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new MySQLSQLProvider(tableName); factory = new MySQLSQLProvider.Factory();
} }
else { else {
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName); logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new GenericSQLProvider(tableName); factory = new GenericSQLProvider.Factory();
} }
return factory.create(tableName);
} }
public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass, public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
String jdbcConnectionUrl,
SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
return dbDriver;
}
public static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource,
String tableName, String tableName,
String jdbcConnectionUrl) throws SQLException { SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver; JDBCSequentialFileFactoryDriver dbDriver;
if (driverClass.contains("derby")) { if (provider instanceof PostgresSQLProvider) {
logger.tracef("getDBFileDriver Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
}
else if (driverClass.contains("postgres")) {
logger.tracef("getDBFileDriver Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
dbDriver = new PostgresSequentialSequentialFileDriver(); dbDriver = new PostgresSequentialSequentialFileDriver();
dbDriver.setSqlProvider(new PostgresSQLProvider(tableName)); dbDriver.setDataSource(dataSource);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
}
else if (driverClass.contains("mysql")) {
logger.tracef("getDBFileDriver Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new MySQLSQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
} }
else { else {
logger.tracef("getDBFileDriver generic mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName); dbDriver = new JDBCSequentialFileFactoryDriver(tableName, dataSource, provider);
dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
} }
return dbDriver; return dbDriver;
} }
} }

View File

@ -22,6 +22,8 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.Properties; import java.util.Properties;
import javax.sql.DataSource;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
@ -41,13 +43,20 @@ public abstract class AbstractJDBCDriver {
protected Driver dbDriver; protected Driver dbDriver;
protected DataSource dataSource;
public AbstractJDBCDriver() { public AbstractJDBCDriver() {
} }
public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) { public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl; this.jdbcConnectionUrl = jdbcConnectionUrl;
this.jdbcDriverClass = jdbcDriverClass; this.jdbcDriverClass = jdbcDriverClass;
this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName); this.sqlProvider = sqlProvider;
}
public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) {
this.dataSource = dataSource;
this.sqlProvider = provider;
} }
public void start() throws Exception { public void start() throws Exception {
@ -71,6 +80,10 @@ public abstract class AbstractJDBCDriver {
} }
protected void connect() throws Exception { protected void connect() throws Exception {
if (dataSource != null) {
connection = dataSource.getConnection();
}
else {
try { try {
dbDriver = JDBCUtils.getDriver(jdbcDriverClass); dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties()); connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
@ -80,6 +93,7 @@ public abstract class AbstractJDBCDriver {
throw new RuntimeException("Error connecting to database", e); throw new RuntimeException("Error connecting to database", e);
} }
} }
}
public void destroy() throws Exception { public void destroy() throws Exception {
try { try {
@ -126,4 +140,12 @@ public abstract class AbstractJDBCDriver {
public void setJdbcDriverClass(String jdbcDriverClass) { public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass; this.jdbcDriverClass = jdbcDriverClass;
} }
public DataSource getDataSource() {
return dataSource;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers.derby; package org.apache.activemq.artemis.jdbc.store.drivers.derby;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class DerbySQLProvider extends GenericSQLProvider { public class DerbySQLProvider extends GenericSQLProvider {
@ -27,7 +28,7 @@ public class DerbySQLProvider extends GenericSQLProvider {
private final String appendToFileSQL; private final String appendToFileSQL;
public DerbySQLProvider(String tableName) { private DerbySQLProvider(String tableName) {
super(tableName); super(tableName);
createFileTableSQL = "CREATE TABLE " + tableName + createFileTableSQL = "CREATE TABLE " + tableName +
@ -56,4 +57,12 @@ public class DerbySQLProvider extends GenericSQLProvider {
public boolean closeConnectionOnShutdown() { public boolean closeConnectionOnShutdown() {
return false; return false;
} }
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new DerbySQLProvider(tableName);
}
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers.mysql; package org.apache.activemq.artemis.jdbc.store.drivers.mysql;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class MySQLSQLProvider extends GenericSQLProvider { public class MySQLSQLProvider extends GenericSQLProvider {
@ -28,7 +29,7 @@ public class MySQLSQLProvider extends GenericSQLProvider {
private final String copyFileRecordByIdSQL; private final String copyFileRecordByIdSQL;
public MySQLSQLProvider(String tName) { private MySQLSQLProvider(String tName) {
super(tName.toLowerCase()); super(tName.toLowerCase());
createFileTableSQL = "CREATE TABLE " + tableName + createFileTableSQL = "CREATE TABLE " + tableName +
@ -61,4 +62,11 @@ public class MySQLSQLProvider extends GenericSQLProvider {
public String getCopyFileRecordByIdSQL() { public String getCopyFileRecordByIdSQL() {
return copyFileRecordByIdSQL; return copyFileRecordByIdSQL;
} }
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new MySQLSQLProvider(tableName);
}
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers.postgres; package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class PostgresSQLProvider extends GenericSQLProvider { public class PostgresSQLProvider extends GenericSQLProvider {
@ -27,7 +28,7 @@ public class PostgresSQLProvider extends GenericSQLProvider {
private final String createJournalTableSQL; private final String createJournalTableSQL;
public PostgresSQLProvider(String tName) { private PostgresSQLProvider(String tName) {
super(tName.toLowerCase()); super(tName.toLowerCase());
createFileTableSQL = "CREATE TABLE " + tableName + createFileTableSQL = "CREATE TABLE " + tableName +
"(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))"; "(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
@ -49,5 +50,14 @@ public class PostgresSQLProvider extends GenericSQLProvider {
public int getMaxBlobSize() { public int getMaxBlobSize() {
return MAX_BLOB_SIZE; return MAX_BLOB_SIZE;
} }
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new PostgresSQLProvider(tableName);
}
}
} }

View File

@ -25,11 +25,14 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import javax.sql.DataSource;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
@ -44,13 +47,22 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private final JDBCSequentialFileFactoryDriver dbDriver; private final JDBCSequentialFileFactoryDriver dbDriver;
public JDBCSequentialFileFactory(final String connectionUrl, public JDBCSequentialFileFactory(final DataSource dataSource,
final SQLProvider sqlProvider,
final String tableName, final String tableName,
final String className,
Executor executor) throws Exception { Executor executor) throws Exception {
this.executor = executor; this.executor = executor;
files = new ArrayList<>(); files = new ArrayList<>();
dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl); dbDriver = JDBCUtils.getDBFileDriver(dataSource, tableName, sqlProvider);
}
public JDBCSequentialFileFactory(final String connectionUrl,
final String className,
final SQLProvider sqlProvider,
Executor executor) throws Exception {
this.executor = executor;
files = new ArrayList<>();
dbDriver = JDBCUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
} }
@Override @Override

View File

@ -25,7 +25,10 @@ import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.sql.DataSource;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
@ -49,8 +52,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
super(); super();
} }
public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) { public JDBCSequentialFileFactoryDriver(String tableName, DataSource dataSource, SQLProvider provider) {
super(tableName, jdbcConnectionUrl, jdbcDriverClass); super(dataSource, provider);
} }
@Override @Override

View File

@ -29,6 +29,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -82,8 +85,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
// Sequence ID for journal records // Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0); private AtomicLong seq = new AtomicLong(0);
public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) { public JDBCJournalImpl(DataSource dataSource, SQLProvider provider, String tableName, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(tableName, jdbcUrl, jdbcDriverClass); super(dataSource, provider);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;
}
public JDBCJournalImpl(String jdbcUrl, String jdbcDriverClass, SQLProvider sqlProvider, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(sqlProvider, jdbcUrl, jdbcDriverClass);
records = new ArrayList<>(); records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor; this.completeExecutor = completeExecutor;

View File

@ -57,7 +57,7 @@ public class GenericSQLProvider implements SQLProvider {
private final String countJournalRecordsSQL; private final String countJournalRecordsSQL;
public GenericSQLProvider(String tableName) { protected GenericSQLProvider(String tableName) {
this.tableName = tableName; this.tableName = tableName;
createFileTableSQL = "CREATE TABLE " + tableName + createFileTableSQL = "CREATE TABLE " + tableName +
@ -198,4 +198,11 @@ public class GenericSQLProvider implements SQLProvider {
public boolean closeConnectionOnShutdown() { public boolean closeConnectionOnShutdown() {
return true; return true;
} }
public static class Factory implements SQLProvider.Factory {
public SQLProvider create(String tableName) {
return new GenericSQLProvider(tableName);
}
}
} }

View File

@ -57,4 +57,8 @@ public interface SQLProvider {
String getCountJournalRecordsSQL(); String getCountJournalRecordsSQL();
boolean closeConnectionOnShutdown(); boolean closeConnectionOnShutdown();
interface Factory {
SQLProvider create(String tableName);
}
} }

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@ -64,7 +65,7 @@ public class JDBCSequentialFileFactoryTest {
public void setup() throws Exception { public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor); factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName), executor);
factory.start(); factory.start();
} }

View File

@ -16,8 +16,11 @@
*/ */
package org.apache.activemq.artemis.core.config.storage; package org.apache.activemq.artemis.core.config.storage;
import javax.sql.DataSource;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class DatabaseStorageConfiguration implements StoreConfiguration { public class DatabaseStorageConfiguration implements StoreConfiguration {
@ -30,6 +33,11 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
private DataSource dataSource;
private SQLProvider.Factory sqlProviderFactory;
@Override @Override
public StoreType getStoreType() { public StoreType getStoreType() {
return StoreType.DATABASE; return StoreType.DATABASE;
@ -74,4 +82,37 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public String getJdbcDriverClassName() { public String getJdbcDriverClassName() {
return jdbcDriverClassName; return jdbcDriverClassName;
} }
/**
* The DataSource to use to store Artemis data in the data store (can be {@code null} if {@code jdbcConnectionUrl} and {@code jdbcDriverClassName} are used instead).
*
* @return the DataSource used to store Artemis data in the JDBC data store.
*/
public DataSource getDataSource() {
return dataSource;
}
/**
* Configure the DataSource to use to store Artemis data in the data store.
*
* @param dataSource
*/
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
* It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider.Factory} will be user,
* else the type of the factory will be determined based on the {@code jdbcDriverClassName).
*
* @return the factory used to communicate with the JDBC data store.
*/
public SQLProvider.Factory getSqlProviderFactory() {
return sqlProviderFactory;
}
public void setSqlProvider(SQLProvider.Factory sqlProviderFactory) {
this.sqlProviderFactory = sqlProviderFactory;
}
} }

View File

@ -25,10 +25,11 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager { public class JDBCJournalStorageManager extends JournalStorageManager {
@ -51,16 +52,43 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
try { try {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor()); if (dbConf.getDataSource() != null) {
bindingsJournal = localBindings; SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor()); sqlProviderFactory = new GenericSQLProvider.Factory();
messageJournal = localMessage; }
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(),
bindingsJournal.start(); sqlProviderFactory.create(dbConf.getBindingsTableName()),
messageJournal.start(); dbConf.getBindingsTableName(),
scheduledExecutorService,
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor()); executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(),
sqlProviderFactory.create(dbConf.getMessageTableName()),
dbConf.getMessageTableName(),
scheduledExecutorService,
executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(),
sqlProviderFactory.create(dbConf.getLargeMessageTableName()),
dbConf.getLargeMessageTableName(),
executor);
}
else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
driverClassName,
JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()),
scheduledExecutorService,
executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
driverClassName,
JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName()),
scheduledExecutorService,
executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(),
driverClassName,
JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName()),
executor);
}
largeMessagesFactory.start(); largeMessagesFactory.start();
} }
catch (Exception e) { catch (Exception e) {
@ -112,8 +140,4 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
@Override @Override
public void freeDirectBuffer(ByteBuffer buffer) { public void freeDirectBuffer(ByteBuffer buffer) {
} }
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
} }

View File

@ -29,7 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After; import org.junit.After;
@ -75,7 +77,9 @@ public class JDBCJournalTest extends ActiveMQTestBase {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5); scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor(); executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true"; jdbcUrl = "jdbc:derby:target/data;create=true";
journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService); SQLProvider.Factory factory = new DerbySQLProvider.Factory();
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME),
scheduledExecutorService, executorService);
journal.start(); journal.start();
} }