ARTEMIS-830 Remove cyclic dependencies
Removes cyclic dependencies between classes and packages in the artemis-jdbc-store projetct by moving classes and methods to other locations and reducing the visibility of classes, fields and methods. Solving cyclic dependencies is important to keep the codebase maintainable. Scenarios where "everything uses everything" should be avoided.
This commit is contained in:
parent
e49eda9664
commit
4b5cbb86aa
|
@ -1,143 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class JDBCUtils {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JDBCUtils.class);
|
||||
|
||||
public static Driver getDriver(String className) throws Exception {
|
||||
|
||||
try {
|
||||
Driver driver = (Driver) Class.forName(className).newInstance();
|
||||
|
||||
// Shutdown the derby if using the derby embedded driver.
|
||||
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
DriverManager.getConnection("jdbc:derby:;shutdown=true");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return driver;
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new RuntimeException("Could not find class: " + className);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Unable to instantiate driver class: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException {
|
||||
logger.tracef("Validating if table %s didn't exist before creating", tableName);
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
|
||||
if (rs != null && !rs.next()) {
|
||||
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, sql);
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
statement.executeUpdate(sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
} catch (SQLException e) {
|
||||
connection.rollback();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static SQLProvider.Factory getSQLProviderFactory(String url) {
|
||||
SQLProvider.Factory factory;
|
||||
if (url.contains("derby")) {
|
||||
logger.tracef("getSQLProvider Returning Derby SQL provider for url::%s", url);
|
||||
factory = new DerbySQLProvider.Factory();
|
||||
} else if (url.contains("postgres")) {
|
||||
logger.tracef("getSQLProvider Returning postgres SQL provider for url::%s", url);
|
||||
factory = new PostgresSQLProvider.Factory();
|
||||
} else if (url.contains("mysql")) {
|
||||
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
|
||||
factory = new MySQLSQLProvider.Factory();
|
||||
} else {
|
||||
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
|
||||
factory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
|
||||
SQLProvider.Factory factory;
|
||||
if (driverClass.contains("derby")) {
|
||||
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new DerbySQLProvider.Factory();
|
||||
} else if (driverClass.contains("postgres")) {
|
||||
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new PostgresSQLProvider.Factory();
|
||||
} else if (driverClass.contains("mysql")) {
|
||||
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new MySQLSQLProvider.Factory();
|
||||
} else {
|
||||
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
return factory.create(tableName);
|
||||
}
|
||||
|
||||
public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
|
||||
String jdbcConnectionUrl,
|
||||
SQLProvider provider) throws SQLException {
|
||||
JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver();
|
||||
dbDriver.setSqlProvider(provider);
|
||||
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
dbDriver.setJdbcDriverClass(driverClass);
|
||||
return dbDriver;
|
||||
}
|
||||
|
||||
public static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource,
|
||||
String tableName,
|
||||
SQLProvider provider) throws SQLException {
|
||||
JDBCSequentialFileFactoryDriver dbDriver;
|
||||
if (provider instanceof PostgresSQLProvider) {
|
||||
dbDriver = new PostgresSequentialSequentialFileDriver();
|
||||
dbDriver.setDataSource(dataSource);
|
||||
} else {
|
||||
dbDriver = new JDBCSequentialFileFactoryDriver(tableName, dataSource, provider);
|
||||
}
|
||||
return dbDriver;
|
||||
}
|
||||
|
||||
}
|
|
@ -19,30 +19,32 @@ package org.apache.activemq.artemis.jdbc.store.drivers;
|
|||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Class to hold common database functionality such as drivers and connections
|
||||
*/
|
||||
public abstract class AbstractJDBCDriver {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
|
||||
|
||||
protected Connection connection;
|
||||
|
||||
protected SQLProvider sqlProvider;
|
||||
|
||||
protected String jdbcConnectionUrl;
|
||||
private String jdbcConnectionUrl;
|
||||
|
||||
protected String jdbcDriverClass;
|
||||
private String jdbcDriverClass;
|
||||
|
||||
protected Driver dbDriver;
|
||||
|
||||
protected DataSource dataSource;
|
||||
private DataSource dataSource;
|
||||
|
||||
public AbstractJDBCDriver() {
|
||||
}
|
||||
|
@ -75,7 +77,7 @@ public abstract class AbstractJDBCDriver {
|
|||
protected abstract void createSchema() throws SQLException;
|
||||
|
||||
protected void createTable(String schemaSql) throws SQLException {
|
||||
JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
|
||||
createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
|
||||
}
|
||||
|
||||
protected void connect() throws Exception {
|
||||
|
@ -83,7 +85,7 @@ public abstract class AbstractJDBCDriver {
|
|||
connection = dataSource.getConnection();
|
||||
} else {
|
||||
try {
|
||||
dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
|
||||
Driver dbDriver = getDriver(jdbcDriverClass);
|
||||
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
|
||||
} catch (SQLException e) {
|
||||
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
|
||||
|
@ -105,6 +107,48 @@ public abstract class AbstractJDBCDriver {
|
|||
}
|
||||
}
|
||||
|
||||
private static void createTableIfNotExists(Connection connection, String tableName, String sql) throws SQLException {
|
||||
logger.tracef("Validating if table %s didn't exist before creating", tableName);
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
|
||||
if (rs != null && !rs.next()) {
|
||||
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, sql);
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
statement.executeUpdate(sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
} catch (SQLException e) {
|
||||
connection.rollback();
|
||||
}
|
||||
}
|
||||
|
||||
private Driver getDriver(String className) throws Exception {
|
||||
try {
|
||||
Driver driver = (Driver) Class.forName(className).newInstance();
|
||||
|
||||
// Shutdown the derby if using the derby embedded driver.
|
||||
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
DriverManager.getConnection("jdbc:derby:;shutdown=true");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return driver;
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new RuntimeException("Could not find class: " + className);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Unable to instantiate driver class: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
return connection;
|
||||
}
|
||||
|
@ -113,34 +157,18 @@ public abstract class AbstractJDBCDriver {
|
|||
this.connection = connection;
|
||||
}
|
||||
|
||||
public SQLProvider getSqlProvider() {
|
||||
return sqlProvider;
|
||||
}
|
||||
|
||||
public void setSqlProvider(SQLProvider sqlProvider) {
|
||||
this.sqlProvider = sqlProvider;
|
||||
}
|
||||
|
||||
public String getJdbcConnectionUrl() {
|
||||
return jdbcConnectionUrl;
|
||||
}
|
||||
|
||||
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
|
||||
this.jdbcConnectionUrl = jdbcConnectionUrl;
|
||||
}
|
||||
|
||||
public String getJdbcDriverClass() {
|
||||
return jdbcDriverClass;
|
||||
}
|
||||
|
||||
public void setJdbcDriverClass(String jdbcDriverClass) {
|
||||
this.jdbcDriverClass = jdbcDriverClass;
|
||||
}
|
||||
|
||||
public DataSource getDataSource() {
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public void setDataSource(DataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store.drivers;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class JDBCUtils {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JDBCUtils.class);
|
||||
|
||||
public static SQLProvider.Factory getSQLProviderFactory(String url) {
|
||||
SQLProvider.Factory factory;
|
||||
if (url.contains("derby")) {
|
||||
logger.tracef("getSQLProvider Returning Derby SQL provider for url::%s", url);
|
||||
factory = new DerbySQLProvider.Factory();
|
||||
} else if (url.contains("postgres")) {
|
||||
logger.tracef("getSQLProvider Returning postgres SQL provider for url::%s", url);
|
||||
factory = new PostgresSQLProvider.Factory();
|
||||
} else if (url.contains("mysql")) {
|
||||
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
|
||||
factory = new MySQLSQLProvider.Factory();
|
||||
} else {
|
||||
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
|
||||
factory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
|
||||
SQLProvider.Factory factory;
|
||||
if (driverClass.contains("derby")) {
|
||||
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new DerbySQLProvider.Factory();
|
||||
} else if (driverClass.contains("postgres")) {
|
||||
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new PostgresSQLProvider.Factory();
|
||||
} else if (driverClass.contains("mysql")) {
|
||||
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new MySQLSQLProvider.Factory();
|
||||
} else {
|
||||
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
return factory.create(tableName);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.jdbc.store.file;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
|
||||
class JDBCFileUtils {
|
||||
|
||||
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
|
||||
String jdbcConnectionUrl,
|
||||
SQLProvider provider) throws SQLException {
|
||||
JDBCSequentialFileFactoryDriver dbDriver = new JDBCSequentialFileFactoryDriver();
|
||||
dbDriver.setSqlProvider(provider);
|
||||
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
dbDriver.setJdbcDriverClass(driverClass);
|
||||
return dbDriver;
|
||||
}
|
||||
|
||||
static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
|
||||
JDBCSequentialFileFactoryDriver dbDriver;
|
||||
if (provider instanceof PostgresSQLProvider) {
|
||||
dbDriver = new PostgresSequentialSequentialFileDriver();
|
||||
dbDriver.setDataSource(dataSource);
|
||||
} else {
|
||||
dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider);
|
||||
}
|
||||
return dbDriver;
|
||||
}
|
||||
}
|
|
@ -64,11 +64,11 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
// Allows DB Drivers to cache meta data.
|
||||
private final Map<Object, Object> metaData = new ConcurrentHashMap<>();
|
||||
|
||||
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
|
||||
final String filename,
|
||||
final Executor executor,
|
||||
final JDBCSequentialFileFactoryDriver driver,
|
||||
final Object writeLock) throws SQLException {
|
||||
JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
|
||||
final String filename,
|
||||
final Executor executor,
|
||||
final JDBCSequentialFileFactoryDriver driver,
|
||||
final Object writeLock) throws SQLException {
|
||||
this.fileFactory = fileFactory;
|
||||
this.filename = filename;
|
||||
this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
|
||||
|
@ -77,7 +77,7 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
this.dbDriver = driver;
|
||||
}
|
||||
|
||||
public void setWritePosition(int writePosition) {
|
||||
void setWritePosition(int writePosition) {
|
||||
this.writePosition = writePosition;
|
||||
}
|
||||
|
||||
|
@ -172,7 +172,7 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
return internalWrite(buffer.array(), callback);
|
||||
}
|
||||
|
||||
public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
|
||||
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -181,7 +181,7 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
});
|
||||
}
|
||||
|
||||
public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
|
||||
private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -358,10 +358,6 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
metaData.put(key, value);
|
||||
}
|
||||
|
||||
public Object removeMetaData(Object key) {
|
||||
return metaData.remove(key);
|
||||
}
|
||||
|
||||
public Object getMetaData(Object key) {
|
||||
return metaData.get(key);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
|
|||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
|
@ -48,10 +47,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
public JDBCSequentialFileFactory(final DataSource dataSource,
|
||||
final SQLProvider sqlProvider,
|
||||
final String tableName,
|
||||
Executor executor) throws Exception {
|
||||
this.executor = executor;
|
||||
dbDriver = JDBCUtils.getDBFileDriver(dataSource, tableName, sqlProvider);
|
||||
dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactory(final String connectionUrl,
|
||||
|
@ -59,7 +57,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
final SQLProvider sqlProvider,
|
||||
Executor executor) throws Exception {
|
||||
this.executor = executor;
|
||||
dbDriver = JDBCUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,9 +86,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
@Override
|
||||
public SequentialFile createSequentialFile(String fileName) {
|
||||
try {
|
||||
if (fileLocks.get(fileName) == null) {
|
||||
fileLocks.put(fileName, new Object());
|
||||
}
|
||||
fileLocks.putIfAbsent(fileName, new Object());
|
||||
JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, dbDriver, fileLocks.get(fileName));
|
||||
files.add(file);
|
||||
return file;
|
||||
|
|
|
@ -33,25 +33,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
|
||||
protected PreparedStatement deleteFile;
|
||||
|
||||
protected PreparedStatement createFile;
|
||||
PreparedStatement createFile;
|
||||
|
||||
protected PreparedStatement selectFileByFileName;
|
||||
private PreparedStatement selectFileByFileName;
|
||||
|
||||
protected PreparedStatement copyFileRecord;
|
||||
private PreparedStatement copyFileRecord;
|
||||
|
||||
protected PreparedStatement renameFile;
|
||||
private PreparedStatement renameFile;
|
||||
|
||||
protected PreparedStatement readLargeObject;
|
||||
PreparedStatement readLargeObject;
|
||||
|
||||
protected PreparedStatement appendToLargeObject;
|
||||
private PreparedStatement appendToLargeObject;
|
||||
|
||||
protected PreparedStatement selectFileNamesByExtension;
|
||||
private PreparedStatement selectFileNamesByExtension;
|
||||
|
||||
public JDBCSequentialFileFactoryDriver() {
|
||||
JDBCSequentialFileFactoryDriver() {
|
||||
super();
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactoryDriver(String tableName, DataSource dataSource, SQLProvider provider) {
|
||||
JDBCSequentialFileFactoryDriver(DataSource dataSource, SQLProvider provider) {
|
||||
super(dataSource, provider);
|
||||
}
|
||||
|
||||
|
|
|
@ -14,14 +14,12 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
|
||||
package org.apache.activemq.artemis.jdbc.store.file;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
||||
import org.postgresql.PGConnection;
|
||||
import org.postgresql.largeobject.LargeObject;
|
||||
import org.postgresql.largeobject.LargeObjectManager;
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
|
|||
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
@ -51,7 +52,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
private static final Logger logger = Logger.getLogger(JDBCJournalImpl.class);
|
||||
|
||||
// Sync Delay in ms
|
||||
public static final int SYNC_DELAY = 5;
|
||||
private static final int SYNC_DELAY = 5;
|
||||
|
||||
private static int USER_VERSION = 1;
|
||||
|
||||
|
@ -741,4 +742,24 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
return started;
|
||||
}
|
||||
|
||||
private static class JDBCJournalSync extends ActiveMQScheduledComponent {
|
||||
|
||||
private final JDBCJournalImpl journal;
|
||||
|
||||
JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
|
||||
Executor executor,
|
||||
long checkPeriod,
|
||||
TimeUnit timeUnit,
|
||||
JDBCJournalImpl journal) {
|
||||
super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
|
||||
this.journal = journal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (journal.isStarted()) {
|
||||
journal.sync();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
|||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
||||
|
||||
public class JDBCJournalLoaderCallback implements LoaderCallback {
|
||||
class JDBCJournalLoaderCallback implements LoaderCallback {
|
||||
|
||||
private final List<PreparedTransactionInfo> preparedTransactions;
|
||||
|
||||
|
@ -41,16 +41,16 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
|
|||
|
||||
private long maxId = -1;
|
||||
|
||||
public JDBCJournalLoaderCallback(final List<RecordInfo> committedRecords,
|
||||
final List<PreparedTransactionInfo> preparedTransactions,
|
||||
final TransactionFailureCallback failureCallback,
|
||||
final boolean fixBadTX) {
|
||||
JDBCJournalLoaderCallback(final List<RecordInfo> committedRecords,
|
||||
final List<PreparedTransactionInfo> preparedTransactions,
|
||||
final TransactionFailureCallback failureCallback,
|
||||
final boolean fixBadTX) {
|
||||
this.committedRecords = committedRecords;
|
||||
this.preparedTransactions = preparedTransactions;
|
||||
this.failureCallback = failureCallback;
|
||||
}
|
||||
|
||||
public synchronized void checkMaxId(long id) {
|
||||
private synchronized void checkMaxId(long id) {
|
||||
if (maxId < id) {
|
||||
maxId = id;
|
||||
}
|
||||
|
|
|
@ -27,13 +27,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
|||
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
||||
class JDBCJournalReaderCallback implements JournalReaderCallback {
|
||||
|
||||
private final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<>();
|
||||
|
||||
private final LoaderCallback loadManager;
|
||||
|
||||
public JDBCJournalReaderCallback(final LoaderCallback loadManager) {
|
||||
JDBCJournalReaderCallback(final LoaderCallback loadManager) {
|
||||
this.loadManager = loadManager;
|
||||
}
|
||||
|
||||
|
@ -126,7 +126,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
|
|||
// Not needed for JDBC journal impl
|
||||
}
|
||||
|
||||
public void checkPreparedTx() {
|
||||
void checkPreparedTx() {
|
||||
for (TransactionHolder transaction : loadTransactions.values()) {
|
||||
if ((!transaction.prepared && !transaction.committed) || transaction.invalid) {
|
||||
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
|
|||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
|
||||
|
||||
public class JDBCJournalRecord {
|
||||
class JDBCJournalRecord {
|
||||
/*
|
||||
Database Table Schema:
|
||||
|
||||
|
@ -49,17 +49,17 @@ public class JDBCJournalRecord {
|
|||
*/
|
||||
|
||||
// Record types taken from Journal Impl
|
||||
public static final byte ADD_RECORD = 11;
|
||||
public static final byte UPDATE_RECORD = 12;
|
||||
public static final byte ADD_RECORD_TX = 13;
|
||||
public static final byte UPDATE_RECORD_TX = 14;
|
||||
static final byte ADD_RECORD = 11;
|
||||
static final byte UPDATE_RECORD = 12;
|
||||
static final byte ADD_RECORD_TX = 13;
|
||||
static final byte UPDATE_RECORD_TX = 14;
|
||||
|
||||
public static final byte DELETE_RECORD_TX = 15;
|
||||
public static final byte DELETE_RECORD = 16;
|
||||
static final byte DELETE_RECORD_TX = 15;
|
||||
static final byte DELETE_RECORD = 16;
|
||||
|
||||
public static final byte PREPARE_RECORD = 17;
|
||||
public static final byte COMMIT_RECORD = 18;
|
||||
public static final byte ROLLBACK_RECORD = 19;
|
||||
static final byte PREPARE_RECORD = 17;
|
||||
static final byte COMMIT_RECORD = 18;
|
||||
static final byte ROLLBACK_RECORD = 19;
|
||||
|
||||
// Callback and sync operations
|
||||
private IOCompletion ioCompletion = null;
|
||||
|
@ -90,7 +90,7 @@ public class JDBCJournalRecord {
|
|||
|
||||
private long seq;
|
||||
|
||||
public JDBCJournalRecord(long id, byte recordType, long seq) {
|
||||
JDBCJournalRecord(long id, byte recordType, long seq) {
|
||||
this.id = id;
|
||||
this.recordType = recordType;
|
||||
|
||||
|
@ -110,26 +110,6 @@ public class JDBCJournalRecord {
|
|||
this.seq = seq;
|
||||
}
|
||||
|
||||
public static String createTableSQL(String tableName) {
|
||||
return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
|
||||
}
|
||||
|
||||
public static String insertRecordsSQL(String tableName) {
|
||||
return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
|
||||
}
|
||||
|
||||
public static String selectRecordsSQL(String tableName) {
|
||||
return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC";
|
||||
}
|
||||
|
||||
public static String deleteRecordsSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE id = ?";
|
||||
}
|
||||
|
||||
public static String deleteJournalTxRecordsSQL(String tableName) {
|
||||
return "DELETE FROM " + tableName + " WHERE txId=?";
|
||||
}
|
||||
|
||||
public void complete(boolean success) {
|
||||
if (ioCompletion != null) {
|
||||
if (success) {
|
||||
|
@ -146,7 +126,7 @@ public class JDBCJournalRecord {
|
|||
}
|
||||
}
|
||||
|
||||
protected void writeRecord(PreparedStatement statement) throws SQLException {
|
||||
void writeRecord(PreparedStatement statement) throws SQLException {
|
||||
|
||||
byte[] recordBytes = new byte[variableSize];
|
||||
byte[] txDataBytes = new byte[txDataSize];
|
||||
|
@ -172,12 +152,12 @@ public class JDBCJournalRecord {
|
|||
statement.addBatch();
|
||||
}
|
||||
|
||||
protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
|
||||
void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
|
||||
deleteStatement.setLong(1, id);
|
||||
deleteStatement.addBatch();
|
||||
}
|
||||
|
||||
public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
|
||||
static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
|
||||
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2), rs.getLong(11));
|
||||
record.setCompactCount((byte) rs.getShort(3));
|
||||
record.setTxId(rs.getLong(4));
|
||||
|
@ -190,18 +170,14 @@ public class JDBCJournalRecord {
|
|||
return record;
|
||||
}
|
||||
|
||||
public IOCompletion getIoCompletion() {
|
||||
IOCompletion getIoCompletion() {
|
||||
return ioCompletion;
|
||||
}
|
||||
|
||||
public void setIoCompletion(IOCompletion ioCompletion) {
|
||||
void setIoCompletion(IOCompletion ioCompletion) {
|
||||
this.ioCompletion = ioCompletion;
|
||||
}
|
||||
|
||||
public boolean isStoreLineUp() {
|
||||
return storeLineUp;
|
||||
}
|
||||
|
||||
public void setStoreLineUp(boolean storeLineUp) {
|
||||
this.storeLineUp = storeLineUp;
|
||||
}
|
||||
|
@ -222,27 +198,23 @@ public class JDBCJournalRecord {
|
|||
return recordType;
|
||||
}
|
||||
|
||||
public byte getCompactCount() {
|
||||
byte getCompactCount() {
|
||||
return compactCount;
|
||||
}
|
||||
|
||||
public void setCompactCount(byte compactCount) {
|
||||
private void setCompactCount(byte compactCount) {
|
||||
this.compactCount = compactCount;
|
||||
}
|
||||
|
||||
public long getTxId() {
|
||||
long getTxId() {
|
||||
return txId;
|
||||
}
|
||||
|
||||
public void setTxId(long txId) {
|
||||
void setTxId(long txId) {
|
||||
this.txId = txId;
|
||||
}
|
||||
|
||||
public int getVariableSize() {
|
||||
return variableSize;
|
||||
}
|
||||
|
||||
public void setVariableSize(int variableSize) {
|
||||
private void setVariableSize(int variableSize) {
|
||||
this.variableSize = variableSize;
|
||||
}
|
||||
|
||||
|
@ -277,31 +249,19 @@ public class JDBCJournalRecord {
|
|||
return record;
|
||||
}
|
||||
|
||||
public int getTxCheckNoRecords() {
|
||||
int getTxCheckNoRecords() {
|
||||
return txCheckNoRecords;
|
||||
}
|
||||
|
||||
public void setTxCheckNoRecords(int txCheckNoRecords) {
|
||||
private void setTxCheckNoRecords(int txCheckNoRecords) {
|
||||
this.txCheckNoRecords = txCheckNoRecords;
|
||||
}
|
||||
|
||||
public void setTxDataSize(int txDataSize) {
|
||||
private void setTxDataSize(int txDataSize) {
|
||||
this.txDataSize = txDataSize;
|
||||
}
|
||||
|
||||
public int getTxDataSize() {
|
||||
return txDataSize;
|
||||
}
|
||||
|
||||
public InputStream getTxData() {
|
||||
return txData;
|
||||
}
|
||||
|
||||
public void setTxData(InputStream record) {
|
||||
this.record = record;
|
||||
}
|
||||
|
||||
public void setTxData(EncodingSupport txData) {
|
||||
void setTxData(EncodingSupport txData) {
|
||||
this.txDataSize = txData.getEncodeSize();
|
||||
|
||||
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(txDataSize);
|
||||
|
@ -309,7 +269,7 @@ public class JDBCJournalRecord {
|
|||
this.txData = new ActiveMQBufferInputStream(encodedBuffer);
|
||||
}
|
||||
|
||||
public void setTxData(byte[] txData) {
|
||||
void setTxData(byte[] txData) {
|
||||
if (txData != null) {
|
||||
this.txDataSize = txData.length;
|
||||
this.txData = new ByteArrayInputStream(txData);
|
||||
|
@ -320,19 +280,19 @@ public class JDBCJournalRecord {
|
|||
return isUpdate;
|
||||
}
|
||||
|
||||
public byte[] getRecordData() throws IOException {
|
||||
private byte[] getRecordData() throws IOException {
|
||||
byte[] data = new byte[variableSize];
|
||||
record.read(data);
|
||||
return data;
|
||||
}
|
||||
|
||||
public byte[] getTxDataAsByteArray() throws IOException {
|
||||
byte[] getTxDataAsByteArray() throws IOException {
|
||||
byte[] data = new byte[txDataSize];
|
||||
txData.read(data);
|
||||
return data;
|
||||
}
|
||||
|
||||
public RecordInfo toRecordInfo() throws IOException {
|
||||
RecordInfo toRecordInfo() throws IOException {
|
||||
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
|
||||
}
|
||||
|
||||
|
@ -340,7 +300,7 @@ public class JDBCJournalRecord {
|
|||
return isTransactional;
|
||||
}
|
||||
|
||||
public long getSeq() {
|
||||
long getSeq() {
|
||||
return seq;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.jdbc.store.journal;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||
|
||||
public class JDBCJournalSync extends ActiveMQScheduledComponent {
|
||||
|
||||
private final JDBCJournalImpl journal;
|
||||
|
||||
public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService,
|
||||
Executor executor,
|
||||
long checkPeriod,
|
||||
TimeUnit timeUnit,
|
||||
JDBCJournalImpl journal) {
|
||||
super(scheduledExecutorService, executor, checkPeriod, timeUnit, true);
|
||||
this.journal = journal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (journal.isStarted()) {
|
||||
journal.sync();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,7 +30,7 @@ final class TransactionHolder {
|
|||
|
||||
public final long transactionID;
|
||||
|
||||
public final List<RecordInfo> recordInfos = new ArrayList<>();
|
||||
final List<RecordInfo> recordInfos = new ArrayList<>();
|
||||
|
||||
public final List<RecordInfo> recordsToDelete = new ArrayList<>();
|
||||
|
||||
|
@ -38,7 +38,7 @@ final class TransactionHolder {
|
|||
|
||||
public boolean invalid;
|
||||
|
||||
public byte[] extraData;
|
||||
byte[] extraData;
|
||||
|
||||
public boolean committed;
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
|
@ -53,10 +53,6 @@ public class JDBCSequentialFileFactoryTest {
|
|||
@Rule
|
||||
public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
|
||||
|
||||
private static String connectionUrl = "jdbc:derby:target/data;create=true";
|
||||
|
||||
private static String tableName = "FILES";
|
||||
|
||||
private static String className = EmbeddedDriver.class.getCanonicalName();
|
||||
|
||||
private JDBCSequentialFileFactory factory;
|
||||
|
@ -65,6 +61,8 @@ public class JDBCSequentialFileFactoryTest {
|
|||
public void setup() throws Exception {
|
||||
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||
|
||||
String connectionUrl = "jdbc:derby:target/data;create=true";
|
||||
String tableName = "FILES";
|
||||
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName), executor);
|
||||
factory.start();
|
||||
}
|
||||
|
@ -198,7 +196,7 @@ public class JDBCSequentialFileFactoryTest {
|
|||
fail(errorMessage);
|
||||
}
|
||||
|
||||
public void assertEmpty(int timeout) throws InterruptedException {
|
||||
void assertEmpty(int timeout) throws InterruptedException {
|
||||
countDownLatch.await(timeout, TimeUnit.SECONDS);
|
||||
assertEquals(countDownLatch.getCount(), 0);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.logging.Level;
|
|||
import java.util.logging.Logger;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||
import org.osgi.framework.BundleContext;
|
||||
import org.osgi.framework.ServiceReference;
|
||||
import org.osgi.util.tracker.ServiceTrackerCustomizer;
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
|||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
|
@ -59,7 +59,7 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
|||
}
|
||||
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName()), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor());
|
||||
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName()), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor());
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName()), dbConf.getLargeMessageTableName(), executor);
|
||||
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName()), executor);
|
||||
} else {
|
||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()), scheduledExecutorService, executorFactory.getExecutor());
|
||||
|
|
|
@ -129,7 +129,7 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
|
@ -467,7 +467,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
}
|
||||
|
||||
public void destroyTables(List<String> tableNames) throws Exception {
|
||||
Driver driver = JDBCUtils.getDriver(getJDBCClassName());
|
||||
Driver driver = getDriver(getJDBCClassName());
|
||||
Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
|
||||
Statement statement = connection.createStatement();
|
||||
try {
|
||||
|
@ -490,6 +490,30 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
}
|
||||
}
|
||||
|
||||
private Driver getDriver(String className) throws Exception {
|
||||
try {
|
||||
Driver driver = (Driver) Class.forName(className).newInstance();
|
||||
|
||||
// Shutdown the derby if using the derby embedded driver.
|
||||
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
DriverManager.getConnection("jdbc:derby:;shutdown=true");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return driver;
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new RuntimeException("Could not find class: " + className);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Unable to instantiate driver class: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, Object> generateInVMParams(final int node) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
|
||||
|
|
Loading…
Reference in New Issue