This commit is contained in:
Justin Bertram 2017-03-29 11:49:16 -05:00
commit 30ec72bfcb
13 changed files with 62 additions and 36 deletions

View File

@ -51,7 +51,7 @@ public class JDBCUtils {
return factory;
}
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
public static SQLProvider getSQLProvider(String driverClass, String tableName, SQLProvider.DatabaseStoreType storeType) {
SQLProvider.Factory factory;
if (driverClass.contains("derby")) {
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
@ -69,7 +69,7 @@ public class JDBCUtils {
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
factory = new GenericSQLProvider.Factory();
}
return factory.create(tableName);
return factory.create(tableName, storeType);
}
/**

View File

@ -26,8 +26,8 @@ public class DerbySQLProvider extends GenericSQLProvider {
private final String createFileTableSQL;
private DerbySQLProvider(String tableName) {
super(tableName.toUpperCase());
private DerbySQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
super(tableName.toUpperCase(), databaseStoreType);
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
@ -52,8 +52,8 @@ public class DerbySQLProvider extends GenericSQLProvider {
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new DerbySQLProvider(tableName);
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
return new DerbySQLProvider(tableName, databaseStoreType);
}
}
}

View File

@ -29,8 +29,8 @@ public class MySQLSQLProvider extends GenericSQLProvider {
private final String copyFileRecordByIdSQL;
private MySQLSQLProvider(String tName) {
super(tName.toLowerCase());
private MySQLSQLProvider(String tName, DatabaseStoreType databaseStoreType) {
super(tName.toLowerCase(), databaseStoreType);
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID BIGINT NOT NULL AUTO_INCREMENT," +
@ -68,8 +68,8 @@ public class MySQLSQLProvider extends GenericSQLProvider {
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new MySQLSQLProvider(tableName);
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
return new MySQLSQLProvider(tableName, databaseStoreType);
}
}
}

View File

@ -27,8 +27,11 @@ public class Oracle12CSQLProvider extends GenericSQLProvider {
private static final long MAX_BLOB_SIZE = 4294967296L; //4GB
protected Oracle12CSQLProvider(String tableName) {
super(tableName.toUpperCase());
protected Oracle12CSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
super(tableName.toUpperCase(), databaseStoreType);
if (tableName.length() > 10 && databaseStoreType == DatabaseStoreType.PAGE) {
throw new RuntimeException("The maximum name size for the paging store table, when using Oracle12C is 10 characters.");
}
}
@Override
@ -49,8 +52,8 @@ public class Oracle12CSQLProvider extends GenericSQLProvider {
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new Oracle12CSQLProvider(tableName);
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
return new Oracle12CSQLProvider(tableName, databaseStoreType);
}
}
}

View File

@ -28,8 +28,8 @@ public class PostgresSQLProvider extends GenericSQLProvider {
private final String[] createJournalTableSQL;
private PostgresSQLProvider(String tName) {
super(tName.toLowerCase());
private PostgresSQLProvider(String tName, DatabaseStoreType databaseStoreType) {
super(tName.toLowerCase(), databaseStoreType);
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID BIGSERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
@ -57,8 +57,8 @@ public class PostgresSQLProvider extends GenericSQLProvider {
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new PostgresSQLProvider(tableName);
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
return new PostgresSQLProvider(tableName, databaseStoreType);
}
}
}

View File

@ -57,9 +57,13 @@ public class GenericSQLProvider implements SQLProvider {
private final String countJournalRecordsSQL;
protected GenericSQLProvider(String tableName) {
protected final DatabaseStoreType databaseStoreType;
protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
this.tableName = tableName;
this.databaseStoreType = databaseStoreType;
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
@ -205,8 +209,8 @@ public class GenericSQLProvider implements SQLProvider {
public static class Factory implements SQLProvider.Factory {
@Override
public SQLProvider create(String tableName) {
return new GenericSQLProvider(tableName);
public SQLProvider create(String tableName, DatabaseStoreType storeType) {
return new GenericSQLProvider(tableName, storeType);
}
}
}

View File

@ -18,6 +18,10 @@ package org.apache.activemq.artemis.jdbc.store.sql;
public interface SQLProvider {
enum DatabaseStoreType {
PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE
}
long getMaxBlobSize();
String[] getCreateJournalTableSQL();
@ -59,7 +63,6 @@ public interface SQLProvider {
boolean closeConnectionOnShutdown();
interface Factory {
SQLProvider create(String tableName);
SQLProvider create(String tableName, DatabaseStoreType dbStoreType);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.derby.jdbc.EmbeddedDriver;
@ -63,7 +64,7 @@ public class JDBCSequentialFileFactoryTest {
String connectionUrl = "jdbc:derby:target/data;create=true";
String tableName = "FILES";
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName), executor);
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor);
factory.start();
}

View File

@ -114,10 +114,10 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory();
}
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor());
}
pagingFactoryFileFactory.start();
started = true;
@ -222,9 +222,9 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
SQLProvider sqlProvider = null;
if (dbConf.getDataSource() != null) {
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory() == null ? new GenericSQLProvider.Factory() : dbConf.getSqlProviderFactory();
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName));
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
} else {
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName));
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
}
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor());

View File

@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory();
}
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName()), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName()), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName()), executor);
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName()), scheduledExecutorService, executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName()), executor);
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor);
}
largeMessagesFactory.start();
} catch (Exception e) {

View File

@ -20,6 +20,8 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jdbc.store.drivers.oracle.Oracle12CSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
@ -32,6 +34,19 @@ public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
assertEquals(StoreConfiguration.StoreType.DATABASE, server.getConfiguration().getStoreConfiguration().getStoreType());
}
@Test
public void testOracle12TableSize() {
Throwable rte = null;
try {
new Oracle12CSQLProvider.Factory().create("A_TABLE_NAME_THAT_IS_TOO_LONG", SQLProvider.DatabaseStoreType.PAGE);
} catch (Throwable t) {
rte = t;
}
assertNotNull(rte);
assertTrue(rte.getMessage().contains("The maximum name size for the paging store table, when using Oracle12C is 10 characters."));
}
protected Configuration createConfiguration(String fileName) throws Exception {
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(fileName);

View File

@ -474,7 +474,7 @@ public abstract class ActiveMQTestBase extends Assert {
try {
for (String tableName : tableNames) {
connection.setAutoCommit(false);
SQLProvider sqlProvider = JDBCUtils.getSQLProvider(getJDBCClassName(), tableName);
SQLProvider sqlProvider = JDBCUtils.getSQLProvider(getJDBCClassName(), tableName, SQLProvider.DatabaseStoreType.LARGE_MESSAGE);
try (ResultSet rs = connection.getMetaData().getTables(null, null, sqlProvider.getTableName(), null)) {
if (rs.next()) {
statement.execute("DROP TABLE " + sqlProvider.getTableName());

View File

@ -77,7 +77,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true";
SQLProvider.Factory factory = new DerbySQLProvider.Factory();
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME), scheduledExecutorService, executorService);
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService);
journal.start();
}