This closes #771

This commit is contained in:
Clebert Suconic 2016-09-15 11:15:24 -04:00
commit 1f392da88e
14 changed files with 215 additions and 66 deletions

View File

@ -23,6 +23,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
@ -84,56 +86,48 @@ public class JDBCUtils {
}
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
SQLProvider.Factory factory;
if (driverClass.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new DerbySQLProvider(tableName);
factory = new DerbySQLProvider.Factory();
}
else if (driverClass.contains("postgres")) {
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new PostgresSQLProvider(tableName);
factory = new PostgresSQLProvider.Factory();
}
else if (driverClass.contains("mysql")) {
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new MySQLSQLProvider(tableName);
factory = new MySQLSQLProvider.Factory();
}
else {
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
return new GenericSQLProvider(tableName);
factory = new GenericSQLProvider.Factory();
}
return factory.create(tableName);
}
public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
String jdbcConnectionUrl,
SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
return dbDriver;
}
public static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource,
String tableName,
String jdbcConnectionUrl) throws SQLException {
SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver;
if (driverClass.contains("derby")) {
logger.tracef("getDBFileDriver Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
}
else if (driverClass.contains("postgres")) {
logger.tracef("getDBFileDriver Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
if (provider instanceof PostgresSQLProvider) {
dbDriver = new PostgresSequentialSequentialFileDriver();
dbDriver.setSqlProvider(new PostgresSQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
}
else if (driverClass.contains("mysql")) {
logger.tracef("getDBFileDriver Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new MySQLSQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
dbDriver.setDataSource(dataSource);
}
else {
logger.tracef("getDBFileDriver generic mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
dbDriver = new JDBCSequentialFileFactoryDriver(tableName, dataSource, provider);
}
return dbDriver;
}
}

View File

@ -22,6 +22,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
@ -41,13 +43,20 @@ public abstract class AbstractJDBCDriver {
protected Driver dbDriver;
protected DataSource dataSource;
public AbstractJDBCDriver() {
}
public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
this.jdbcDriverClass = jdbcDriverClass;
this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName);
this.sqlProvider = sqlProvider;
}
public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) {
this.dataSource = dataSource;
this.sqlProvider = provider;
}
public void start() throws Exception {
@ -71,13 +80,18 @@ public abstract class AbstractJDBCDriver {
}
protected void connect() throws Exception {
try {
dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
if (dataSource != null) {
connection = dataSource.getConnection();
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw new RuntimeException("Error connecting to database", e);
else {
try {
dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw new RuntimeException("Error connecting to database", e);
}
}
}
@ -126,4 +140,12 @@ public abstract class AbstractJDBCDriver {
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}
public DataSource getDataSource() {
return dataSource;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers.derby;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class DerbySQLProvider extends GenericSQLProvider {
@ -27,7 +28,7 @@ public class DerbySQLProvider extends GenericSQLProvider {
private final String appendToFileSQL;
public DerbySQLProvider(String tableName) {
private DerbySQLProvider(String tableName) {
super(tableName);
createFileTableSQL = "CREATE TABLE " + tableName +
@ -56,4 +57,12 @@ public class DerbySQLProvider extends GenericSQLProvider {
public boolean closeConnectionOnShutdown() {
return false;
}
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new DerbySQLProvider(tableName);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers.mysql;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class MySQLSQLProvider extends GenericSQLProvider {
@ -28,7 +29,7 @@ public class MySQLSQLProvider extends GenericSQLProvider {
private final String copyFileRecordByIdSQL;
public MySQLSQLProvider(String tName) {
private MySQLSQLProvider(String tName) {
super(tName.toLowerCase());
createFileTableSQL = "CREATE TABLE " + tableName +
@ -61,4 +62,11 @@ public class MySQLSQLProvider extends GenericSQLProvider {
public String getCopyFileRecordByIdSQL() {
return copyFileRecordByIdSQL;
}
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new MySQLSQLProvider(tableName);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class PostgresSQLProvider extends GenericSQLProvider {
@ -27,7 +28,7 @@ public class PostgresSQLProvider extends GenericSQLProvider {
private final String createJournalTableSQL;
public PostgresSQLProvider(String tName) {
private PostgresSQLProvider(String tName) {
super(tName.toLowerCase());
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
@ -49,5 +50,14 @@ public class PostgresSQLProvider extends GenericSQLProvider {
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new PostgresSQLProvider(tableName);
}
}
}

View File

@ -25,11 +25,14 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.sql.DataSource;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
@ -44,13 +47,22 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private final JDBCSequentialFileFactoryDriver dbDriver;
public JDBCSequentialFileFactory(final String connectionUrl,
public JDBCSequentialFileFactory(final DataSource dataSource,
final SQLProvider sqlProvider,
final String tableName,
final String className,
Executor executor) throws Exception {
this.executor = executor;
files = new ArrayList<>();
dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl);
dbDriver = JDBCUtils.getDBFileDriver(dataSource, tableName, sqlProvider);
}
public JDBCSequentialFileFactory(final String connectionUrl,
final String className,
final SQLProvider sqlProvider,
Executor executor) throws Exception {
this.executor = executor;
files = new ArrayList<>();
dbDriver = JDBCUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
}
@Override

View File

@ -25,7 +25,10 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
@ -49,8 +52,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
super();
}
public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
super(tableName, jdbcConnectionUrl, jdbcDriverClass);
public JDBCSequentialFileFactoryDriver(String tableName, DataSource dataSource, SQLProvider provider) {
super(dataSource, provider);
}
@Override

View File

@ -29,6 +29,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
@ -41,6 +43,7 @@ import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
@ -82,8 +85,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
// Sequence ID for journal records
private AtomicLong seq = new AtomicLong(0);
public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(tableName, jdbcUrl, jdbcDriverClass);
public JDBCJournalImpl(DataSource dataSource, SQLProvider provider, String tableName, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(dataSource, provider);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;
}
public JDBCJournalImpl(String jdbcUrl, String jdbcDriverClass, SQLProvider sqlProvider, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor) {
super(sqlProvider, jdbcUrl, jdbcDriverClass);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;

View File

@ -57,7 +57,7 @@ public class GenericSQLProvider implements SQLProvider {
private final String countJournalRecordsSQL;
public GenericSQLProvider(String tableName) {
protected GenericSQLProvider(String tableName) {
this.tableName = tableName;
createFileTableSQL = "CREATE TABLE " + tableName +
@ -198,4 +198,11 @@ public class GenericSQLProvider implements SQLProvider {
public boolean closeConnectionOnShutdown() {
return true;
}
public static class Factory implements SQLProvider.Factory {
public SQLProvider create(String tableName) {
return new GenericSQLProvider(tableName);
}
}
}

View File

@ -57,4 +57,8 @@ public interface SQLProvider {
String getCountJournalRecordsSQL();
boolean closeConnectionOnShutdown();
interface Factory {
SQLProvider create(String tableName);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@ -64,7 +65,7 @@ public class JDBCSequentialFileFactoryTest {
public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName), executor);
factory.start();
}

View File

@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.core.config.storage;
import javax.sql.DataSource;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
public class DatabaseStorageConfiguration implements StoreConfiguration {
@ -30,6 +33,11 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
private DataSource dataSource;
private SQLProvider.Factory sqlProviderFactory;
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
@ -74,4 +82,37 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
public String getJdbcDriverClassName() {
return jdbcDriverClassName;
}
/**
* The DataSource to use to store Artemis data in the data store (can be {@code null} if {@code jdbcConnectionUrl} and {@code jdbcDriverClassName} are used instead).
*
* @return the DataSource used to store Artemis data in the JDBC data store.
*/
public DataSource getDataSource() {
return dataSource;
}
/**
* Configure the DataSource to use to store Artemis data in the data store.
*
* @param dataSource
*/
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
* It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider.Factory} will be user,
* else the type of the factory will be determined based on the {@code jdbcDriverClassName).
*
* @return the factory used to communicate with the JDBC data store.
*/
public SQLProvider.Factory getSqlProviderFactory() {
return sqlProviderFactory;
}
public void setSqlProvider(SQLProvider.Factory sqlProviderFactory) {
this.sqlProviderFactory = sqlProviderFactory;
}
}

View File

@ -25,10 +25,11 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager {
@ -51,16 +52,43 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
try {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
bindingsJournal = localBindings;
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName(), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = localMessage;
bindingsJournal.start();
messageJournal.start();
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executorFactory.getExecutor());
if (dbConf.getDataSource() != null) {
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory();
}
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(),
sqlProviderFactory.create(dbConf.getBindingsTableName()),
dbConf.getBindingsTableName(),
scheduledExecutorService,
executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(),
sqlProviderFactory.create(dbConf.getMessageTableName()),
dbConf.getMessageTableName(),
scheduledExecutorService,
executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(),
sqlProviderFactory.create(dbConf.getLargeMessageTableName()),
dbConf.getLargeMessageTableName(),
executor);
}
else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
driverClassName,
JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()),
scheduledExecutorService,
executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
driverClassName,
JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName()),
scheduledExecutorService,
executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(),
driverClassName,
JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName()),
executor);
}
largeMessagesFactory.start();
}
catch (Exception e) {
@ -112,8 +140,4 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
@Override
public void freeDirectBuffer(ByteBuffer buffer) {
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
}

View File

@ -29,7 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After;
@ -75,7 +77,9 @@ public class JDBCJournalTest extends ActiveMQTestBase {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true";
journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME, DRIVER_CLASS, scheduledExecutorService, executorService);
SQLProvider.Factory factory = new DerbySQLProvider.Factory();
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME),
scheduledExecutorService, executorService);
journal.start();
}