This commit is contained in:
franz1981 2020-10-06 09:23:37 +02:00
commit 4979262f2d
29 changed files with 1142 additions and 1264 deletions

View File

@ -455,6 +455,9 @@ public final class ActiveMQDefaultConfiguration {
// Default JDBC Driver class name, derby by default just for demo purposes
private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
// Default JDBC Driver class name. DBCP2 BasicDataSource is used by default.
private static String DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME = "org.apache.commons.dbcp2.BasicDataSource";
// Default message table name, used with Database storage type
private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES";
@ -1392,6 +1395,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JDBC_DRIVER_CLASS_NAME;
}
public static String getDefaultDataSourceClassName() {
return DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME;
}
public static String getDefaultLargeMessagesTableName() {
return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
}

View File

@ -81,6 +81,13 @@
<version>${project.version}</version>
</dependency>
<!-- Default DataSource for database -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Database driver support -->
<dependency>
<groupId>org.apache.derby</groupId>

View File

@ -16,23 +16,15 @@
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
/**
@ -43,80 +35,27 @@ public abstract class AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
protected Connection connection;
protected SQLProvider sqlProvider;
private String jdbcConnectionUrl;
protected JDBCConnectionProvider connectionProvider;
private String jdbcDriverClass;
public AbstractJDBCDriver() { }
private DataSource dataSource;
private Executor networkTimeoutExecutor;
private int networkTimeoutMillis;
private String user;
private String password;
public AbstractJDBCDriver() {
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
}
public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String user, String password, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
this.user = user;
this.password = password;
this.jdbcDriverClass = jdbcDriverClass;
this.sqlProvider = sqlProvider;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
}
public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) {
this.dataSource = dataSource;
public AbstractJDBCDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
this.connectionProvider = connectionProvider;
this.sqlProvider = provider;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
}
public void start() throws SQLException {
connect();
synchronized (connection) {
createSchema();
prepareStatements();
}
}
public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
} else {
this.connection = connection;
}
this.sqlProvider = sqlProvider;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
createSchema();
prepareStatements();
}
public void stop() throws SQLException {
synchronized (connection) {
if (sqlProvider.closeConnectionOnShutdown()) {
try {
connection.setAutoCommit(true);
connection.close();
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
}
}
}
protected abstract void prepareStatements() throws SQLException;
protected abstract void prepareStatements();
protected abstract void createSchema() throws SQLException;
@ -124,217 +63,116 @@ public abstract class AbstractJDBCDriver {
createTableIfNotExists(sqlProvider.getTableName(), schemaSqls);
}
private void connect() throws SQLException {
if (connection == null) {
if (dataSource != null) {
try {
connection = dataSource.getConnection();
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
} else {
try {
if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
throw new IllegalStateException("jdbcDriverClass is null or empty!");
}
if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
}
final Driver dbDriver = getDriver(jdbcDriverClass);
Properties properties = new Properties();
if (user != null) {
properties.setProperty("user", user);
properties.setProperty("password", password);
}
connection = dbDriver.connect(jdbcConnectionUrl, properties);
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
}
if (connection == null) {
throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw e;
}
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) {
logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null");
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
try {
connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
} catch (SQLException e) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection");
} catch (Throwable throwable) {
//it included SecurityExceptions and UnsupportedOperationException
logger.warn("Unable to set a network timeout on the JDBC connection", throwable);
}
}
}
}
public void destroy() throws Exception {
final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
try {
connection.setAutoCommit(false);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(dropTableSql);
}
connection.commit();
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql));
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql));
throw rollbackEx;
connection.setAutoCommit(false);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(dropTableSql);
}
connection.commit();
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql));
throw rollbackEx;
}
throw e;
}
throw e;
}
}
private void createTableIfNotExists(String tableName, String... sqls) throws SQLException {
logger.tracef("Validating if table %s didn't exist before creating", tableName);
try {
connection.setAutoCommit(false);
final boolean tableExists;
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if (rs == null || !rs.next()) {
tableExists = false;
if (logger.isTraceEnabled()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
}
if (rs != null) {
final SQLWarning sqlWarning = rs.getWarnings();
if (sqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setAutoCommit(false);
final boolean tableExists;
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if (rs == null || !rs.next()) {
tableExists = false;
if (logger.isTraceEnabled()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
}
}
} else {
tableExists = true;
}
}
if (tableExists) {
logger.tracef("Validating if the existing table %s is initialized or not", tableName);
try (Statement statement = connection.createStatement();
ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
logger.tracef("Validation of the existing table %s initialization is started", tableName);
int rows;
if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
if (logger.isDebugEnabled()) {
final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
if (rows < expectedRows) {
logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
if (rs != null) {
final SQLWarning sqlWarning = rs.getWarnings();
if (sqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
}
}
connection.commit();
return;
} else {
sqls = Stream.of(sqls).filter(sql -> {
final String upperCaseSql = sql.toUpperCase();
return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
}).toArray(String[]::new);
if (sqls.length > 0) {
logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
tableExists = true;
}
}
if (tableExists) {
logger.tracef("Validating if the existing table %s is initialized or not", tableName);
try (Statement statement = connection.createStatement();
ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
logger.tracef("Validation of the existing table %s initialization is started", tableName);
int rows;
if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
if (logger.isDebugEnabled()) {
final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
if (rows < expectedRows) {
logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
}
}
connection.commit();
return;
} else {
logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
sqls = Stream.of(sqls).filter(sql -> {
final String upperCaseSql = sql.toUpperCase();
return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
}).toArray(String[]::new);
if (sqls.length > 0) {
logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
} else {
logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
}
}
} catch (SQLException e) {
//that's not a real issue and do not deserve any user-level log:
//some DBMS just return stale information about table existence
//and can fail on later attempts to access them
if (logger.isTraceEnabled()) {
logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
}
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
}
connection.setAutoCommit(false);
logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
}
}
if (sqls.length > 0) {
try (Statement statement = connection.createStatement()) {
for (String sql : sqls) {
statement.executeUpdate(sql);
final SQLWarning statementSqlWarning = statement.getWarnings();
if (statementSqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql));
}
}
}
} catch (SQLException e) {
//that's not a real issue and do not deserve any user-level log:
//some DBMS just return stale information about table existence
//and can fail on later attempts to access them
if (logger.isTraceEnabled()) {
logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
}
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
}
connection.setAutoCommit(false);
logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
connection.commit();
}
}
if (sqls.length > 0) {
try (Statement statement = connection.createStatement()) {
for (String sql : sqls) {
statement.executeUpdate(sql);
final SQLWarning statementSqlWarning = statement.getWarnings();
if (statementSqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql));
}
}
} catch (SQLException e) {
final String sqlStatements = String.join("\n", sqls);
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements));
throw rollbackEx;
}
connection.commit();
}
} catch (SQLException e) {
final String sqlStatements = String.join("\n", sqls);
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements));
throw rollbackEx;
}
throw e;
}
}
private static AtomicBoolean shutAdded = new AtomicBoolean(false);
private static class ShutdownDerby extends Thread {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) {
}
}
}
private Driver getDriver(String className) {
try {
Driver driver = (Driver) Class.forName(className).newInstance();
// Shutdown the derby if using the derby embedded driver.
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
if (shutAdded.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(new ShutdownDerby());
}
}
return driver;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + className);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate driver class: ", e);
}
}
public Connection getConnection() {
return connection;
}
public final void setConnection(Connection connection) {
if (this.connection == null) {
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
} else {
this.connection = connection;
throw e;
}
}
}
@ -343,37 +181,12 @@ public abstract class AbstractJDBCDriver {
this.sqlProvider = sqlProvider;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
public void setJdbcConnectionProvider(JDBCConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
public void setNetworkTimeout(Executor executor, int milliseconds) {
this.networkTimeoutExecutor = executor;
this.networkTimeoutMillis = milliseconds;
public JDBCConnectionProvider getJdbcConnectionProvider() {
return this.connectionProvider;
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class JDBCConnectionProvider {
private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class);
private DataSource dataSource;
private Executor networkTimeoutExecutor;
private int networkTimeoutMillis;
public JDBCConnectionProvider(DataSource dataSource) {
this.dataSource = dataSource;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
addDerbyShutdownHook();
}
public synchronized Connection getConnection() throws SQLException {
Connection connection;
try {
connection = dataSource.getConnection();
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
connection = new LoggingConnection(connection, logger);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) {
logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null");
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
try {
connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
} catch (SQLException e) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection");
} catch (Throwable throwable) {
//it included SecurityExceptions and UnsupportedOperationException
logger.warn("Unable to set a network timeout on the JDBC connection", throwable);
}
}
return connection;
}
private static AtomicBoolean shutAdded = new AtomicBoolean(false);
private static class ShutdownDerby extends Thread {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) { }
}
}
public void addDerbyShutdownHook() {
// Shutdown the derby if using the derby embedded driver.
try (Connection connection = getConnection()) {
if (connection.getMetaData().getDriverName().equals("org.apache.derby.jdbc.EmbeddedDriver")) {
if (shutAdded.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(new ShutdownDerby());
}
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
}
}
public void setNetworkTimeout(Executor executor, int milliseconds) {
this.networkTimeoutExecutor = executor;
this.networkTimeoutMillis = milliseconds;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import org.apache.commons.beanutils.PropertyUtils;
import org.jboss.logging.Logger;
import javax.sql.DataSource;
import java.util.Map;
import java.util.stream.Collectors;
public class JDBCDataSourceUtils {
private static final Logger logger = Logger.getLogger(JDBCDataSourceUtils.class);
public static DataSource getDataSource(String dataSourceClassName, Map<String, Object> dataSourceProperties) {
logger.info(new StringBuilder("Initialising JDBC data source: ").append(dataSourceClassName).append(" ")
.append(dataSourceProperties.keySet().stream()
.map(key -> key + "=" + dataSourceProperties.get(key))
.collect(Collectors.joining(", ", "{", "}"))));
try {
DataSource dataSource = (DataSource) Class.forName(dataSourceClassName).newInstance();
for (Map.Entry<String, Object> entry : dataSourceProperties.entrySet()) {
PropertyUtils.setProperty(dataSource, entry.getKey(), entry.getValue());
}
return dataSource;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + dataSourceClassName);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate DataSource", e);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers;
import java.sql.SQLException;
import java.util.Map;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -39,6 +40,13 @@ public class JDBCUtils {
return factory.create(tableName, storeType);
}
public static SQLProvider getSQLProvider(Map<String, Object> dataSourceProperties, String tableName, SQLProvider.DatabaseStoreType storeType) {
PropertySQLProvider.Factory.SQLDialect dialect = PropertySQLProvider.Factory.investigateDialect(dataSourceProperties);
logger.tracef("getSQLProvider Returning SQL provider for dialect %s, tableName::%s", dialect, tableName);
PropertySQLProvider.Factory factory = new PropertySQLProvider.Factory(dialect);
return factory.create(tableName, storeType);
}
/**
* Append to {@code errorMessage} a detailed description of the provided {@link SQLException}.<br>
* The information appended are:

View File

@ -17,41 +17,38 @@
package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@SuppressWarnings("SynchronizeOnNonFinalField")
public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriver {
private PreparedStatement replaceLargeObject;
private String replaceLargeObject;
public Db2SequentialFileDriver() {
super();
}
public Db2SequentialFileDriver(DataSource dataSource, SQLProvider provider) {
super(dataSource, provider);
}
public Db2SequentialFileDriver(Connection connection, SQLProvider provider) {
super(connection, provider);
public Db2SequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
super(connectionProvider, provider);
}
@Override
protected void prepareStatements() throws SQLException {
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[]{"ID"});
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
this.replaceLargeObject = connection.prepareStatement(sqlProvider.getReplaceLargeObjectSQL());
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
protected void prepareStatements() {
this.deleteFile = sqlProvider.getDeleteFileSQL();
this.createFile = sqlProvider.getInsertFileSQL();
this.createFileColumnNames = new String[]{"ID"};
this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
this.replaceLargeObject = sqlProvider.getReplaceLargeObjectSQL();
this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
}
@Override
@ -59,9 +56,8 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv
if (data == null || data.length == 0) {
return 0;
}
final PreparedStatement largeObjectStatement = append ? appendToLargeObject : replaceLargeObject;
synchronized (connection) {
try {
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement largeObjectStatement = connection.prepareStatement(append ? appendToLargeObject : replaceLargeObject)) {
connection.setAutoCommit(false);
int bytesWritten;
largeObjectStatement.setBytes(1, data);
@ -81,4 +77,4 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv
}
}
}
}
}

View File

@ -17,10 +17,10 @@
package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -29,54 +29,18 @@ import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Fac
class JDBCFileUtils {
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
String jdbcConnectionUrl,
String user,
String password,
SQLProvider provider) throws SQLException {
final JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.identifyDialect(driverClass);
if (POSTGRESQL.equals(sqlDialect)) {
dbDriver = new PostgresSequentialSequentialFileDriver();
} else if (DB2.equals(sqlDialect)) {
dbDriver = new Db2SequentialFileDriver();
} else {
dbDriver = new JDBCSequentialFileFactoryDriver();
}
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
dbDriver.setUser(user);
dbDriver.setPassword(password);
return dbDriver;
}
static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
static JDBCSequentialFileFactoryDriver getDBFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) throws SQLException {
final JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect;
try (Connection connection = dataSource.getConnection()) {
try (Connection connection = connectionProvider.getConnection()) {
sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection);
}
if (POSTGRESQL.equals(sqlDialect)) {
dbDriver = new PostgresSequentialSequentialFileDriver(dataSource, provider);
dbDriver = new PostgresSequentialSequentialFileDriver(connectionProvider, provider);
} else if (DB2.equals(sqlDialect)) {
dbDriver = new Db2SequentialFileDriver(dataSource, provider);
dbDriver = new Db2SequentialFileDriver(connectionProvider, provider);
} else {
dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider);
}
return dbDriver;
}
static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection);
if (POSTGRESQL.equals(sqlDialect)) {
dbDriver = new PostgresSequentialSequentialFileDriver(connection, provider);
dbDriver.setConnection(connection);
} else if (DB2.equals(sqlDialect)) {
dbDriver = new Db2SequentialFileDriver(connection, provider);
} else {
dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
dbDriver = new JDBCSequentialFileFactoryDriver(connectionProvider, provider);
}
return dbDriver;
}

View File

@ -16,10 +16,8 @@
*/
package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.io.File;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -32,6 +30,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@ -53,7 +52,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private final IOCriticalErrorListener criticalErrorListener;
public JDBCSequentialFileFactory(final DataSource dataSource,
public JDBCSequentialFileFactory(final JDBCConnectionProvider connectionProvider,
final SQLProvider sqlProvider,
Executor executor,
IOCriticalErrorListener criticalErrorListener) throws Exception {
@ -62,38 +61,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
}
public JDBCSequentialFileFactory(final String connectionUrl,
String userName,
String password,
final String className,
final SQLProvider sqlProvider,
Executor executor,
IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor;
this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, userName, password, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
}
public JDBCSequentialFileFactory(final Connection connection,
final SQLProvider sqlProvider,
final Executor executor,
final IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor;
this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
this.dbDriver = JDBCFileUtils.getDBFileDriver(connectionProvider, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
@ -103,14 +71,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
return dbDriver;
}
/**
* @see Connection#setNetworkTimeout(Executor, int)
**/
public JDBCSequentialFileFactory setNetworkTimeout(Executor executor, int milliseconds) {
this.dbDriver.setNetworkTimeout(executor, milliseconds);
return this;
}
@Override
public SequentialFileFactory setDatasync(boolean enabled) {
return this;

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Connection;
@ -28,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.jboss.logging.Logger;
@ -36,32 +36,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
protected PreparedStatement deleteFile;
protected PreparedStatement createFile;
protected PreparedStatement selectFileByFileName;
protected PreparedStatement copyFileRecord;
protected PreparedStatement renameFile;
protected PreparedStatement readLargeObject;
protected PreparedStatement appendToLargeObject;
protected PreparedStatement selectFileNamesByExtension;
protected String deleteFile;
protected String createFile;
protected String[] createFileColumnNames;
protected int createFileAutogeneratedKeys;
protected String selectFileByFileName;
protected String copyFileRecord;
protected String renameFile;
protected String readLargeObject;
protected String appendToLargeObject;
protected Integer appendToLargeObjectResultSetType;
protected Integer appendToLargeObjectResultSetConcurrency;
protected String selectFileNamesByExtension;
JDBCSequentialFileFactoryDriver() {
super();
}
JDBCSequentialFileFactoryDriver(DataSource dataSource, SQLProvider provider) {
super(dataSource, provider);
}
JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
super(connection, sqlProvider);
JDBCSequentialFileFactoryDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
super(connectionProvider, provider);
}
@Override
@ -70,22 +63,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
}
@Override
protected void prepareStatements() throws SQLException {
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[] {"ID"});
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
protected void prepareStatements() {
this.deleteFile = sqlProvider.getDeleteFileSQL();
this.createFile = sqlProvider.getInsertFileSQL();
this.createFileColumnNames = new String[] {"ID"};
this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
this.appendToLargeObjectResultSetType = ResultSet.TYPE_FORWARD_ONLY;
this.appendToLargeObjectResultSetConcurrency = ResultSet.CONCUR_UPDATABLE;
this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
}
public List<String> listFiles(String extension) throws Exception {
synchronized (connection) {
List<String> fileNames = new ArrayList<>();
try {
connection.setAutoCommit(false);
List<String> fileNames = new ArrayList<>();
try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
try (PreparedStatement selectFileNamesByExtension = connection.prepareStatement(this.selectFileNamesByExtension)) {
selectFileNamesByExtension.setString(1, extension);
try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
while (rs.next()) {
@ -97,8 +93,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
connection.rollback();
throw e;
}
return fileNames;
}
return fileNames;
}
/**
@ -108,14 +104,12 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void openFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
final long fileId = fileExists(file);
if (fileId < 0) {
createFile(file);
} else {
file.setId(fileId);
loadFile(file);
}
final long fileId = fileExists(file);
if (fileId < 0) {
createFile(file);
} else {
file.setId(fileId);
loadFile(file);
}
}
@ -131,18 +125,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public long fileExists(JDBCSequentialFile file) throws SQLException {
try {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement selectFileByFileName = connection.prepareStatement(this.selectFileByFileName)) {
connection.setAutoCommit(false);
selectFileByFileName.setString(1, file.getFileName());
try (ResultSet rs = selectFileByFileName.executeQuery()) {
final long id = rs.next() ? rs.getLong(1) : -1;
connection.commit();
return id;
} catch (Exception e) {
connection.rollback();
throw e;
}
} catch (Exception e) {
connection.rollback();
throw e;
}
} catch (NullPointerException npe) {
npe.printStackTrace();
@ -157,20 +151,22 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void loadFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
if (blob != null) {
file.setWritePosition(blob.length());
} else {
logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
if (blob != null) {
file.setWritePosition(blob.length());
} else {
logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId());
}
}
connection.commit();
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
@ -185,18 +181,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void createFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setAutoCommit(false);
createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
createFile.setBytes(3, new byte[0]);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
file.setId(keys.getLong(1));
try (PreparedStatement createFile =
createFileColumnNames != null ?
connection.prepareStatement(this.createFile, this.createFileColumnNames) :
connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
createFile.setBytes(3, new byte[0]);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
file.setId(keys.getLong(1));
}
connection.commit();
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
@ -212,9 +213,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
synchronized (connection) {
try {
connection.setAutoCommit(false);
try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
try (PreparedStatement renameFile = connection.prepareStatement(this.renameFile)) {
renameFile.setString(1, newFileName);
renameFile.setLong(2, file.getId());
renameFile.executeUpdate();
@ -233,8 +234,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void deleteFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
try {
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement deleteFile = connection.prepareStatement(this.deleteFile)) {
connection.setAutoCommit(false);
deleteFile.setLong(1, file.getId());
deleteFile.executeUpdate();
@ -259,31 +260,36 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
appendToLargeObject.setLong(1, file.getId());
try (PreparedStatement appendToLargeObject =
this.appendToLargeObjectResultSetType != null && this.appendToLargeObjectResultSetConcurrency != null ?
connection.prepareStatement(this.appendToLargeObject, this.appendToLargeObjectResultSetType, this.appendToLargeObjectResultSetConcurrency) :
connection.prepareStatement(this.appendToLargeObject)) {
appendToLargeObject.setLong(1, file.getId());
int bytesWritten = 0;
try (ResultSet rs = appendToLargeObject.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
if (blob == null) {
blob = connection.createBlob();
int bytesWritten = 0;
try (ResultSet rs = appendToLargeObject.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
if (blob == null) {
blob = connection.createBlob();
}
if (append) {
bytesWritten = blob.setBytes(blob.length() + 1, data);
} else {
blob.truncate(0);
bytesWritten = blob.setBytes(1, data);
}
rs.updateBlob(1, blob);
rs.updateRow();
}
if (append) {
bytesWritten = blob.setBytes(blob.length() + 1, data);
} else {
blob.truncate(0);
bytesWritten = blob.setBytes(1, data);
}
rs.updateBlob(1, blob);
rs.updateRow();
connection.commit();
return bytesWritten;
} catch (SQLException e) {
connection.rollback();
throw e;
}
connection.commit();
return bytesWritten;
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
@ -297,35 +303,37 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
int readLength = 0;
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
final Blob blob = rs.getBlob(1);
if (blob != null) {
final long blobLength = blob.length();
final int bytesRemaining = bytes.remaining();
final long filePosition = file.position();
readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition);
if (logger.isDebugEnabled()) {
logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d",
readLength, blobLength, bytesRemaining, filePosition);
}
if (readLength < 0) {
readLength = -1;
} else if (readLength > 0) {
byte[] data = blob.getBytes(file.position() + 1, readLength);
bytes.put(data);
try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
readLargeObject.setLong(1, file.getId());
int readLength = 0;
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
final Blob blob = rs.getBlob(1);
if (blob != null) {
final long blobLength = blob.length();
final int bytesRemaining = bytes.remaining();
final long filePosition = file.position();
readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition);
if (logger.isDebugEnabled()) {
logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d",
readLength, blobLength, bytesRemaining, filePosition);
}
if (readLength < 0) {
readLength = -1;
} else if (readLength > 0) {
byte[] data = blob.getBytes(file.position() + 1, readLength);
bytes.put(data);
}
}
}
connection.commit();
return readLength;
} catch (SQLException e) {
connection.rollback();
throw e;
}
connection.commit();
return readLength;
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
@ -338,9 +346,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException
*/
public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
synchronized (connection) {
try {
connection.setAutoCommit(false);
try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
try (PreparedStatement copyFileRecord = connection.prepareStatement(this.copyFileRecord)) {
copyFileRecord.setLong(1, fileFrom.getId());
copyFileRecord.setLong(2, fileTo.getId());
copyFileRecord.executeUpdate();
@ -357,7 +365,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
*/
@Override
public void destroy() throws SQLException {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setAutoCommit(false);
try (Statement statement = connection.createStatement()) {

View File

@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
@ -43,11 +44,10 @@ public class PostgresLargeObjectManager {
*/
public static final int READWRITE = READ | WRITE;
private final Connection realConnection;
private boolean shouldUseReflection;
public PostgresLargeObjectManager(Connection connection) throws SQLException {
this.realConnection = unwrap(connection);
public PostgresLargeObjectManager() {
try {
this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection");
shouldUseReflection = false;
@ -56,9 +56,9 @@ public class PostgresLargeObjectManager {
}
}
public final Long createLO() throws SQLException {
public final Long createLO(Connection connection) throws SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager();
Object largeObjectManager = getLargeObjectManager(connection);
try {
Method method = largeObjectManager.getClass().getMethod("createLO");
return (Long) method.invoke(largeObjectManager);
@ -66,13 +66,13 @@ public class PostgresLargeObjectManager {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI().createLO();
return ((PGConnection) unwrap(connection)).getLargeObjectAPI().createLO();
}
}
public Object open(long oid, int mode) throws SQLException {
public Object open(Connection connection, long oid, int mode) throws SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager();
Object largeObjectManager = getLargeObjectManager(connection);
try {
Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class);
return method.invoke(largeObjectManager, oid, mode);
@ -80,7 +80,7 @@ public class PostgresLargeObjectManager {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI().open(oid, mode);
return ((PGConnection) unwrap(connection)).getLargeObjectAPI().open(oid, mode);
}
}
@ -162,22 +162,22 @@ public class PostgresLargeObjectManager {
}
}
private Object getLargeObjectManager() throws SQLException {
private Object getLargeObjectManager(Connection connection) throws SQLException {
if (shouldUseReflection) {
try {
Method method = realConnection.getClass().getMethod("getLargeObjectAPI");
return method.invoke(realConnection);
Connection conn = unwrap(connection);
Method method = conn.getClass().getMethod("getLargeObjectAPI");
return method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
}
} else {
return ((PGConnection) realConnection).getLargeObjectAPI();
return ((PGConnection) unwrap(connection)).getLargeObjectAPI();
}
}
public final Connection unwrap(Connection connection) throws SQLException {
Connection conn = connection.unwrap(Connection.class);
return unwrapIronJacamar(unwrapDbcp(unwrapSpring(conn)));
return unwrapIronJacamar(unwrapDbcp(unwrapDbcp2(unwrapSpring(connection.unwrap(Connection.class)))));
}
private Connection unwrapIronJacamar(Connection conn) {
@ -198,6 +198,15 @@ public class PostgresLargeObjectManager {
}
}
private Connection unwrapDbcp2(Connection conn) {
try {
Method method = conn.getClass().getMethod("getInnermostDelegateInternal");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}
private Connection unwrapSpring(Connection conn) {
try {
Method method = conn.getClass().getMethod("getTargetConnection");

View File

@ -18,14 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.file;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import javax.sql.DataSource;
@SuppressWarnings("SynchronizeOnNonFinalField")
public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
@ -36,37 +36,32 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
super();
}
public PostgresSequentialSequentialFileDriver(DataSource dataSource, SQLProvider provider) {
public PostgresSequentialSequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
super();
this.setDataSource(dataSource);
this.setSqlProvider(provider);
}
public PostgresSequentialSequentialFileDriver(Connection connection, SQLProvider provider) {
super();
this.setConnection(connection);
this.setJdbcConnectionProvider(connectionProvider);
this.setSqlProvider(provider);
}
@Override
protected void prepareStatements() throws SQLException {
this.largeObjectManager = new PostgresLargeObjectManager(connection);
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
protected void prepareStatements() {
this.largeObjectManager = new PostgresLargeObjectManager();
this.deleteFile = sqlProvider.getDeleteFileSQL();
this.createFile = sqlProvider.getInsertFileSQL();
this.createFileAutogeneratedKeys = Statement.RETURN_GENERATED_KEYS;
this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
}
@Override
public void createFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
try {
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement createFile = connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
connection.setAutoCommit(false);
Long oid = largeObjectManager.createLO();
Long oid = largeObjectManager.createLO(connection);
createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
@ -87,31 +82,31 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
@Override
public void loadFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.setWritePosition(getPostGresLargeObjectSize(file));
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.setWritePosition(getPostGresLargeObjectSize(file));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
@Override
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) {
Object largeObject = null;
try (Connection connection = connectionProvider.getConnection()) {
Long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.WRITE);
Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.WRITE);
if (append) {
largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject));
} else {
@ -130,12 +125,11 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
@Override
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
Object largeObject = null;
long oid = getOID(file);
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setAutoCommit(false);
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position());
if (readLength > 0) {
@ -160,17 +154,19 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
private Long getOID(JDBCSequentialFile file) throws SQLException {
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) {
synchronized (connection) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
@ -184,10 +180,10 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
int size = 0;
Long oid = getOID(file);
if (oid != null) {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setAutoCommit(false);
Object largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ);
Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ);
size = largeObjectManager.size(largeObject);
largeObjectManager.close(largeObject);
connection.commit();

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.journal;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger;
@ -67,15 +68,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private final List<JDBCJournalRecord> records;
private PreparedStatement insertJournalRecords;
private String insertJournalRecords;
private PreparedStatement selectJournalRecords;
private String selectJournalRecords;
private PreparedStatement countJournalRecords;
private String countJournalRecords;
private PreparedStatement deleteJournalRecords;
private String deleteJournalRecords;
private PreparedStatement deleteJournalTxRecords;
private String deleteJournalTxRecords;
private boolean started;
@ -95,30 +96,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private final IOCriticalErrorListener criticalIOErrorListener;
public JDBCJournalImpl(DataSource dataSource,
public JDBCJournalImpl(JDBCConnectionProvider connectionProvider,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener,
long syncDelay) {
super(dataSource, provider);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;
this.criticalIOErrorListener = criticalIOErrorListener;
this.syncDelay = syncDelay;
}
public JDBCJournalImpl(String jdbcUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider sqlProvider,
ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener,
long syncDelay) {
super(sqlProvider, jdbcUrl, user, password, jdbcDriverClass);
super(connectionProvider, provider);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;
@ -153,13 +137,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
protected void prepareStatements() throws SQLException {
protected void prepareStatements() {
logger.tracef("preparing statements");
insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL());
deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL());
deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL());
insertJournalRecords = sqlProvider.getInsertJournalRecordsSQL();
selectJournalRecords = sqlProvider.getSelectJournalRecordsSQL();
countJournalRecords = sqlProvider.getCountJournalRecordsSQL();
deleteJournalRecords = sqlProvider.getDeleteJournalRecordsSQL();
deleteJournalTxRecords = sqlProvider.getDeleteJournalTxRecordsSQL();
}
@Override
@ -205,65 +189,70 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
TransactionHolder holder;
try {
connection.setAutoCommit(false);
try (Connection connection = connectionProvider.getConnection()) {
for (JDBCJournalRecord record : recordRef) {
try (PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords);
PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords);
PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) {
if (logger.isTraceEnabled()) {
logger.trace("sync::preparing JDBC statement for " + record);
}
connection.setAutoCommit(false);
for (JDBCJournalRecord record : recordRef) {
if (logger.isTraceEnabled()) {
logger.trace("sync::preparing JDBC statement for " + record);
}
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
// Standard SQL Delete Record, Non transactional delete
deletedRecords.add(record.getId());
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last.
deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch();
break;
case JDBCJournalRecord.COMMIT_RECORD:
// We perform all the deletes and add the commit record in the same Database TX
holder = transactions.get(record.getTxId());
for (RecordInfo info : holder.recordsToDelete) {
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
// Standard SQL Delete Record, Non transactional delete
deletedRecords.add(record.getId());
deletedRecords.add(info.id);
deleteJournalRecords.setLong(1, info.id);
deleteJournalRecords.addBatch();
}
record.writeRecord(insertJournalRecords);
committedTransactions.add(record.getTxId());
break;
default:
// Default we add a new record to the DB
record.writeRecord(insertJournalRecords);
break;
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last.
deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch();
break;
case JDBCJournalRecord.COMMIT_RECORD:
// We perform all the deletes and add the commit record in the same Database TX
holder = transactions.get(record.getTxId());
for (RecordInfo info : holder.recordsToDelete) {
deletedRecords.add(record.getId());
deletedRecords.add(info.id);
deleteJournalRecords.setLong(1, info.id);
deleteJournalRecords.addBatch();
}
record.writeRecord(insertJournalRecords);
committedTransactions.add(record.getTxId());
break;
default:
// Default we add a new record to the DB
record.writeRecord(insertJournalRecords);
break;
}
}
}
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch();
connection.commit();
if (logger.isTraceEnabled()) {
logger.trace("JDBC commit worked");
}
if (cleanupTxRecords(deletedRecords, committedTransactions)) {
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch();
connection.commit();
logger.trace("JDBC commit worked on cleanupTxRecords");
if (logger.isTraceEnabled()) {
logger.trace("JDBC commit worked");
}
if (cleanupTxRecords(deletedRecords, committedTransactions)) {
deleteJournalTxRecords.executeBatch();
connection.commit();
logger.trace("JDBC commit worked on cleanupTxRecords");
}
executeCallbacks(recordRef, true);
return recordRef.size();
}
executeCallbacks(recordRef, true);
return recordRef.size();
} catch (Exception e) {
handleException(recordRef, e);
return 0;
@ -280,18 +269,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
logger.trace("Rolling back Transaction, just in case");
}
try {
connection.rollback();
} catch (Throwable rollback) {
logger.warn(rollback);
}
try {
connection.close();
} catch (Throwable rollback) {
logger.warn(rollback);
}
if (recordRef != null) {
executeCallbacks(recordRef, false);
}
@ -308,23 +285,27 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
transactions.get(txId).committed = true;
}
boolean hasDeletedJournalTxRecords = false;
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
iterableCopy = new ArrayList<>();
iterableCopy.addAll(h.recordInfos);
try (Connection connection = connectionProvider.getConnection();
PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords)) {
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
for (RecordInfo info : iterableCopy) {
if (deletedRecords.contains(info.id)) {
h.recordInfos.remove(info);
iterableCopy = new ArrayList<>();
iterableCopy.addAll(h.recordInfos);
for (RecordInfo info : iterableCopy) {
if (deletedRecords.contains(info.id)) {
h.recordInfos.remove(info);
}
}
}
if (h.recordInfos.isEmpty() && h.committed) {
deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch();
hasDeletedJournalTxRecords = true;
transactions.remove(h.transactionID);
if (h.recordInfos.isEmpty() && h.committed) {
deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch();
hasDeletedJournalTxRecords = true;
transactions.remove(h.transactionID);
}
}
}
return hasDeletedJournalTxRecords;
@ -868,51 +849,54 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
JDBCJournalRecord r;
try (ResultSet rs = selectJournalRecords.executeQuery()) {
int noRecords = 0;
while (rs.next()) {
r = JDBCJournalRecord.readRecord(rs);
switch (r.getRecordType()) {
case JDBCJournalRecord.ADD_RECORD:
jrc.onReadAddRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD:
jrc.onReadUpdateRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD:
jrc.onReadDeleteRecord(r.getId());
break;
case JDBCJournalRecord.ADD_RECORD_TX:
jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD_TX:
jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.PREPARE_RECORD:
jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.COMMIT_RECORD:
jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
jrc.onReadRollbackRecord(r.getTxId());
break;
default:
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
try (Connection connection = connectionProvider.getConnection();
PreparedStatement selectJournalRecords = connection.prepareStatement(this.selectJournalRecords)) {
try (ResultSet rs = selectJournalRecords.executeQuery()) {
int noRecords = 0;
while (rs.next()) {
r = JDBCJournalRecord.readRecord(rs);
switch (r.getRecordType()) {
case JDBCJournalRecord.ADD_RECORD:
jrc.onReadAddRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD:
jrc.onReadUpdateRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD:
jrc.onReadDeleteRecord(r.getId());
break;
case JDBCJournalRecord.ADD_RECORD_TX:
jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD_TX:
jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.PREPARE_RECORD:
jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.COMMIT_RECORD:
jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
jrc.onReadRollbackRecord(r.getTxId());
break;
default:
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
}
noRecords++;
if (r.getSeq() > seq.longValue()) {
seq.set(r.getSeq());
}
}
noRecords++;
if (r.getSeq() > seq.longValue()) {
seq.set(r.getSeq());
}
}
jrc.checkPreparedTx();
jrc.checkPreparedTx();
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords);
transactions = jrc.getTransactions();
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords);
transactions = jrc.getTransactions();
}
} catch (Throwable e) {
handleException(null, e);
}
@ -962,9 +946,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override
public int getNumberOfRecords() {
int count = 0;
try (ResultSet rs = countJournalRecords.executeQuery()) {
rs.next();
count = rs.getInt(1);
try (Connection connection = connectionProvider.getConnection();
PreparedStatement countJournalRecords = connection.prepareStatement(this.countJournalRecords)) {
try (ResultSet rs = countJournalRecords.executeQuery()) {
rs.next();
count = rs.getInt(1);
}
} catch (SQLException e) {
logger.warn(e.getMessage(), e);
return -1;

View File

@ -22,10 +22,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.jboss.logging.Logger;
@ -363,7 +365,15 @@ public class PropertySQLProvider implements SQLProvider {
}
public Factory(DataSource dataSource) {
this(investigateDialect(dataSource));
this(new JDBCConnectionProvider(dataSource));
}
public Factory(Map<String, Object> dataSourceProperties) {
this(investigateDialect(dataSourceProperties));
}
public Factory(JDBCConnectionProvider connectionProvider) {
this(investigateDialect(connectionProvider));
}
public static SQLDialect investigateDialect(Connection connection) {
@ -388,8 +398,21 @@ public class PropertySQLProvider implements SQLProvider {
return dialect;
}
private static SQLDialect investigateDialect(DataSource dataSource) {
try (Connection connection = dataSource.getConnection()) {
public static SQLDialect investigateDialect(Map<String, Object> dataSourceProperties) {
SQLDialect dialect = null;
for (Object entry : dataSourceProperties.values()) {
if (entry instanceof String) {
dialect = identifyDialect((String) entry);
if (dialect != null) {
return dialect;
}
}
}
return dialect;
}
private static SQLDialect investigateDialect(JDBCConnectionProvider connectionProvider) {
try (Connection connection = connectionProvider.getConnection()) {
return investigateDialect(connection);
} catch (Exception e) {
logger.debug("Unable to read JDBC metadata.", e);

View File

@ -21,9 +21,11 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@ -31,11 +33,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@ -79,15 +84,20 @@ public class JDBCSequentialFileFactoryTest {
@Before
public void setup() throws Exception {
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
Map<String, Object> dataSourceProperties = new HashMap<>();
if (useAuthentication) {
user = "testuser";
password = "testpassword";
System.setProperty("derby.connection.requireAuthentication", "true");
System.setProperty("derby.user." + user, password);
dataSourceProperties.put("username", user);
dataSourceProperties.put("password", password);
}
String connectionUrl = "jdbc:derby:target/data;create=true";
dataSourceProperties.put("url", "jdbc:derby:target/data;create=true");
dataSourceProperties.put("driverClassName", className);
String tableName = "FILES";
factory = new JDBCSequentialFileFactory(connectionUrl, user, password, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
String jdbcDatasourceClass = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
factory = new JDBCSequentialFileFactory(new JDBCConnectionProvider(JDBCDataSourceUtils.getDataSource(jdbcDatasourceClass, dataSourceProperties)), JDBCUtils.getSQLProvider(dataSourceProperties, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
}

View File

@ -44,21 +44,21 @@ public class PostgresLargeObjectManagerTest {
@Test
public void testShouldNotUseReflection() throws SQLException {
PostgresLargeObjectManager manager = new PostgresLargeObjectManager(new MockConnection());
PostgresLargeObjectManager manager = new PostgresLargeObjectManager();
try {
manager.createLO();
manager.createLO(new MockConnection());
fail("Shouldn't be using reflection");
} catch (ClassCastException ex) {
}
}
@Test
public void testShouldUseReflection() throws SQLException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
public void testShouldUseReflection() throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
ClassLoader loader = new FunkyClassLoader();
Class funkyClass = loader.loadClass("org.apache.activemq.artemis.jdbc.store.file.PostgresLargeObjectManager");
Object manager = funkyClass.getConstructor(Connection.class).newInstance(new MockConnection());
Object manager = funkyClass.getConstructor().newInstance();
try {
funkyClass.getMethod("createLO").invoke(manager);
funkyClass.getMethod("createLO", Connection.class).invoke(manager, new MockConnection());
fail("Shouldn't be using reflection");
} catch (java.lang.reflect.InvocationTargetException ex) {
assertEquals("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex.getCause().getMessage());

View File

@ -20,8 +20,13 @@ import javax.sql.DataSource;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import java.util.HashMap;
import java.util.Map;
public class DatabaseStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
@ -44,6 +49,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private DataSource dataSource;
private String dataSourceClassName = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
private Map<String, Object> dataSourceProperties = new HashMap();
private JDBCConnectionProvider connectionProvider;
private SQLProvider.Factory sqlProviderFactory;
private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
@ -138,7 +149,22 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
*
* @return the DataSource used to store Artemis data in the JDBC data store.
*/
public DataSource getDataSource() {
private DataSource getDataSource() {
if (dataSource == null) {
if (dataSourceProperties.isEmpty()) {
addDataSourceProperty("driverClassName", jdbcDriverClassName);
addDataSourceProperty("url", jdbcConnectionUrl);
if (jdbcUser != null) {
addDataSourceProperty("username", jdbcUser);
}
if (jdbcPassword != null) {
addDataSourceProperty("password", jdbcPassword);
}
// Let the pool to have unbounded number of connections by default to prevent connection starvation
addDataSourceProperty("maxTotal", "-1");
}
dataSource = JDBCDataSourceUtils.getDataSource(dataSourceClassName, dataSourceProperties);
}
return dataSource;
}
@ -151,6 +177,33 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
this.dataSource = dataSource;
}
public JDBCConnectionProvider getConnectionProvider() {
if (connectionProvider == null) {
connectionProvider = new JDBCConnectionProvider(getDataSource());
}
return connectionProvider;
}
public void addDataSourceProperty(String key, String value) {
if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) {
dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase()));
} else {
try {
int i = Integer.parseInt(value);
dataSourceProperties.put(key, i);
} catch (NumberFormatException nfe) {
dataSourceProperties.put(key, value);
}
}
}
public String getDataSourceClassName() {
return dataSourceClassName;
}
public void setDataSourceClassName(String dataSourceClassName) {
this.dataSourceClassName = dataSourceClassName;
}
/**
* The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
* It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory}} will be used,

View File

@ -1738,6 +1738,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
password = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), password, mainConfig.getPasswordCodec());
}
conf.setJdbcPassword(password);
conf.setDataSourceClassName(getString(storeNode, "data-source-class-name", conf.getDataSourceClassName(), Validators.NO_CHECK));
if (parameterExists(storeNode, "data-source-properties")) {
NodeList propertyNodeList = storeNode.getElementsByTagName("data-source-property");
for (int i = 0; i < propertyNodeList.getLength(); i++) {
Element propertyNode = (Element) propertyNodeList.item(i);
conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue());
}
}
//conf.initDataSource();
return conf;
}

View File

@ -40,10 +40,8 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -72,16 +70,12 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
protected final StorageManager storageManager;
private JDBCSequentialFileFactoryDriver dbDriver;
private DatabaseStorageConfiguration dbConf;
private ExecutorFactory executorFactory;
private JDBCSequentialFileFactory pagingFactoryFileFactory;
private JDBCSequentialFile directoryList;
private final boolean readWholePage;
@Override
@ -106,8 +100,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory,
final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener) throws Exception {
this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
final IOCriticalErrorListener criticalErrorListener) throws Exception {
this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, criticalErrorListener, false);
}
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
@ -116,7 +110,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory,
final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener,
final IOCriticalErrorListener criticalErrorListener,
final boolean readWholePage) throws Exception {
this.storageManager = storageManager;
this.executorFactory = executorFactory;
@ -124,7 +118,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
this.scheduledExecutor = scheduledExecutor;
this.syncTimeout = syncTimeout;
this.dbConf = dbConf;
this.criticalErrorListener = critialErrorListener;
this.criticalErrorListener = criticalErrorListener;
this.factoryToTableName = new HashMap<>();
this.readWholePage = readWholePage;
start();
@ -137,20 +131,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
if (pageStoreTableNamePrefix.length() > 10) {
throw new IllegalStateException("The maximum name size for the page store table prefix is 10 characters: THE PAGING STORE CAN'T START");
}
if (dbConf.getDataSource() != null) {
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
}
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
}
final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
if (jdbcNetworkTimeout >= 0) {
pagingFactoryFileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider());
}
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
pagingFactoryFileFactory.start();
started = true;
}
@ -278,22 +263,14 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
directoryList.close();
final SQLProvider sqlProvider;
if (dbConf.getDataSource() != null) {
final SQLProvider.Factory sqlProviderFactory;
if (dbConf.getSqlProviderFactory() != null) {
sqlProviderFactory = dbConf.getSqlProviderFactory();
} else {
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
}
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
final SQLProvider.Factory sqlProviderFactory;
if (dbConf.getSqlProviderFactory() != null) {
sqlProviderFactory = dbConf.getSqlProviderFactory();
} else {
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
}
final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
if (jdbcNetworkTimeout >= 0) {
fileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider());
}
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
factoryToTableName.put(fileFactory, directoryName);
return fileFactory;
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -26,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
@ -36,8 +35,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
public class JDBCJournalStorageManager extends JournalStorageManager {
private Connection connection;
public JDBCJournalStorageManager(Configuration config,
CriticalAnalyzer analyzer,
ExecutorFactory executorFactory,
@ -59,33 +56,35 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
try {
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider();
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {
connectionProvider.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
final JDBCJournalImpl bindingsJournal;
final JDBCJournalImpl messageJournal;
final JDBCSequentialFileFactory largeMessagesFactory;
if (dbConf.getDataSource() != null) {
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
}
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
}
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {
bindingsJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
if (networkTimeout >= 0) {
messageJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
if (networkTimeout >= 0) {
largeMessagesFactory.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new PropertySQLProvider.Factory(connectionProvider);
}
bindingsJournal = new JDBCJournalImpl(
connectionProvider,
sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL),
scheduledExecutorService,
executorFactory.getExecutor(),
criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(
connectionProvider,
sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL),
scheduledExecutorService, executorFactory.getExecutor(),
criticalErrorListener,
dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(
connectionProvider,
sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE),
executorFactory.getExecutor(),
criticalErrorListener);
this.bindingsJournal = bindingsJournal;
this.messageJournal = messageJournal;
this.largeMessagesFactory = largeMessagesFactory;

View File

@ -26,6 +26,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.jboss.logging.Logger;
/**
@ -35,13 +36,13 @@ final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection;
private final JDBCConnectionProvider connectionProvider;
private final String holderId;
private final PreparedStatement tryAcquireLock;
private final PreparedStatement tryReleaseLock;
private final PreparedStatement renewLock;
private final PreparedStatement isLocked;
private final PreparedStatement currentDateTime;
private final String tryAcquireLock;
private final String tryReleaseLock;
private final String renewLock;
private final String isLocked;
private final String currentDateTime;
private final long expirationMillis;
private boolean maybeAcquired;
private final String lockName;
@ -51,12 +52,12 @@ final class JdbcLeaseLock implements LeaseLock {
* whose life cycle will be managed externally.
*/
JdbcLeaseLock(String holderId,
Connection connection,
PreparedStatement tryAcquireLock,
PreparedStatement tryReleaseLock,
PreparedStatement renewLock,
PreparedStatement isLocked,
PreparedStatement currentDateTime,
JDBCConnectionProvider connectionProvider,
String tryAcquireLock,
String tryReleaseLock,
String renewLock,
String isLocked,
String currentDateTime,
long expirationMIllis,
String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
@ -70,7 +71,7 @@ final class JdbcLeaseLock implements LeaseLock {
this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis;
this.maybeAcquired = false;
this.connection = connection;
this.connectionProvider = connectionProvider;
this.lockName = lockName;
}
@ -84,13 +85,12 @@ final class JdbcLeaseLock implements LeaseLock {
}
private String readableLockStatus() {
try {
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
final String lockStatus;
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
lockStatus = null;
@ -114,100 +114,96 @@ final class JdbcLeaseLock implements LeaseLock {
}
}
private long dbCurrentTimeMillis() throws SQLException {
private long dbCurrentTimeMillis(Connection connection) throws SQLException {
final long start = System.nanoTime();
try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next();
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
final long elapsedTime = System.nanoTime() - start;
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next();
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
final long elapsedTime = System.nanoTime() - start;
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
}
return currentTimestamp.getTime();
}
return currentTimestamp.getTime();
}
}
@Override
public boolean renew() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = this.renewLock;
final long now = dbCurrentTimeMillis();
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
}
preparedStatement.setTimestamp(1, expirationTime);
preparedStatement.setString(2, holderId);
preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(4, expirationTime);
final int updatedRows = preparedStatement.executeUpdate();
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
final long now = dbCurrentTimeMillis(connection);
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
preparedStatement.setTimestamp(1, expirationTime);
preparedStatement.setString(2, holderId);
preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(4, expirationTime);
final int updatedRows = preparedStatement.executeUpdate();
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
public boolean tryAcquire() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = tryAcquireLock;
final long now = dbCurrentTimeMillis();
preparedStatement.setString(1, holderId);
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(2, expirationTime);
preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
lockName, holderId, expirationTime);
final boolean acquired = preparedStatement.executeUpdate() == 1;
connection.commit();
if (acquired) {
this.maybeAcquired = true;
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
final long now = dbCurrentTimeMillis(connection);
preparedStatement.setString(1, holderId);
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
preparedStatement.setTimestamp(2, expirationTime);
preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
lockName, holderId, expirationTime);
final boolean acquired = preparedStatement.executeUpdate() == 1;
connection.commit();
if (acquired) {
this.maybeAcquired = true;
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
return acquired;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
return acquired;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@ -222,104 +218,85 @@ final class JdbcLeaseLock implements LeaseLock {
}
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
boolean result;
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
result = false;
} else {
final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId);
final Timestamp expirationTime = resultSet.getTimestamp(2);
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
final long currentTimestampMillis = currentTimestamp.getTime();
boolean zombie = false;
if (expirationTime != null) {
final long lockExpirationTime = expirationTime.getTime();
final long expiredBy = currentTimestampMillis - lockExpirationTime;
if (expiredBy > 0) {
result = false;
zombie = true;
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
lockName, holderId, zombie ? "zombie lock" : "lock",
currentHolderId, expirationTime, currentTimestamp);
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
boolean result;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
result = false;
} else {
final String currentHolderId = resultSet.getString(1);
result = holderIdFilter.test(currentHolderId);
final Timestamp expirationTime = resultSet.getTimestamp(2);
final Timestamp currentTimestamp = resultSet.getTimestamp(3);
final long currentTimestampMillis = currentTimestamp.getTime();
boolean zombie = false;
if (expirationTime != null) {
final long lockExpirationTime = expirationTime.getTime();
final long expiredBy = currentTimestampMillis - lockExpirationTime;
if (expiredBy > 0) {
result = false;
zombie = true;
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
lockName, holderId, zombie ? "zombie lock" : "lock",
currentHolderId, expirationTime, currentTimestamp);
}
}
connection.commit();
return result;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
connection.commit();
return result;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
public void release() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = this.tryReleaseLock;
preparedStatement.setString(1, holderId);
final boolean released = preparedStatement.executeUpdate() == 1;
//consider it as released to avoid on finalize to be reclaimed
this.maybeAcquired = false;
connection.commit();
if (!released) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryReleaseLock)) {
preparedStatement.setString(1, holderId);
final boolean released = preparedStatement.executeUpdate() == 1;
//consider it as released to avoid on finalize to be reclaimed
this.maybeAcquired = false;
connection.commit();
if (!released) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} else {
LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
public void close() throws SQLException {
synchronized (connection) {
//to avoid being called if not needed
if (!this.tryReleaseLock.isClosed()) {
try {
if (this.maybeAcquired) {
release();
}
} finally {
this.tryReleaseLock.close();
this.tryAcquireLock.close();
this.renewLock.close();
this.isLocked.close();
this.currentDateTime.close();
}
}
if (this.maybeAcquired) {
release();
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -61,59 +60,37 @@ public final class JdbcNodeManager extends NodeManager {
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
validateTimeoutConfiguration(configuration);
if (configuration.getDataSource() != null) {
final SQLProvider.Factory sqlProviderFactory;
if (configuration.getSqlProviderFactory() != null) {
sqlProviderFactory = configuration.getSqlProviderFactory();
} else {
sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource());
}
final String brokerId = java.util.UUID.randomUUID().toString();
return usingDataSource(brokerId,
configuration.getJdbcNetworkTimeout(),
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getDataSource(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
final SQLProvider.Factory sqlProviderFactory;
if (configuration.getSqlProviderFactory() != null) {
sqlProviderFactory = configuration.getSqlProviderFactory();
} else {
final SQLProvider sqlProvider = JDBCUtils.getSQLProvider(configuration.getJdbcDriverClassName(), configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionUrl(brokerId,
configuration.getJdbcNetworkTimeout(),
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getJdbcConnectionUrl(),
configuration.getJdbcUser(),
configuration.getJdbcPassword(),
configuration.getJdbcDriverClassName(),
sqlProvider,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getConnectionProvider());
}
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionProvider(brokerId,
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getConnectionProvider(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
private static JdbcNodeManager usingDataSource(String brokerId,
int networkTimeoutMillis,
private static JdbcNodeManager usingConnectionProvider(String brokerId,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
DataSource dataSource,
JDBCConnectionProvider connectionProvider,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingDataSource(brokerId,
networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(),
() -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
lockExpirationMillis,
dataSource,
connectionProvider,
provider),
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
@ -122,36 +99,6 @@ public final class JdbcNodeManager extends NodeManager {
ioCriticalErrorListener);
}
private static JdbcNodeManager usingConnectionUrl(String brokerId,
int networkTimeoutMillis,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
String jdbcUrl,
String user,
String password,
String driverClass,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId,
networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis,
jdbcUrl,
user,
password,
driverClass,
provider),
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
final long lockExpiration = configuration.getJdbcLockExpirationMillis();
if (lockExpiration <= 0) {

View File

@ -17,15 +17,14 @@
package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger;
@ -42,21 +41,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private final long lockExpirationMillis;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId;
private PreparedStatement writeNodeId;
private PreparedStatement initializeNodeId;
private PreparedStatement readState;
private PreparedStatement writeState;
private String readNodeId;
private String writeNodeId;
private String initializeNodeId;
private String readState;
private String writeState;
public static JdbcSharedStateManager usingDataSource(String holderId,
int networkTimeout,
Executor networkTimeoutExecutor,
public static JdbcSharedStateManager usingConnectionProvider(String holderId,
long locksExpirationMillis,
DataSource dataSource,
JDBCConnectionProvider connectionProvider,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setDataSource(dataSource);
sharedStateManager.setJdbcConnectionProvider(connectionProvider);
sharedStateManager.setSqlProvider(provider);
try {
sharedStateManager.start();
@ -66,64 +62,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
}
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
return JdbcSharedStateManager.usingConnectionUrl(holderId,
-1,
null,
locksExpirationMillis,
jdbcConnectionUrl,
null,
null,
jdbcDriverClass,
provider);
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
String jdbcConnectionUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider provider) {
return JdbcSharedStateManager.usingConnectionUrl(holderId,
-1,
null,
locksExpirationMillis,
jdbcConnectionUrl,
user,
password,
jdbcDriverClass,
provider);
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
int networkTimeout,
Executor networkTimeoutExecutor,
long locksExpirationMillis,
String jdbcConnectionUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
sharedStateManager.setSqlProvider(provider);
sharedStateManager.setUser(user);
sharedStateManager.setPassword(password);
try {
sharedStateManager.start();
return sharedStateManager;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
protected void createSchema() {
try {
@ -135,28 +73,28 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
}
static JdbcLeaseLock createLiveLock(String holderId,
Connection connection,
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
long expirationMillis) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE");
long expirationMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE");
}
static JdbcLeaseLock createBackupLock(String holderId,
Connection connection,
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
long expirationMillis) throws SQLException {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP");
long expirationMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP");
}
@Override
protected void prepareStatements() throws SQLException {
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
protected void prepareStatements() {
this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
this.readNodeId = sqlProvider.readNodeIdSQL();
this.writeNodeId = sqlProvider.writeNodeIdSQL();
this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
this.writeState = sqlProvider.writeStateSQL();
this.readState = sqlProvider.readStateSQL();
}
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
@ -174,17 +112,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
return this.backupLock;
}
private UUID rawReadNodeId() throws SQLException {
final PreparedStatement preparedStatement = this.readNodeId;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return null;
} else {
final String nodeId = resultSet.getString(1);
if (nodeId != null) {
return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
} else {
private UUID rawReadNodeId(Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.readNodeId)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return null;
} else {
final String nodeId = resultSet.getString(1);
if (nodeId != null) {
return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
} else {
return null;
}
}
}
}
@ -192,65 +131,71 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
@Override
public UUID readNodeId() {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
try {
return rawReadNodeId();
return rawReadNodeId(connection);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
public void writeNodeId(UUID nodeId) {
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
try {
rawWriteNodeId(nodeId);
rawWriteNodeId(connection, nodeId);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
private void rawWriteNodeId(UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.writeNodeId;
preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
private void rawWriteNodeId(Connection connection, UUID nodeId) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeNodeId)) {
preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
}
}
}
private boolean rawInitializeNodeId(UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.initializeNodeId;
preparedStatement.setString(1, nodeId.toString());
final int rows = preparedStatement.executeUpdate();
assert rows <= 1;
return rows > 0;
private boolean rawInitializeNodeId(Connection connection, UUID nodeId) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.initializeNodeId)) {
preparedStatement.setString(1, nodeId.toString());
final int rows = preparedStatement.executeUpdate();
assert rows <= 1;
return rows > 0;
}
}
@Override
public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
SQLException lastError = null;
synchronized (connection) {
try (Connection connection = connectionProvider.getConnection()) {
final UUID newNodeId = nodeIdFactory.get();
for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) {
lastError = null;
try {
final UUID nodeId = initializeOrReadNodeId(newNodeId);
final UUID nodeId = initializeOrReadNodeId(connection, newNodeId);
if (nodeId != null) {
return nodeId;
}
@ -259,6 +204,8 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
lastError = e;
}
}
} catch (SQLException e) {
lastError = e;
}
if (lastError != null) {
logger.error("Unable to setup a NodeId on the JDBC shared state", lastError);
@ -268,7 +215,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId");
}
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
private UUID initializeOrReadNodeId(Connection connection, final UUID newNodeId) throws SQLException {
synchronized (connection) {
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
final boolean autoCommit = connection.getAutoCommit();
@ -276,10 +223,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
try {
final UUID nodeId;
//optimistic try to initialize nodeId
if (rawInitializeNodeId(newNodeId)) {
if (rawInitializeNodeId(connection, newNodeId)) {
nodeId = newNodeId;
} else {
nodeId = rawReadNodeId();
nodeId = rawReadNodeId(connection);
}
if (nodeId != null) {
connection.commit();
@ -335,76 +282,65 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
@Override
public State readState() {
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final State state;
try {
final PreparedStatement preparedStatement = this.readState;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
state = State.FIRST_TIME_START;
} else {
state = decodeState(resultSet.getString(1));
}
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final State state;
try (PreparedStatement preparedStatement = connection.prepareStatement(this.readState)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
state = State.FIRST_TIME_START;
} else {
state = decodeState(resultSet.getString(1));
}
connection.commit();
return state;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
connection.commit();
return state;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
public void writeState(State state) {
final String encodedState = encodeState(state);
synchronized (connection) {
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try {
final PreparedStatement preparedStatement = this.writeState;
preparedStatement.setString(1, encodedState);
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
}
connection.commit();
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeState)) {
preparedStatement.setString(1, encodedState);
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
}
} catch (SQLException e) {
throw new IllegalStateException(e);
connection.commit();
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override
public void stop() throws SQLException {
//release all the managed resources inside the connection lock
synchronized (connection) {
this.readNodeId.close();
this.writeNodeId.close();
this.initializeNodeId.close();
this.readState.close();
this.writeState.close();
this.liveLock.close();
this.backupLock.close();
super.stop();
}
//synchronized (connection) {
this.liveLock.close();
this.backupLock.close();
super.stop();
//}
}
@Override

View File

@ -2361,14 +2361,14 @@
<xsd:complexType name="databaseStoreType">
<xsd:all>
<xsd:element name="jdbc-driver-class-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:element name="jdbc-driver-class-name" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The JDBC Driver class name
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/
@ -2391,6 +2391,31 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="data-source-class-name" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The DataSource class name
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="data-source-properties" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
A list of options for the DataSource
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="data-source-property" type="dataSourcePropertyType" minOccurs="1" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
A key-value pair option for the DataSource
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
@ -2458,6 +2483,23 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="dataSourcePropertyType">
<xsd:attribute name="key" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Configuration option key
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="value" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Configuration option value
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="haPolicyType">
<xsd:choice>
<xsd:element name="live-only" type="haLiveOnlyPolicyType" minOccurs="0" maxOccurs="1">

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@ -67,10 +66,10 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
return JdbcSharedStateManager
.createLiveLock(
UUID.randomUUID().toString(),
jdbcSharedStateManager.getConnection(),
jdbcSharedStateManager.getJdbcConnectionProvider(),
sqlProvider,
acquireMillis);
} catch (SQLException e) {
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@ -85,20 +84,18 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
if (withExistingTable) {
TestJDBCDriver testDriver = TestJDBCDriver
.usingConnectionUrl(
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
.usingDbConf(
dbConf,
sqlProvider);
testDriver.start();
testDriver.stop();
}
jdbcSharedStateManager = JdbcSharedStateManager
.usingConnectionUrl(
.usingConnectionProvider(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
dbConf.getConnectionProvider(),
sqlProvider);
}

View File

@ -41,19 +41,17 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
}
private TestJDBCDriver createFakeDriver(boolean initializeTable) {
return TestJDBCDriver.usingConnectionUrl(
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
return TestJDBCDriver.usingDbConf(
dbConf,
sqlProvider,
initializeTable);
}
private JdbcSharedStateManager createSharedStateManager() {
return JdbcSharedStateManager.usingConnectionUrl(
return JdbcSharedStateManager.usingConnectionProvider(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
dbConf.getConnectionProvider(),
sqlProvider);
}

View File

@ -16,28 +16,28 @@
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.junit.Assert;
public class TestJDBCDriver extends AbstractJDBCDriver {
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf,
SQLProvider provider) {
return usingDbConf(dbConf, provider, false);
}
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider,
boolean initialize) {
public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf,
SQLProvider provider,
boolean initialize) {
TestJDBCDriver driver = new TestJDBCDriver(initialize);
driver.setSqlProvider(provider);
driver.setJdbcConnectionUrl(jdbcConnectionUrl);
driver.setJdbcDriverClass(jdbcDriverClass);
driver.setJdbcConnectionProvider(dbConf.getConnectionProvider());
return driver;
}
@ -48,12 +48,11 @@ public class TestJDBCDriver extends AbstractJDBCDriver {
}
@Override
protected void prepareStatements() throws SQLException {
}
protected void prepareStatements() { }
@Override
protected void createSchema() throws SQLException {
try {
protected void createSchema() {
try (Connection connection = getJdbcConnectionProvider().getConnection()) {
connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
if (initialize) {
connection.createStatement().execute(sqlProvider.createNodeIdSQL());

View File

@ -747,6 +747,13 @@
<version>2.7.2</version>
</dependency>
<!-- needed by artemis-jdbc-store -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Needed for Micrometer -->
<dependency>
<groupId>io.micrometer</groupId>

View File

@ -123,7 +123,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor();
journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), getJdbcUser(), getJdbcPassword(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
journal = new JDBCJournalImpl(dbConf.getConnectionProvider(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
@ -145,10 +145,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
public void testConcurrentEmptyJournal() throws SQLException {
Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords());
final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
getJdbcUser(),
getJdbcPassword(),
dbConf.getJdbcDriverClassName(),
final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getConnectionProvider(),
sqlProvider, scheduledExecutorService,
executorService, (code, message, file) -> {
Assert.fail(message);