ARTEMIS-2823 Use datasource with JDBC store db connections

Replaces direct jdbc connections with dbcp2 datasource. Adds
configuration options to use alternative datasources and to alter the
parameters. While adding slight overhead, this vastly improves the
management and pooling capabilities with db connections.
This commit is contained in:
Mikko Uoti 2020-05-28 09:42:26 +03:00
parent 90d6bad879
commit 2faafec737
29 changed files with 1142 additions and 1264 deletions

View File

@ -455,6 +455,9 @@ public final class ActiveMQDefaultConfiguration {
// Default JDBC Driver class name, derby by default just for demo purposes // Default JDBC Driver class name, derby by default just for demo purposes
private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
// Default JDBC Driver class name. DBCP2 BasicDataSource is used by default.
private static String DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME = "org.apache.commons.dbcp2.BasicDataSource";
// Default message table name, used with Database storage type // Default message table name, used with Database storage type
private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES"; private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES";
@ -1392,6 +1395,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JDBC_DRIVER_CLASS_NAME; return DEFAULT_JDBC_DRIVER_CLASS_NAME;
} }
public static String getDefaultDataSourceClassName() {
return DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME;
}
public static String getDefaultLargeMessagesTableName() { public static String getDefaultLargeMessagesTableName() {
return DEFAULT_LARGE_MESSAGES_TABLE_NAME; return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
} }

View File

@ -81,6 +81,13 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- Default DataSource for database -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Database driver support --> <!-- Database driver support -->
<dependency> <dependency>
<groupId>org.apache.derby</groupId> <groupId>org.apache.derby</groupId>

View File

@ -16,23 +16,15 @@
*/ */
package org.apache.activemq.artemis.jdbc.store.drivers; package org.apache.activemq.artemis.jdbc.store.drivers;
import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLWarning; import java.sql.SQLWarning;
import java.sql.Statement; import java.sql.Statement;
import java.util.Arrays; import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** /**
@ -43,80 +35,27 @@ public abstract class AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class); private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
protected Connection connection;
protected SQLProvider sqlProvider; protected SQLProvider sqlProvider;
private String jdbcConnectionUrl; protected JDBCConnectionProvider connectionProvider;
private String jdbcDriverClass; public AbstractJDBCDriver() { }
private DataSource dataSource; public AbstractJDBCDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
this.connectionProvider = connectionProvider;
private Executor networkTimeoutExecutor;
private int networkTimeoutMillis;
private String user;
private String password;
public AbstractJDBCDriver() {
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
}
public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String user, String password, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
this.user = user;
this.password = password;
this.jdbcDriverClass = jdbcDriverClass;
this.sqlProvider = sqlProvider;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
}
public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) {
this.dataSource = dataSource;
this.sqlProvider = provider; this.sqlProvider = provider;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
} }
public void start() throws SQLException { public void start() throws SQLException {
connect(); createSchema();
synchronized (connection) { prepareStatements();
createSchema();
prepareStatements();
}
}
public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
} else {
this.connection = connection;
}
this.sqlProvider = sqlProvider;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
} }
public void stop() throws SQLException { public void stop() throws SQLException {
synchronized (connection) {
if (sqlProvider.closeConnectionOnShutdown()) {
try {
connection.setAutoCommit(true);
connection.close();
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
}
}
} }
protected abstract void prepareStatements() throws SQLException; protected abstract void prepareStatements();
protected abstract void createSchema() throws SQLException; protected abstract void createSchema() throws SQLException;
@ -124,217 +63,116 @@ public abstract class AbstractJDBCDriver {
createTableIfNotExists(sqlProvider.getTableName(), schemaSqls); createTableIfNotExists(sqlProvider.getTableName(), schemaSqls);
} }
private void connect() throws SQLException {
if (connection == null) {
if (dataSource != null) {
try {
connection = dataSource.getConnection();
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
} else {
try {
if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
throw new IllegalStateException("jdbcDriverClass is null or empty!");
}
if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
}
final Driver dbDriver = getDriver(jdbcDriverClass);
Properties properties = new Properties();
if (user != null) {
properties.setProperty("user", user);
properties.setProperty("password", password);
}
connection = dbDriver.connect(jdbcConnectionUrl, properties);
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
}
if (connection == null) {
throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw e;
}
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) {
logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null");
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
try {
connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
} catch (SQLException e) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection");
} catch (Throwable throwable) {
//it included SecurityExceptions and UnsupportedOperationException
logger.warn("Unable to set a network timeout on the JDBC connection", throwable);
}
}
}
}
public void destroy() throws Exception { public void destroy() throws Exception {
final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName(); final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
try { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(dropTableSql);
}
connection.commit();
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql));
try { try {
connection.rollback(); connection.setAutoCommit(false);
} catch (SQLException rollbackEx) { try (Statement statement = connection.createStatement()) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql)); statement.executeUpdate(dropTableSql);
throw rollbackEx; }
connection.commit();
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql));
throw rollbackEx;
}
throw e;
} }
throw e;
} }
} }
private void createTableIfNotExists(String tableName, String... sqls) throws SQLException { private void createTableIfNotExists(String tableName, String... sqls) throws SQLException {
logger.tracef("Validating if table %s didn't exist before creating", tableName); logger.tracef("Validating if table %s didn't exist before creating", tableName);
try { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false); try {
final boolean tableExists; connection.setAutoCommit(false);
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) { final boolean tableExists;
if (rs == null || !rs.next()) { try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
tableExists = false; if (rs == null || !rs.next()) {
if (logger.isTraceEnabled()) { tableExists = false;
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls)); if (logger.isTraceEnabled()) {
} logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
if (rs != null) {
final SQLWarning sqlWarning = rs.getWarnings();
if (sqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
} }
} if (rs != null) {
} else { final SQLWarning sqlWarning = rs.getWarnings();
tableExists = true; if (sqlWarning != null) {
} logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
}
if (tableExists) {
logger.tracef("Validating if the existing table %s is initialized or not", tableName);
try (Statement statement = connection.createStatement();
ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
logger.tracef("Validation of the existing table %s initialization is started", tableName);
int rows;
if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
if (logger.isDebugEnabled()) {
final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
if (rows < expectedRows) {
logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
} }
} }
connection.commit();
return;
} else { } else {
sqls = Stream.of(sqls).filter(sql -> { tableExists = true;
final String upperCaseSql = sql.toUpperCase(); }
return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX")); }
}).toArray(String[]::new); if (tableExists) {
if (sqls.length > 0) { logger.tracef("Validating if the existing table %s is initialized or not", tableName);
logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName); try (Statement statement = connection.createStatement();
ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
logger.tracef("Validation of the existing table %s initialization is started", tableName);
int rows;
if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
if (logger.isDebugEnabled()) {
final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
if (rows < expectedRows) {
logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
}
}
connection.commit();
return;
} else { } else {
logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName); sqls = Stream.of(sqls).filter(sql -> {
final String upperCaseSql = sql.toUpperCase();
return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
}).toArray(String[]::new);
if (sqls.length > 0) {
logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
} else {
logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
}
}
} catch (SQLException e) {
//that's not a real issue and do not deserve any user-level log:
//some DBMS just return stale information about table existence
//and can fail on later attempts to access them
if (logger.isTraceEnabled()) {
logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
}
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
}
connection.setAutoCommit(false);
logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
}
}
if (sqls.length > 0) {
try (Statement statement = connection.createStatement()) {
for (String sql : sqls) {
statement.executeUpdate(sql);
final SQLWarning statementSqlWarning = statement.getWarnings();
if (statementSqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql));
}
} }
} }
} catch (SQLException e) {
//that's not a real issue and do not deserve any user-level log: connection.commit();
//some DBMS just return stale information about table existence
//and can fail on later attempts to access them
if (logger.isTraceEnabled()) {
logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
}
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
}
connection.setAutoCommit(false);
logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
} }
} } catch (SQLException e) {
if (sqls.length > 0) { final String sqlStatements = String.join("\n", sqls);
try (Statement statement = connection.createStatement()) { logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));
for (String sql : sqls) { try {
statement.executeUpdate(sql); connection.rollback();
final SQLWarning statementSqlWarning = statement.getWarnings(); } catch (SQLException rollbackEx) {
if (statementSqlWarning != null) { logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements));
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql)); throw rollbackEx;
}
}
} }
throw e;
connection.commit();
}
} catch (SQLException e) {
final String sqlStatements = String.join("\n", sqls);
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements));
throw rollbackEx;
}
throw e;
}
}
private static AtomicBoolean shutAdded = new AtomicBoolean(false);
private static class ShutdownDerby extends Thread {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) {
}
}
}
private Driver getDriver(String className) {
try {
Driver driver = (Driver) Class.forName(className).newInstance();
// Shutdown the derby if using the derby embedded driver.
if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
if (shutAdded.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(new ShutdownDerby());
}
}
return driver;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + className);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate driver class: ", e);
}
}
public Connection getConnection() {
return connection;
}
public final void setConnection(Connection connection) {
if (this.connection == null) {
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
} else {
this.connection = connection;
} }
} }
} }
@ -343,37 +181,12 @@ public abstract class AbstractJDBCDriver {
this.sqlProvider = sqlProvider; this.sqlProvider = sqlProvider;
} }
public void setJdbcConnectionUrl(String jdbcConnectionUrl) { public void setJdbcConnectionProvider(JDBCConnectionProvider connectionProvider) {
this.jdbcConnectionUrl = jdbcConnectionUrl; this.connectionProvider = connectionProvider;
} }
public String getUser() { public JDBCConnectionProvider getJdbcConnectionProvider() {
return user; return this.connectionProvider;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
public void setNetworkTimeout(Executor executor, int milliseconds) {
this.networkTimeoutExecutor = executor;
this.networkTimeoutMillis = milliseconds;
} }
} }

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class JDBCConnectionProvider {
private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class);
private DataSource dataSource;
private Executor networkTimeoutExecutor;
private int networkTimeoutMillis;
public JDBCConnectionProvider(DataSource dataSource) {
this.dataSource = dataSource;
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
addDerbyShutdownHook();
}
public synchronized Connection getConnection() throws SQLException {
Connection connection;
try {
connection = dataSource.getConnection();
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
connection = new LoggingConnection(connection, logger);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) {
logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null");
}
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
try {
connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
} catch (SQLException e) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection");
} catch (Throwable throwable) {
//it included SecurityExceptions and UnsupportedOperationException
logger.warn("Unable to set a network timeout on the JDBC connection", throwable);
}
}
return connection;
}
private static AtomicBoolean shutAdded = new AtomicBoolean(false);
private static class ShutdownDerby extends Thread {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) { }
}
}
public void addDerbyShutdownHook() {
// Shutdown the derby if using the derby embedded driver.
try (Connection connection = getConnection()) {
if (connection.getMetaData().getDriverName().equals("org.apache.derby.jdbc.EmbeddedDriver")) {
if (shutAdded.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(new ShutdownDerby());
}
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
}
}
public void setNetworkTimeout(Executor executor, int milliseconds) {
this.networkTimeoutExecutor = executor;
this.networkTimeoutMillis = milliseconds;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import org.apache.commons.beanutils.PropertyUtils;
import org.jboss.logging.Logger;
import javax.sql.DataSource;
import java.util.Map;
import java.util.stream.Collectors;
public class JDBCDataSourceUtils {
private static final Logger logger = Logger.getLogger(JDBCDataSourceUtils.class);
public static DataSource getDataSource(String dataSourceClassName, Map<String, Object> dataSourceProperties) {
logger.info(new StringBuilder("Initialising JDBC data source: ").append(dataSourceClassName).append(" ")
.append(dataSourceProperties.keySet().stream()
.map(key -> key + "=" + dataSourceProperties.get(key))
.collect(Collectors.joining(", ", "{", "}"))));
try {
DataSource dataSource = (DataSource) Class.forName(dataSourceClassName).newInstance();
for (Map.Entry<String, Object> entry : dataSourceProperties.entrySet()) {
PropertyUtils.setProperty(dataSource, entry.getKey(), entry.getValue());
}
return dataSource;
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Could not find class: " + dataSourceClassName);
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate DataSource", e);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.drivers; package org.apache.activemq.artemis.jdbc.store.drivers;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Map;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -39,6 +40,13 @@ public class JDBCUtils {
return factory.create(tableName, storeType); return factory.create(tableName, storeType);
} }
public static SQLProvider getSQLProvider(Map<String, Object> dataSourceProperties, String tableName, SQLProvider.DatabaseStoreType storeType) {
PropertySQLProvider.Factory.SQLDialect dialect = PropertySQLProvider.Factory.investigateDialect(dataSourceProperties);
logger.tracef("getSQLProvider Returning SQL provider for dialect %s, tableName::%s", dialect, tableName);
PropertySQLProvider.Factory factory = new PropertySQLProvider.Factory(dialect);
return factory.create(tableName, storeType);
}
/** /**
* Append to {@code errorMessage} a detailed description of the provided {@link SQLException}.<br> * Append to {@code errorMessage} a detailed description of the provided {@link SQLException}.<br>
* The information appended are: * The information appended are:

View File

@ -17,41 +17,38 @@
package org.apache.activemq.artemis.jdbc.store.file; package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@SuppressWarnings("SynchronizeOnNonFinalField") @SuppressWarnings("SynchronizeOnNonFinalField")
public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriver { public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriver {
private PreparedStatement replaceLargeObject; private String replaceLargeObject;
public Db2SequentialFileDriver() { public Db2SequentialFileDriver() {
super(); super();
} }
public Db2SequentialFileDriver(DataSource dataSource, SQLProvider provider) { public Db2SequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
super(dataSource, provider); super(connectionProvider, provider);
}
public Db2SequentialFileDriver(Connection connection, SQLProvider provider) {
super(connection, provider);
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() {
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); this.deleteFile = sqlProvider.getDeleteFileSQL();
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[]{"ID"}); this.createFile = sqlProvider.getInsertFileSQL();
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); this.createFileColumnNames = new String[]{"ID"};
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
this.replaceLargeObject = connection.prepareStatement(sqlProvider.getReplaceLargeObjectSQL()); this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL()); this.replaceLargeObject = sqlProvider.getReplaceLargeObjectSQL();
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
} }
@Override @Override
@ -59,9 +56,8 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return 0; return 0;
} }
final PreparedStatement largeObjectStatement = append ? appendToLargeObject : replaceLargeObject; try (Connection connection = connectionProvider.getConnection()) {
synchronized (connection) { try (PreparedStatement largeObjectStatement = connection.prepareStatement(append ? appendToLargeObject : replaceLargeObject)) {
try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
int bytesWritten; int bytesWritten;
largeObjectStatement.setBytes(1, data); largeObjectStatement.setBytes(1, data);
@ -81,4 +77,4 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv
} }
} }
} }
} }

View File

@ -17,10 +17,10 @@
package org.apache.activemq.artemis.jdbc.store.file; package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -29,54 +29,18 @@ import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Fac
class JDBCFileUtils { class JDBCFileUtils {
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass, static JDBCSequentialFileFactoryDriver getDBFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) throws SQLException {
String jdbcConnectionUrl,
String user,
String password,
SQLProvider provider) throws SQLException {
final JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.identifyDialect(driverClass);
if (POSTGRESQL.equals(sqlDialect)) {
dbDriver = new PostgresSequentialSequentialFileDriver();
} else if (DB2.equals(sqlDialect)) {
dbDriver = new Db2SequentialFileDriver();
} else {
dbDriver = new JDBCSequentialFileFactoryDriver();
}
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
dbDriver.setUser(user);
dbDriver.setPassword(password);
return dbDriver;
}
static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
final JDBCSequentialFileFactoryDriver dbDriver; final JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect; final PropertySQLProvider.Factory.SQLDialect sqlDialect;
try (Connection connection = dataSource.getConnection()) { try (Connection connection = connectionProvider.getConnection()) {
sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection); sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection);
} }
if (POSTGRESQL.equals(sqlDialect)) { if (POSTGRESQL.equals(sqlDialect)) {
dbDriver = new PostgresSequentialSequentialFileDriver(dataSource, provider); dbDriver = new PostgresSequentialSequentialFileDriver(connectionProvider, provider);
} else if (DB2.equals(sqlDialect)) { } else if (DB2.equals(sqlDialect)) {
dbDriver = new Db2SequentialFileDriver(dataSource, provider); dbDriver = new Db2SequentialFileDriver(connectionProvider, provider);
} else { } else {
dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider); dbDriver = new JDBCSequentialFileFactoryDriver(connectionProvider, provider);
}
return dbDriver;
}
static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection);
if (POSTGRESQL.equals(sqlDialect)) {
dbDriver = new PostgresSequentialSequentialFileDriver(connection, provider);
dbDriver.setConnection(connection);
} else if (DB2.equals(sqlDialect)) {
dbDriver = new Db2SequentialFileDriver(connection, provider);
} else {
dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
} }
return dbDriver; return dbDriver;
} }

View File

@ -16,10 +16,8 @@
*/ */
package org.apache.activemq.artemis.jdbc.store.file; package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -32,6 +30,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@ -53,7 +52,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private final IOCriticalErrorListener criticalErrorListener; private final IOCriticalErrorListener criticalErrorListener;
public JDBCSequentialFileFactory(final DataSource dataSource, public JDBCSequentialFileFactory(final JDBCConnectionProvider connectionProvider,
final SQLProvider sqlProvider, final SQLProvider sqlProvider,
Executor executor, Executor executor,
IOCriticalErrorListener criticalErrorListener) throws Exception { IOCriticalErrorListener criticalErrorListener) throws Exception {
@ -62,38 +61,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
this.criticalErrorListener = criticalErrorListener; this.criticalErrorListener = criticalErrorListener;
try { try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); this.dbDriver = JDBCFileUtils.getDBFileDriver(connectionProvider, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
}
public JDBCSequentialFileFactory(final String connectionUrl,
String userName,
String password,
final String className,
final SQLProvider sqlProvider,
Executor executor,
IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor;
this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, userName, password, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}
}
public JDBCSequentialFileFactory(final Connection connection,
final SQLProvider sqlProvider,
final Executor executor,
final IOCriticalErrorListener criticalErrorListener) throws Exception {
this.executor = executor;
this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
} catch (SQLException e) { } catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
} }
@ -103,14 +71,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
return dbDriver; return dbDriver;
} }
/**
* @see Connection#setNetworkTimeout(Executor, int)
**/
public JDBCSequentialFileFactory setNetworkTimeout(Executor executor, int milliseconds) {
this.dbDriver.setNetworkTimeout(executor, milliseconds);
return this;
}
@Override @Override
public SequentialFileFactory setDatasync(boolean enabled) { public SequentialFileFactory setDatasync(boolean enabled) {
return this; return this;

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.jdbc.store.file; package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Blob; import java.sql.Blob;
import java.sql.Connection; import java.sql.Connection;
@ -28,6 +27,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -36,32 +36,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class); private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class);
protected PreparedStatement deleteFile; protected String deleteFile;
protected String createFile;
protected PreparedStatement createFile; protected String[] createFileColumnNames;
protected int createFileAutogeneratedKeys;
protected PreparedStatement selectFileByFileName; protected String selectFileByFileName;
protected String copyFileRecord;
protected PreparedStatement copyFileRecord; protected String renameFile;
protected String readLargeObject;
protected PreparedStatement renameFile; protected String appendToLargeObject;
protected Integer appendToLargeObjectResultSetType;
protected PreparedStatement readLargeObject; protected Integer appendToLargeObjectResultSetConcurrency;
protected String selectFileNamesByExtension;
protected PreparedStatement appendToLargeObject;
protected PreparedStatement selectFileNamesByExtension;
JDBCSequentialFileFactoryDriver() { JDBCSequentialFileFactoryDriver() {
super(); super();
} }
JDBCSequentialFileFactoryDriver(DataSource dataSource, SQLProvider provider) { JDBCSequentialFileFactoryDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
super(dataSource, provider); super(connectionProvider, provider);
}
JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
super(connection, sqlProvider);
} }
@Override @Override
@ -70,22 +63,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() {
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); this.deleteFile = sqlProvider.getDeleteFileSQL();
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[] {"ID"}); this.createFile = sqlProvider.getInsertFileSQL();
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); this.createFileColumnNames = new String[] {"ID"};
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
this.appendToLargeObjectResultSetType = ResultSet.TYPE_FORWARD_ONLY;
this.appendToLargeObjectResultSetConcurrency = ResultSet.CONCUR_UPDATABLE;
this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
} }
public List<String> listFiles(String extension) throws Exception { public List<String> listFiles(String extension) throws Exception {
synchronized (connection) { List<String> fileNames = new ArrayList<>();
List<String> fileNames = new ArrayList<>(); try (Connection connection = connectionProvider.getConnection()) {
try { connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement selectFileNamesByExtension = connection.prepareStatement(this.selectFileNamesByExtension)) {
selectFileNamesByExtension.setString(1, extension); selectFileNamesByExtension.setString(1, extension);
try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
while (rs.next()) { while (rs.next()) {
@ -97,8 +93,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
connection.rollback(); connection.rollback();
throw e; throw e;
} }
return fileNames;
} }
return fileNames;
} }
/** /**
@ -108,14 +104,12 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public void openFile(JDBCSequentialFile file) throws SQLException { public void openFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) { final long fileId = fileExists(file);
final long fileId = fileExists(file); if (fileId < 0) {
if (fileId < 0) { createFile(file);
createFile(file); } else {
} else { file.setId(fileId);
file.setId(fileId); loadFile(file);
loadFile(file);
}
} }
} }
@ -131,18 +125,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public long fileExists(JDBCSequentialFile file) throws SQLException { public long fileExists(JDBCSequentialFile file) throws SQLException {
try { try (Connection connection = connectionProvider.getConnection()) {
synchronized (connection) { try (PreparedStatement selectFileByFileName = connection.prepareStatement(this.selectFileByFileName)) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
selectFileByFileName.setString(1, file.getFileName()); selectFileByFileName.setString(1, file.getFileName());
try (ResultSet rs = selectFileByFileName.executeQuery()) { try (ResultSet rs = selectFileByFileName.executeQuery()) {
final long id = rs.next() ? rs.getLong(1) : -1; final long id = rs.next() ? rs.getLong(1) : -1;
connection.commit(); connection.commit();
return id; return id;
} catch (Exception e) {
connection.rollback();
throw e;
} }
} catch (Exception e) {
connection.rollback();
throw e;
} }
} catch (NullPointerException npe) { } catch (NullPointerException npe) {
npe.printStackTrace(); npe.printStackTrace();
@ -157,20 +151,22 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public void loadFile(JDBCSequentialFile file) throws SQLException { public void loadFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false); try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
readLargeObject.setLong(1, file.getId()); connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) { try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) { if (rs.next()) {
Blob blob = rs.getBlob(1); Blob blob = rs.getBlob(1);
if (blob != null) { if (blob != null) {
file.setWritePosition(blob.length()); file.setWritePosition(blob.length());
} else { } else {
logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId()); logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId());
}
} }
connection.commit();
} }
connection.commit();
} catch (SQLException e) { } catch (SQLException e) {
connection.rollback(); connection.rollback();
throw e; throw e;
@ -185,18 +181,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public void createFile(JDBCSequentialFile file) throws SQLException { public void createFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
createFile.setString(1, file.getFileName()); try (PreparedStatement createFile =
createFile.setString(2, file.getExtension()); createFileColumnNames != null ?
createFile.setBytes(3, new byte[0]); connection.prepareStatement(this.createFile, this.createFileColumnNames) :
createFile.executeUpdate(); connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
try (ResultSet keys = createFile.getGeneratedKeys()) { createFile.setString(1, file.getFileName());
keys.next(); createFile.setString(2, file.getExtension());
file.setId(keys.getLong(1)); createFile.setBytes(3, new byte[0]);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
file.setId(keys.getLong(1));
}
connection.commit();
} }
connection.commit();
} catch (SQLException e) { } catch (SQLException e) {
connection.rollback(); connection.rollback();
throw e; throw e;
@ -212,9 +213,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException { public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement renameFile = connection.prepareStatement(this.renameFile)) {
renameFile.setString(1, newFileName); renameFile.setString(1, newFileName);
renameFile.setLong(2, file.getId()); renameFile.setLong(2, file.getId());
renameFile.executeUpdate(); renameFile.executeUpdate();
@ -233,8 +234,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public void deleteFile(JDBCSequentialFile file) throws SQLException { public void deleteFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try (PreparedStatement deleteFile = connection.prepareStatement(this.deleteFile)) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
deleteFile.setLong(1, file.getId()); deleteFile.setLong(1, file.getId());
deleteFile.executeUpdate(); deleteFile.executeUpdate();
@ -259,31 +260,36 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException { public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
appendToLargeObject.setLong(1, file.getId()); try (PreparedStatement appendToLargeObject =
this.appendToLargeObjectResultSetType != null && this.appendToLargeObjectResultSetConcurrency != null ?
connection.prepareStatement(this.appendToLargeObject, this.appendToLargeObjectResultSetType, this.appendToLargeObjectResultSetConcurrency) :
connection.prepareStatement(this.appendToLargeObject)) {
appendToLargeObject.setLong(1, file.getId());
int bytesWritten = 0; int bytesWritten = 0;
try (ResultSet rs = appendToLargeObject.executeQuery()) { try (ResultSet rs = appendToLargeObject.executeQuery()) {
if (rs.next()) { if (rs.next()) {
Blob blob = rs.getBlob(1); Blob blob = rs.getBlob(1);
if (blob == null) { if (blob == null) {
blob = connection.createBlob(); blob = connection.createBlob();
}
if (append) {
bytesWritten = blob.setBytes(blob.length() + 1, data);
} else {
blob.truncate(0);
bytesWritten = blob.setBytes(1, data);
}
rs.updateBlob(1, blob);
rs.updateRow();
} }
if (append) { connection.commit();
bytesWritten = blob.setBytes(blob.length() + 1, data); return bytesWritten;
} else { } catch (SQLException e) {
blob.truncate(0); connection.rollback();
bytesWritten = blob.setBytes(1, data); throw e;
}
rs.updateBlob(1, blob);
rs.updateRow();
} }
connection.commit();
return bytesWritten;
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }
} }
@ -297,35 +303,37 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId()); try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
int readLength = 0; readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) { int readLength = 0;
if (rs.next()) { try (ResultSet rs = readLargeObject.executeQuery()) {
final Blob blob = rs.getBlob(1); if (rs.next()) {
if (blob != null) { final Blob blob = rs.getBlob(1);
final long blobLength = blob.length(); if (blob != null) {
final int bytesRemaining = bytes.remaining(); final long blobLength = blob.length();
final long filePosition = file.position(); final int bytesRemaining = bytes.remaining();
readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition); final long filePosition = file.position();
if (logger.isDebugEnabled()) { readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition);
logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d", if (logger.isDebugEnabled()) {
readLength, blobLength, bytesRemaining, filePosition); logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d",
} readLength, blobLength, bytesRemaining, filePosition);
if (readLength < 0) { }
readLength = -1; if (readLength < 0) {
} else if (readLength > 0) { readLength = -1;
byte[] data = blob.getBytes(file.position() + 1, readLength); } else if (readLength > 0) {
bytes.put(data); byte[] data = blob.getBytes(file.position() + 1, readLength);
bytes.put(data);
}
} }
} }
connection.commit();
return readLength;
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit();
return readLength;
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }
} }
@ -338,9 +346,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @throws SQLException * @throws SQLException
*/ */
public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException { public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement copyFileRecord = connection.prepareStatement(this.copyFileRecord)) {
copyFileRecord.setLong(1, fileFrom.getId()); copyFileRecord.setLong(1, fileFrom.getId());
copyFileRecord.setLong(2, fileTo.getId()); copyFileRecord.setLong(2, fileTo.getId());
copyFileRecord.executeUpdate(); copyFileRecord.executeUpdate();
@ -357,7 +365,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
*/ */
@Override @Override
public void destroy() throws SQLException { public void destroy() throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
try (Statement statement = connection.createStatement()) { try (Statement statement = connection.createStatement()) {

View File

@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import org.postgresql.PGConnection; import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject; import org.postgresql.largeobject.LargeObject;
@ -43,11 +44,10 @@ public class PostgresLargeObjectManager {
*/ */
public static final int READWRITE = READ | WRITE; public static final int READWRITE = READ | WRITE;
private final Connection realConnection;
private boolean shouldUseReflection; private boolean shouldUseReflection;
public PostgresLargeObjectManager(Connection connection) throws SQLException {
this.realConnection = unwrap(connection); public PostgresLargeObjectManager() {
try { try {
this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection"); this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection");
shouldUseReflection = false; shouldUseReflection = false;
@ -56,9 +56,9 @@ public class PostgresLargeObjectManager {
} }
} }
public final Long createLO() throws SQLException { public final Long createLO(Connection connection) throws SQLException {
if (shouldUseReflection) { if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager(); Object largeObjectManager = getLargeObjectManager(connection);
try { try {
Method method = largeObjectManager.getClass().getMethod("createLO"); Method method = largeObjectManager.getClass().getMethod("createLO");
return (Long) method.invoke(largeObjectManager); return (Long) method.invoke(largeObjectManager);
@ -66,13 +66,13 @@ public class PostgresLargeObjectManager {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex); throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
} }
} else { } else {
return ((PGConnection) realConnection).getLargeObjectAPI().createLO(); return ((PGConnection) unwrap(connection)).getLargeObjectAPI().createLO();
} }
} }
public Object open(long oid, int mode) throws SQLException { public Object open(Connection connection, long oid, int mode) throws SQLException {
if (shouldUseReflection) { if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager(); Object largeObjectManager = getLargeObjectManager(connection);
try { try {
Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class); Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class);
return method.invoke(largeObjectManager, oid, mode); return method.invoke(largeObjectManager, oid, mode);
@ -80,7 +80,7 @@ public class PostgresLargeObjectManager {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex); throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
} }
} else { } else {
return ((PGConnection) realConnection).getLargeObjectAPI().open(oid, mode); return ((PGConnection) unwrap(connection)).getLargeObjectAPI().open(oid, mode);
} }
} }
@ -162,22 +162,22 @@ public class PostgresLargeObjectManager {
} }
} }
private Object getLargeObjectManager() throws SQLException { private Object getLargeObjectManager(Connection connection) throws SQLException {
if (shouldUseReflection) { if (shouldUseReflection) {
try { try {
Method method = realConnection.getClass().getMethod("getLargeObjectAPI"); Connection conn = unwrap(connection);
return method.invoke(realConnection); Method method = conn.getClass().getMethod("getLargeObjectAPI");
return method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex); throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
} }
} else { } else {
return ((PGConnection) realConnection).getLargeObjectAPI(); return ((PGConnection) unwrap(connection)).getLargeObjectAPI();
} }
} }
public final Connection unwrap(Connection connection) throws SQLException { public final Connection unwrap(Connection connection) throws SQLException {
Connection conn = connection.unwrap(Connection.class); return unwrapIronJacamar(unwrapDbcp(unwrapDbcp2(unwrapSpring(connection.unwrap(Connection.class)))));
return unwrapIronJacamar(unwrapDbcp(unwrapSpring(conn)));
} }
private Connection unwrapIronJacamar(Connection conn) { private Connection unwrapIronJacamar(Connection conn) {
@ -198,6 +198,15 @@ public class PostgresLargeObjectManager {
} }
} }
private Connection unwrapDbcp2(Connection conn) {
try {
Method method = conn.getClass().getMethod("getInnermostDelegateInternal");
return (Connection) method.invoke(conn);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
return conn;
}
}
private Connection unwrapSpring(Connection conn) { private Connection unwrapSpring(Connection conn) {
try { try {
Method method = conn.getClass().getMethod("getTargetConnection"); Method method = conn.getClass().getMethod("getTargetConnection");

View File

@ -18,14 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.file;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import javax.sql.DataSource;
@SuppressWarnings("SynchronizeOnNonFinalField") @SuppressWarnings("SynchronizeOnNonFinalField")
public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver { public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
@ -36,37 +36,32 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
super(); super();
} }
public PostgresSequentialSequentialFileDriver(DataSource dataSource, SQLProvider provider) { public PostgresSequentialSequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) {
super(); super();
this.setDataSource(dataSource); this.setJdbcConnectionProvider(connectionProvider);
this.setSqlProvider(provider);
}
public PostgresSequentialSequentialFileDriver(Connection connection, SQLProvider provider) {
super();
this.setConnection(connection);
this.setSqlProvider(provider); this.setSqlProvider(provider);
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() {
this.largeObjectManager = new PostgresLargeObjectManager(connection); this.largeObjectManager = new PostgresLargeObjectManager();
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); this.deleteFile = sqlProvider.getDeleteFileSQL();
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS); this.createFile = sqlProvider.getInsertFileSQL();
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); this.createFileAutogeneratedKeys = Statement.RETURN_GENERATED_KEYS;
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); this.selectFileByFileName = sqlProvider.getSelectFileByFileName();
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL();
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); this.renameFile = sqlProvider.getUpdateFileNameByIdSQL();
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL()); this.readLargeObject = sqlProvider.getReadLargeObjectSQL();
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL();
this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL();
} }
@Override @Override
public void createFile(JDBCSequentialFile file) throws SQLException { public void createFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try (PreparedStatement createFile = connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
Long oid = largeObjectManager.createLO(); Long oid = largeObjectManager.createLO(connection);
createFile.setString(1, file.getFileName()); createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension()); createFile.setString(2, file.getExtension());
@ -87,31 +82,31 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
@Override @Override
public void loadFile(JDBCSequentialFile file) throws SQLException { public void loadFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false); try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
readLargeObject.setLong(1, file.getId()); connection.setAutoCommit(false);
readLargeObject.setLong(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) { try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) { if (rs.next()) {
file.setWritePosition(getPostGresLargeObjectSize(file)); file.setWritePosition(getPostGresLargeObjectSize(file));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }
} }
@Override @Override
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException { public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
Object largeObject = null;
Long oid = getOID(file); Long oid = getOID(file);
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.WRITE); Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.WRITE);
if (append) { if (append) {
largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject)); largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject));
} else { } else {
@ -130,12 +125,11 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
@Override @Override
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
Object largeObject = null;
long oid = getOID(file); long oid = getOID(file);
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ); Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position()); int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position());
if (readLength > 0) { if (readLength > 0) {
@ -160,17 +154,19 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
private Long getOID(JDBCSequentialFile file) throws SQLException { private Long getOID(JDBCSequentialFile file) throws SQLException {
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY); Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) { if (oid == null) {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false); try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) {
readLargeObject.setLong(1, file.getId()); connection.setAutoCommit(false);
try (ResultSet rs = readLargeObject.executeQuery()) { readLargeObject.setLong(1, file.getId());
if (rs.next()) { try (ResultSet rs = readLargeObject.executeQuery()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1)); if (rs.next()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }
} }
@ -184,10 +180,10 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential
int size = 0; int size = 0;
Long oid = getOID(file); Long oid = getOID(file);
if (oid != null) { if (oid != null) {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
Object largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ); Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ);
size = largeObjectManager.size(largeObject); size = largeObjectManager.size(largeObject);
largeObjectManager.close(largeObject); largeObjectManager.close(largeObject);
connection.commit(); connection.commit();

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.jdbc.store.journal; package org.apache.activemq.artemis.jdbc.store.journal;
import javax.sql.DataSource; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -67,15 +68,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private final List<JDBCJournalRecord> records; private final List<JDBCJournalRecord> records;
private PreparedStatement insertJournalRecords; private String insertJournalRecords;
private PreparedStatement selectJournalRecords; private String selectJournalRecords;
private PreparedStatement countJournalRecords; private String countJournalRecords;
private PreparedStatement deleteJournalRecords; private String deleteJournalRecords;
private PreparedStatement deleteJournalTxRecords; private String deleteJournalTxRecords;
private boolean started; private boolean started;
@ -95,30 +96,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private final IOCriticalErrorListener criticalIOErrorListener; private final IOCriticalErrorListener criticalIOErrorListener;
public JDBCJournalImpl(DataSource dataSource, public JDBCJournalImpl(JDBCConnectionProvider connectionProvider,
SQLProvider provider, SQLProvider provider,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor, Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener, IOCriticalErrorListener criticalIOErrorListener,
long syncDelay) { long syncDelay) {
super(dataSource, provider); super(connectionProvider, provider);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;
this.criticalIOErrorListener = criticalIOErrorListener;
this.syncDelay = syncDelay;
}
public JDBCJournalImpl(String jdbcUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider sqlProvider,
ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener,
long syncDelay) {
super(sqlProvider, jdbcUrl, user, password, jdbcDriverClass);
records = new ArrayList<>(); records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor; this.completeExecutor = completeExecutor;
@ -153,13 +137,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() {
logger.tracef("preparing statements"); logger.tracef("preparing statements");
insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL()); insertJournalRecords = sqlProvider.getInsertJournalRecordsSQL();
selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL()); selectJournalRecords = sqlProvider.getSelectJournalRecordsSQL();
countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL()); countJournalRecords = sqlProvider.getCountJournalRecordsSQL();
deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL()); deleteJournalRecords = sqlProvider.getDeleteJournalRecordsSQL();
deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL()); deleteJournalTxRecords = sqlProvider.getDeleteJournalTxRecordsSQL();
} }
@Override @Override
@ -205,65 +189,70 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
TransactionHolder holder; TransactionHolder holder;
try { try (Connection connection = connectionProvider.getConnection()) {
connection.setAutoCommit(false);
for (JDBCJournalRecord record : recordRef) { try (PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords);
PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords);
PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) {
if (logger.isTraceEnabled()) { connection.setAutoCommit(false);
logger.trace("sync::preparing JDBC statement for " + record);
} for (JDBCJournalRecord record : recordRef) {
if (logger.isTraceEnabled()) {
logger.trace("sync::preparing JDBC statement for " + record);
}
switch (record.getRecordType()) {
switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD:
case JDBCJournalRecord.DELETE_RECORD: // Standard SQL Delete Record, Non transactional delete
// Standard SQL Delete Record, Non transactional delete
deletedRecords.add(record.getId());
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last.
deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch();
break;
case JDBCJournalRecord.COMMIT_RECORD:
// We perform all the deletes and add the commit record in the same Database TX
holder = transactions.get(record.getTxId());
for (RecordInfo info : holder.recordsToDelete) {
deletedRecords.add(record.getId()); deletedRecords.add(record.getId());
deletedRecords.add(info.id); record.writeDeleteRecord(deleteJournalRecords);
deleteJournalRecords.setLong(1, info.id); break;
deleteJournalRecords.addBatch(); case JDBCJournalRecord.ROLLBACK_RECORD:
} // Roll back we remove all records associated with this TX ID. This query is always performed last.
record.writeRecord(insertJournalRecords); deleteJournalTxRecords.setLong(1, record.getTxId());
committedTransactions.add(record.getTxId()); deleteJournalTxRecords.addBatch();
break; break;
default: case JDBCJournalRecord.COMMIT_RECORD:
// Default we add a new record to the DB // We perform all the deletes and add the commit record in the same Database TX
record.writeRecord(insertJournalRecords); holder = transactions.get(record.getTxId());
break; for (RecordInfo info : holder.recordsToDelete) {
deletedRecords.add(record.getId());
deletedRecords.add(info.id);
deleteJournalRecords.setLong(1, info.id);
deleteJournalRecords.addBatch();
}
record.writeRecord(insertJournalRecords);
committedTransactions.add(record.getTxId());
break;
default:
// Default we add a new record to the DB
record.writeRecord(insertJournalRecords);
break;
}
} }
}
insertJournalRecords.executeBatch(); insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch(); deleteJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch();
connection.commit();
if (logger.isTraceEnabled()) {
logger.trace("JDBC commit worked");
}
if (cleanupTxRecords(deletedRecords, committedTransactions)) {
deleteJournalTxRecords.executeBatch(); deleteJournalTxRecords.executeBatch();
connection.commit(); connection.commit();
logger.trace("JDBC commit worked on cleanupTxRecords"); if (logger.isTraceEnabled()) {
logger.trace("JDBC commit worked");
}
if (cleanupTxRecords(deletedRecords, committedTransactions)) {
deleteJournalTxRecords.executeBatch();
connection.commit();
logger.trace("JDBC commit worked on cleanupTxRecords");
}
executeCallbacks(recordRef, true);
return recordRef.size();
} }
executeCallbacks(recordRef, true);
return recordRef.size();
} catch (Exception e) { } catch (Exception e) {
handleException(recordRef, e); handleException(recordRef, e);
return 0; return 0;
@ -280,18 +269,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
logger.trace("Rolling back Transaction, just in case"); logger.trace("Rolling back Transaction, just in case");
} }
try {
connection.rollback();
} catch (Throwable rollback) {
logger.warn(rollback);
}
try {
connection.close();
} catch (Throwable rollback) {
logger.warn(rollback);
}
if (recordRef != null) { if (recordRef != null) {
executeCallbacks(recordRef, false); executeCallbacks(recordRef, false);
} }
@ -308,23 +285,27 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
transactions.get(txId).committed = true; transactions.get(txId).committed = true;
} }
boolean hasDeletedJournalTxRecords = false; boolean hasDeletedJournalTxRecords = false;
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
iterableCopy = new ArrayList<>(); try (Connection connection = connectionProvider.getConnection();
iterableCopy.addAll(h.recordInfos); PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords)) {
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
for (RecordInfo info : iterableCopy) { iterableCopy = new ArrayList<>();
if (deletedRecords.contains(info.id)) { iterableCopy.addAll(h.recordInfos);
h.recordInfos.remove(info);
for (RecordInfo info : iterableCopy) {
if (deletedRecords.contains(info.id)) {
h.recordInfos.remove(info);
}
} }
}
if (h.recordInfos.isEmpty() && h.committed) { if (h.recordInfos.isEmpty() && h.committed) {
deleteJournalTxRecords.setLong(1, h.transactionID); deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch(); deleteJournalTxRecords.addBatch();
hasDeletedJournalTxRecords = true; hasDeletedJournalTxRecords = true;
transactions.remove(h.transactionID); transactions.remove(h.transactionID);
}
} }
} }
return hasDeletedJournalTxRecords; return hasDeletedJournalTxRecords;
@ -868,51 +849,54 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager); JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
JDBCJournalRecord r; JDBCJournalRecord r;
try (ResultSet rs = selectJournalRecords.executeQuery()) { try (Connection connection = connectionProvider.getConnection();
int noRecords = 0; PreparedStatement selectJournalRecords = connection.prepareStatement(this.selectJournalRecords)) {
while (rs.next()) { try (ResultSet rs = selectJournalRecords.executeQuery()) {
r = JDBCJournalRecord.readRecord(rs); int noRecords = 0;
switch (r.getRecordType()) { while (rs.next()) {
case JDBCJournalRecord.ADD_RECORD: r = JDBCJournalRecord.readRecord(rs);
jrc.onReadAddRecord(r.toRecordInfo()); switch (r.getRecordType()) {
break; case JDBCJournalRecord.ADD_RECORD:
case JDBCJournalRecord.UPDATE_RECORD: jrc.onReadAddRecord(r.toRecordInfo());
jrc.onReadUpdateRecord(r.toRecordInfo()); break;
break; case JDBCJournalRecord.UPDATE_RECORD:
case JDBCJournalRecord.DELETE_RECORD: jrc.onReadUpdateRecord(r.toRecordInfo());
jrc.onReadDeleteRecord(r.getId()); break;
break; case JDBCJournalRecord.DELETE_RECORD:
case JDBCJournalRecord.ADD_RECORD_TX: jrc.onReadDeleteRecord(r.getId());
jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo()); break;
break; case JDBCJournalRecord.ADD_RECORD_TX:
case JDBCJournalRecord.UPDATE_RECORD_TX: jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo()); break;
break; case JDBCJournalRecord.UPDATE_RECORD_TX:
case JDBCJournalRecord.DELETE_RECORD_TX: jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo()); break;
break; case JDBCJournalRecord.DELETE_RECORD_TX:
case JDBCJournalRecord.PREPARE_RECORD: jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords()); break;
break; case JDBCJournalRecord.PREPARE_RECORD:
case JDBCJournalRecord.COMMIT_RECORD: jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords()); break;
break; case JDBCJournalRecord.COMMIT_RECORD:
case JDBCJournalRecord.ROLLBACK_RECORD: jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
jrc.onReadRollbackRecord(r.getTxId()); break;
break; case JDBCJournalRecord.ROLLBACK_RECORD:
default: jrc.onReadRollbackRecord(r.getTxId());
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType()); break;
default:
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
}
noRecords++;
if (r.getSeq() > seq.longValue()) {
seq.set(r.getSeq());
}
} }
noRecords++; jrc.checkPreparedTx();
if (r.getSeq() > seq.longValue()) {
seq.set(r.getSeq());
}
}
jrc.checkPreparedTx();
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords); jli.setNumberOfRecords(noRecords);
transactions = jrc.getTransactions(); transactions = jrc.getTransactions();
}
} catch (Throwable e) { } catch (Throwable e) {
handleException(null, e); handleException(null, e);
} }
@ -962,9 +946,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override @Override
public int getNumberOfRecords() { public int getNumberOfRecords() {
int count = 0; int count = 0;
try (ResultSet rs = countJournalRecords.executeQuery()) { try (Connection connection = connectionProvider.getConnection();
rs.next(); PreparedStatement countJournalRecords = connection.prepareStatement(this.countJournalRecords)) {
count = rs.getInt(1); try (ResultSet rs = countJournalRecords.executeQuery()) {
rs.next();
count = rs.getInt(1);
}
} catch (SQLException e) { } catch (SQLException e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
return -1; return -1;

View File

@ -22,10 +22,12 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -363,7 +365,15 @@ public class PropertySQLProvider implements SQLProvider {
} }
public Factory(DataSource dataSource) { public Factory(DataSource dataSource) {
this(investigateDialect(dataSource)); this(new JDBCConnectionProvider(dataSource));
}
public Factory(Map<String, Object> dataSourceProperties) {
this(investigateDialect(dataSourceProperties));
}
public Factory(JDBCConnectionProvider connectionProvider) {
this(investigateDialect(connectionProvider));
} }
public static SQLDialect investigateDialect(Connection connection) { public static SQLDialect investigateDialect(Connection connection) {
@ -388,8 +398,21 @@ public class PropertySQLProvider implements SQLProvider {
return dialect; return dialect;
} }
private static SQLDialect investigateDialect(DataSource dataSource) { public static SQLDialect investigateDialect(Map<String, Object> dataSourceProperties) {
try (Connection connection = dataSource.getConnection()) { SQLDialect dialect = null;
for (Object entry : dataSourceProperties.values()) {
if (entry instanceof String) {
dialect = identifyDialect((String) entry);
if (dialect != null) {
return dialect;
}
}
}
return dialect;
}
private static SQLDialect investigateDialect(JDBCConnectionProvider connectionProvider) {
try (Connection connection = connectionProvider.getConnection()) {
return investigateDialect(connection); return investigateDialect(connection);
} catch (Exception e) { } catch (Exception e) {
logger.debug("Unable to read JDBC metadata.", e); logger.debug("Unable to read JDBC metadata.", e);

View File

@ -21,9 +21,11 @@ import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -31,11 +33,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@ -79,15 +84,20 @@ public class JDBCSequentialFileFactoryTest {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
Map<String, Object> dataSourceProperties = new HashMap<>();
if (useAuthentication) { if (useAuthentication) {
user = "testuser"; user = "testuser";
password = "testpassword"; password = "testpassword";
System.setProperty("derby.connection.requireAuthentication", "true"); System.setProperty("derby.connection.requireAuthentication", "true");
System.setProperty("derby.user." + user, password); System.setProperty("derby.user." + user, password);
dataSourceProperties.put("username", user);
dataSourceProperties.put("password", password);
} }
String connectionUrl = "jdbc:derby:target/data;create=true"; dataSourceProperties.put("url", "jdbc:derby:target/data;create=true");
dataSourceProperties.put("driverClassName", className);
String tableName = "FILES"; String tableName = "FILES";
factory = new JDBCSequentialFileFactory(connectionUrl, user, password, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() { String jdbcDatasourceClass = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
factory = new JDBCSequentialFileFactory(new JDBCConnectionProvider(JDBCDataSourceUtils.getDataSource(jdbcDatasourceClass, dataSourceProperties)), JDBCUtils.getSQLProvider(dataSourceProperties, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
@Override @Override
public void onIOException(Throwable code, String message, SequentialFile file) { public void onIOException(Throwable code, String message, SequentialFile file) {
} }

View File

@ -44,21 +44,21 @@ public class PostgresLargeObjectManagerTest {
@Test @Test
public void testShouldNotUseReflection() throws SQLException { public void testShouldNotUseReflection() throws SQLException {
PostgresLargeObjectManager manager = new PostgresLargeObjectManager(new MockConnection()); PostgresLargeObjectManager manager = new PostgresLargeObjectManager();
try { try {
manager.createLO(); manager.createLO(new MockConnection());
fail("Shouldn't be using reflection"); fail("Shouldn't be using reflection");
} catch (ClassCastException ex) { } catch (ClassCastException ex) {
} }
} }
@Test @Test
public void testShouldUseReflection() throws SQLException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { public void testShouldUseReflection() throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
ClassLoader loader = new FunkyClassLoader(); ClassLoader loader = new FunkyClassLoader();
Class funkyClass = loader.loadClass("org.apache.activemq.artemis.jdbc.store.file.PostgresLargeObjectManager"); Class funkyClass = loader.loadClass("org.apache.activemq.artemis.jdbc.store.file.PostgresLargeObjectManager");
Object manager = funkyClass.getConstructor(Connection.class).newInstance(new MockConnection()); Object manager = funkyClass.getConstructor().newInstance();
try { try {
funkyClass.getMethod("createLO").invoke(manager); funkyClass.getMethod("createLO", Connection.class).invoke(manager, new MockConnection());
fail("Shouldn't be using reflection"); fail("Shouldn't be using reflection");
} catch (java.lang.reflect.InvocationTargetException ex) { } catch (java.lang.reflect.InvocationTargetException ex) {
assertEquals("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex.getCause().getMessage()); assertEquals("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex.getCause().getMessage());

View File

@ -20,8 +20,13 @@ import javax.sql.DataSource;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import java.util.HashMap;
import java.util.Map;
public class DatabaseStorageConfiguration implements StoreConfiguration { public class DatabaseStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName(); private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
@ -44,6 +49,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private DataSource dataSource; private DataSource dataSource;
private String dataSourceClassName = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
private Map<String, Object> dataSourceProperties = new HashMap();
private JDBCConnectionProvider connectionProvider;
private SQLProvider.Factory sqlProviderFactory; private SQLProvider.Factory sqlProviderFactory;
private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout(); private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout();
@ -138,7 +149,22 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
* *
* @return the DataSource used to store Artemis data in the JDBC data store. * @return the DataSource used to store Artemis data in the JDBC data store.
*/ */
public DataSource getDataSource() { private DataSource getDataSource() {
if (dataSource == null) {
if (dataSourceProperties.isEmpty()) {
addDataSourceProperty("driverClassName", jdbcDriverClassName);
addDataSourceProperty("url", jdbcConnectionUrl);
if (jdbcUser != null) {
addDataSourceProperty("username", jdbcUser);
}
if (jdbcPassword != null) {
addDataSourceProperty("password", jdbcPassword);
}
// Let the pool to have unbounded number of connections by default to prevent connection starvation
addDataSourceProperty("maxTotal", "-1");
}
dataSource = JDBCDataSourceUtils.getDataSource(dataSourceClassName, dataSourceProperties);
}
return dataSource; return dataSource;
} }
@ -151,6 +177,33 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
this.dataSource = dataSource; this.dataSource = dataSource;
} }
public JDBCConnectionProvider getConnectionProvider() {
if (connectionProvider == null) {
connectionProvider = new JDBCConnectionProvider(getDataSource());
}
return connectionProvider;
}
public void addDataSourceProperty(String key, String value) {
if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) {
dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase()));
} else {
try {
int i = Integer.parseInt(value);
dataSourceProperties.put(key, i);
} catch (NumberFormatException nfe) {
dataSourceProperties.put(key, value);
}
}
}
public String getDataSourceClassName() {
return dataSourceClassName;
}
public void setDataSourceClassName(String dataSourceClassName) {
this.dataSourceClassName = dataSourceClassName;
}
/** /**
* The {@link SQLProvider.Factory} used to communicate with the JDBC data store. * The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
* It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory}} will be used, * It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory}} will be used,

View File

@ -1738,6 +1738,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
password = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), password, mainConfig.getPasswordCodec()); password = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), password, mainConfig.getPasswordCodec());
} }
conf.setJdbcPassword(password); conf.setJdbcPassword(password);
conf.setDataSourceClassName(getString(storeNode, "data-source-class-name", conf.getDataSourceClassName(), Validators.NO_CHECK));
if (parameterExists(storeNode, "data-source-properties")) {
NodeList propertyNodeList = storeNode.getElementsByTagName("data-source-property");
for (int i = 0; i < propertyNodeList.getLength(); i++) {
Element propertyNode = (Element) propertyNodeList.item(i);
conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue());
}
}
//conf.initDataSource();
return conf; return conf;
} }

View File

@ -40,10 +40,8 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -72,16 +70,12 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
protected final StorageManager storageManager; protected final StorageManager storageManager;
private JDBCSequentialFileFactoryDriver dbDriver;
private DatabaseStorageConfiguration dbConf; private DatabaseStorageConfiguration dbConf;
private ExecutorFactory executorFactory; private ExecutorFactory executorFactory;
private JDBCSequentialFileFactory pagingFactoryFileFactory; private JDBCSequentialFileFactory pagingFactoryFileFactory;
private JDBCSequentialFile directoryList;
private final boolean readWholePage; private final boolean readWholePage;
@Override @Override
@ -106,8 +100,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
final ScheduledExecutorService scheduledExecutor, final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory, final ExecutorFactory executorFactory,
final boolean syncNonTransactional, final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener) throws Exception { final IOCriticalErrorListener criticalErrorListener) throws Exception {
this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false); this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, criticalErrorListener, false);
} }
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
@ -116,7 +110,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
final ScheduledExecutorService scheduledExecutor, final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory, final ExecutorFactory executorFactory,
final boolean syncNonTransactional, final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener, final IOCriticalErrorListener criticalErrorListener,
final boolean readWholePage) throws Exception { final boolean readWholePage) throws Exception {
this.storageManager = storageManager; this.storageManager = storageManager;
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
@ -124,7 +118,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
this.scheduledExecutor = scheduledExecutor; this.scheduledExecutor = scheduledExecutor;
this.syncTimeout = syncTimeout; this.syncTimeout = syncTimeout;
this.dbConf = dbConf; this.dbConf = dbConf;
this.criticalErrorListener = critialErrorListener; this.criticalErrorListener = criticalErrorListener;
this.factoryToTableName = new HashMap<>(); this.factoryToTableName = new HashMap<>();
this.readWholePage = readWholePage; this.readWholePage = readWholePage;
start(); start();
@ -137,20 +131,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
if (pageStoreTableNamePrefix.length() > 10) { if (pageStoreTableNamePrefix.length() > 10) {
throw new IllegalStateException("The maximum name size for the page store table prefix is 10 characters: THE PAGING STORE CAN'T START"); throw new IllegalStateException("The maximum name size for the page store table prefix is 10 characters: THE PAGING STORE CAN'T START");
} }
if (dbConf.getDataSource() != null) { SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); if (sqlProviderFactory == null) {
if (sqlProviderFactory == null) { sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider());
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
}
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
}
final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
if (jdbcNetworkTimeout >= 0) {
pagingFactoryFileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
} }
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
pagingFactoryFileFactory.start(); pagingFactoryFileFactory.start();
started = true; started = true;
} }
@ -278,22 +263,14 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
directoryList.close(); directoryList.close();
final SQLProvider sqlProvider; final SQLProvider sqlProvider;
if (dbConf.getDataSource() != null) { final SQLProvider.Factory sqlProviderFactory;
final SQLProvider.Factory sqlProviderFactory; if (dbConf.getSqlProviderFactory() != null) {
if (dbConf.getSqlProviderFactory() != null) { sqlProviderFactory = dbConf.getSqlProviderFactory();
sqlProviderFactory = dbConf.getSqlProviderFactory();
} else {
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
}
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
} else { } else {
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider());
}
final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
if (jdbcNetworkTimeout >= 0) {
fileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
} }
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener);
factoryToTableName.put(fileFactory, directoryName); factoryToTableName.put(fileFactory, directoryName);
return fileFactory; return fileFactory;
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.persistence.impl.journal; package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -26,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
@ -36,8 +35,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
public class JDBCJournalStorageManager extends JournalStorageManager { public class JDBCJournalStorageManager extends JournalStorageManager {
private Connection connection;
public JDBCJournalStorageManager(Configuration config, public JDBCJournalStorageManager(Configuration config,
CriticalAnalyzer analyzer, CriticalAnalyzer analyzer,
ExecutorFactory executorFactory, ExecutorFactory executorFactory,
@ -59,33 +56,35 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
try { try {
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider();
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {
connectionProvider.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
final JDBCJournalImpl bindingsJournal; final JDBCJournalImpl bindingsJournal;
final JDBCJournalImpl messageJournal; final JDBCJournalImpl messageJournal;
final JDBCSequentialFileFactory largeMessagesFactory; final JDBCSequentialFileFactory largeMessagesFactory;
if (dbConf.getDataSource() != null) { SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); if (sqlProviderFactory == null) {
if (sqlProviderFactory == null) { sqlProviderFactory = new PropertySQLProvider.Factory(connectionProvider);
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
}
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
}
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {
bindingsJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
if (networkTimeout >= 0) {
messageJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
if (networkTimeout >= 0) {
largeMessagesFactory.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
} }
bindingsJournal = new JDBCJournalImpl(
connectionProvider,
sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL),
scheduledExecutorService,
executorFactory.getExecutor(),
criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(
connectionProvider,
sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL),
scheduledExecutorService, executorFactory.getExecutor(),
criticalErrorListener,
dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(
connectionProvider,
sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE),
executorFactory.getExecutor(),
criticalErrorListener);
this.bindingsJournal = bindingsJournal; this.bindingsJournal = bindingsJournal;
this.messageJournal = messageJournal; this.messageJournal = messageJournal;
this.largeMessagesFactory = largeMessagesFactory; this.largeMessagesFactory = largeMessagesFactory;

View File

@ -26,6 +26,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** /**
@ -35,13 +36,13 @@ final class JdbcLeaseLock implements LeaseLock {
private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class); private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class);
private static final int MAX_HOLDER_ID_LENGTH = 128; private static final int MAX_HOLDER_ID_LENGTH = 128;
private final Connection connection; private final JDBCConnectionProvider connectionProvider;
private final String holderId; private final String holderId;
private final PreparedStatement tryAcquireLock; private final String tryAcquireLock;
private final PreparedStatement tryReleaseLock; private final String tryReleaseLock;
private final PreparedStatement renewLock; private final String renewLock;
private final PreparedStatement isLocked; private final String isLocked;
private final PreparedStatement currentDateTime; private final String currentDateTime;
private final long expirationMillis; private final long expirationMillis;
private boolean maybeAcquired; private boolean maybeAcquired;
private final String lockName; private final String lockName;
@ -51,12 +52,12 @@ final class JdbcLeaseLock implements LeaseLock {
* whose life cycle will be managed externally. * whose life cycle will be managed externally.
*/ */
JdbcLeaseLock(String holderId, JdbcLeaseLock(String holderId,
Connection connection, JDBCConnectionProvider connectionProvider,
PreparedStatement tryAcquireLock, String tryAcquireLock,
PreparedStatement tryReleaseLock, String tryReleaseLock,
PreparedStatement renewLock, String renewLock,
PreparedStatement isLocked, String isLocked,
PreparedStatement currentDateTime, String currentDateTime,
long expirationMIllis, long expirationMIllis,
String lockName) { String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) { if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
@ -70,7 +71,7 @@ final class JdbcLeaseLock implements LeaseLock {
this.currentDateTime = currentDateTime; this.currentDateTime = currentDateTime;
this.expirationMillis = expirationMIllis; this.expirationMillis = expirationMIllis;
this.maybeAcquired = false; this.maybeAcquired = false;
this.connection = connection; this.connectionProvider = connectionProvider;
this.lockName = lockName; this.lockName = lockName;
} }
@ -84,13 +85,12 @@ final class JdbcLeaseLock implements LeaseLock {
} }
private String readableLockStatus() { private String readableLockStatus() {
try { try (Connection connection = connectionProvider.getConnection()) {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit(); final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); connection.setAutoCommit(false);
try { try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
final String lockStatus; final String lockStatus;
final PreparedStatement preparedStatement = this.isLocked;
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) { if (!resultSet.next()) {
lockStatus = null; lockStatus = null;
@ -114,100 +114,96 @@ final class JdbcLeaseLock implements LeaseLock {
} }
} }
private long dbCurrentTimeMillis() throws SQLException { private long dbCurrentTimeMillis(Connection connection) throws SQLException {
final long start = System.nanoTime(); final long start = System.nanoTime();
try (ResultSet resultSet = currentDateTime.executeQuery()) { try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
resultSet.next(); try (ResultSet resultSet = currentDateTime.executeQuery()) {
final Timestamp currentTimestamp = resultSet.getTimestamp(1); resultSet.next();
final long elapsedTime = System.nanoTime() - start; final Timestamp currentTimestamp = resultSet.getTimestamp(1);
if (LOGGER.isDebugEnabled()) { final long elapsedTime = System.nanoTime() - start;
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms", if (LOGGER.isDebugEnabled()) {
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime)); LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
}
return currentTimestamp.getTime();
} }
return currentTimestamp.getTime();
} }
} }
@Override @Override
public boolean renew() { public boolean renew() {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit();
final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
try { final long now = dbCurrentTimeMillis(connection);
final PreparedStatement preparedStatement = this.renewLock; final Timestamp expirationTime = new Timestamp(now + expirationMillis);
final long now = dbCurrentTimeMillis(); if (LOGGER.isDebugEnabled()) {
final Timestamp expirationTime = new Timestamp(now + expirationMillis); LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
if (LOGGER.isDebugEnabled()) { lockName, holderId, expirationTime);
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
}
preparedStatement.setTimestamp(1, expirationTime);
preparedStatement.setString(2, holderId);
preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(4, expirationTime);
final int updatedRows = preparedStatement.executeUpdate();
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { preparedStatement.setTimestamp(1, expirationTime);
throw new IllegalStateException(e); preparedStatement.setString(2, holderId);
preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(4, expirationTime);
final int updatedRows = preparedStatement.executeUpdate();
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@Override @Override
public boolean tryAcquire() { public boolean tryAcquire() {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit();
final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
try { final long now = dbCurrentTimeMillis(connection);
final PreparedStatement preparedStatement = tryAcquireLock; preparedStatement.setString(1, holderId);
final long now = dbCurrentTimeMillis(); final Timestamp expirationTime = new Timestamp(now + expirationMillis);
preparedStatement.setString(1, holderId); preparedStatement.setTimestamp(2, expirationTime);
final Timestamp expirationTime = new Timestamp(now + expirationMillis); preparedStatement.setTimestamp(3, expirationTime);
preparedStatement.setTimestamp(2, expirationTime); LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
preparedStatement.setTimestamp(3, expirationTime); lockName, holderId, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s", final boolean acquired = preparedStatement.executeUpdate() == 1;
lockName, holderId, expirationTime); connection.commit();
final boolean acquired = preparedStatement.executeUpdate() == 1; if (acquired) {
connection.commit(); this.maybeAcquired = true;
if (acquired) { LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
this.maybeAcquired = true; } else {
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId); if (LOGGER.isDebugEnabled()) {
} else { LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
if (LOGGER.isDebugEnabled()) { lockName, holderId, readableLockStatus());
LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} }
return acquired;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { return acquired;
throw new IllegalStateException(e); } catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@ -222,104 +218,85 @@ final class JdbcLeaseLock implements LeaseLock {
} }
private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) { private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit();
final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) {
try { boolean result;
boolean result; try (ResultSet resultSet = preparedStatement.executeQuery()) {
final PreparedStatement preparedStatement = this.isLocked; if (!resultSet.next()) {
try (ResultSet resultSet = preparedStatement.executeQuery()) { result = false;
if (!resultSet.next()) { } else {
result = false; final String currentHolderId = resultSet.getString(1);
} else { result = holderIdFilter.test(currentHolderId);
final String currentHolderId = resultSet.getString(1); final Timestamp expirationTime = resultSet.getTimestamp(2);
result = holderIdFilter.test(currentHolderId); final Timestamp currentTimestamp = resultSet.getTimestamp(3);
final Timestamp expirationTime = resultSet.getTimestamp(2); final long currentTimestampMillis = currentTimestamp.getTime();
final Timestamp currentTimestamp = resultSet.getTimestamp(3); boolean zombie = false;
final long currentTimestampMillis = currentTimestamp.getTime(); if (expirationTime != null) {
boolean zombie = false; final long lockExpirationTime = expirationTime.getTime();
if (expirationTime != null) { final long expiredBy = currentTimestampMillis - lockExpirationTime;
final long lockExpirationTime = expirationTime.getTime(); if (expiredBy > 0) {
final long expiredBy = currentTimestampMillis - lockExpirationTime; result = false;
if (expiredBy > 0) { zombie = true;
result = false;
zombie = true;
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
lockName, holderId, zombie ? "zombie lock" : "lock",
currentHolderId, expirationTime, currentTimestamp);
} }
} }
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s",
lockName, holderId, zombie ? "zombie lock" : "lock",
currentHolderId, expirationTime, currentTimestamp);
}
} }
connection.commit();
return result;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { connection.commit();
throw new IllegalStateException(e); return result;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@Override @Override
public void release() { public void release() {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit();
final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryReleaseLock)) {
try { preparedStatement.setString(1, holderId);
final PreparedStatement preparedStatement = this.tryReleaseLock; final boolean released = preparedStatement.executeUpdate() == 1;
preparedStatement.setString(1, holderId); //consider it as released to avoid on finalize to be reclaimed
final boolean released = preparedStatement.executeUpdate() == 1; this.maybeAcquired = false;
//consider it as released to avoid on finalize to be reclaimed connection.commit();
this.maybeAcquired = false; if (!released) {
connection.commit(); if (LOGGER.isDebugEnabled()) {
if (!released) { LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
if (LOGGER.isDebugEnabled()) { lockName, holderId, readableLockStatus());
LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
} }
} catch (SQLException ie) { } else {
connection.rollback(); LOGGER.debugf("[%s] %s has released lock", lockName, holderId);
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { } catch (SQLException ie) {
throw new IllegalStateException(e); connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@Override @Override
public void close() throws SQLException { public void close() throws SQLException {
synchronized (connection) { if (this.maybeAcquired) {
//to avoid being called if not needed release();
if (!this.tryReleaseLock.isClosed()) {
try {
if (this.maybeAcquired) {
release();
}
} finally {
this.tryReleaseLock.close();
this.tryAcquireLock.close();
this.renewLock.close();
this.isLocked.close();
this.currentDateTime.close();
}
}
} }
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.server.impl.jdbc; package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -61,59 +60,37 @@ public final class JdbcNodeManager extends NodeManager {
ExecutorFactory executorFactory, ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) { IOCriticalErrorListener ioCriticalErrorListener) {
validateTimeoutConfiguration(configuration); validateTimeoutConfiguration(configuration);
if (configuration.getDataSource() != null) { final SQLProvider.Factory sqlProviderFactory;
final SQLProvider.Factory sqlProviderFactory; if (configuration.getSqlProviderFactory() != null) {
if (configuration.getSqlProviderFactory() != null) { sqlProviderFactory = configuration.getSqlProviderFactory();
sqlProviderFactory = configuration.getSqlProviderFactory();
} else {
sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource());
}
final String brokerId = java.util.UUID.randomUUID().toString();
return usingDataSource(brokerId,
configuration.getJdbcNetworkTimeout(),
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getDataSource(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
} else { } else {
final SQLProvider sqlProvider = JDBCUtils.getSQLProvider(configuration.getJdbcDriverClassName(), configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getConnectionProvider());
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionUrl(brokerId,
configuration.getJdbcNetworkTimeout(),
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getJdbcConnectionUrl(),
configuration.getJdbcUser(),
configuration.getJdbcPassword(),
configuration.getJdbcDriverClassName(),
sqlProvider,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
} }
final String brokerId = java.util.UUID.randomUUID().toString();
return usingConnectionProvider(brokerId,
configuration.getJdbcLockExpirationMillis(),
configuration.getJdbcLockRenewPeriodMillis(),
configuration.getJdbcLockAcquisitionTimeoutMillis(),
configuration.getConnectionProvider(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
} }
private static JdbcNodeManager usingDataSource(String brokerId, private static JdbcNodeManager usingConnectionProvider(String brokerId,
int networkTimeoutMillis,
long lockExpirationMillis, long lockExpirationMillis,
long lockRenewPeriodMillis, long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis, long lockAcquisitionTimeoutMillis,
DataSource dataSource, JDBCConnectionProvider connectionProvider,
SQLProvider provider, SQLProvider provider,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory, ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) { IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager( return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingDataSource(brokerId, () -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis, lockExpirationMillis,
dataSource, connectionProvider,
provider), provider),
lockRenewPeriodMillis, lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis, lockAcquisitionTimeoutMillis,
@ -122,36 +99,6 @@ public final class JdbcNodeManager extends NodeManager {
ioCriticalErrorListener); ioCriticalErrorListener);
} }
private static JdbcNodeManager usingConnectionUrl(String brokerId,
int networkTimeoutMillis,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
String jdbcUrl,
String user,
String password,
String driverClass,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingConnectionUrl(brokerId,
networkTimeoutMillis,
executorFactory == null ? null : executorFactory.getExecutor(),
lockExpirationMillis,
jdbcUrl,
user,
password,
driverClass,
provider),
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
}
private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) { private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
final long lockExpiration = configuration.getJdbcLockExpirationMillis(); final long lockExpiration = configuration.getJdbcLockExpirationMillis();
if (lockExpiration <= 0) { if (lockExpiration <= 0) {

View File

@ -17,15 +17,14 @@
package org.apache.activemq.artemis.core.server.impl.jdbc; package org.apache.activemq.artemis.core.server.impl.jdbc;
import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -42,21 +41,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private final long lockExpirationMillis; private final long lockExpirationMillis;
private JdbcLeaseLock liveLock; private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock; private JdbcLeaseLock backupLock;
private PreparedStatement readNodeId; private String readNodeId;
private PreparedStatement writeNodeId; private String writeNodeId;
private PreparedStatement initializeNodeId; private String initializeNodeId;
private PreparedStatement readState; private String readState;
private PreparedStatement writeState; private String writeState;
public static JdbcSharedStateManager usingDataSource(String holderId, public static JdbcSharedStateManager usingConnectionProvider(String holderId,
int networkTimeout,
Executor networkTimeoutExecutor,
long locksExpirationMillis, long locksExpirationMillis,
DataSource dataSource, JDBCConnectionProvider connectionProvider,
SQLProvider provider) { SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); sharedStateManager.setJdbcConnectionProvider(connectionProvider);
sharedStateManager.setDataSource(dataSource);
sharedStateManager.setSqlProvider(provider); sharedStateManager.setSqlProvider(provider);
try { try {
sharedStateManager.start(); sharedStateManager.start();
@ -66,64 +62,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
} }
} }
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
return JdbcSharedStateManager.usingConnectionUrl(holderId,
-1,
null,
locksExpirationMillis,
jdbcConnectionUrl,
null,
null,
jdbcDriverClass,
provider);
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
long locksExpirationMillis,
String jdbcConnectionUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider provider) {
return JdbcSharedStateManager.usingConnectionUrl(holderId,
-1,
null,
locksExpirationMillis,
jdbcConnectionUrl,
user,
password,
jdbcDriverClass,
provider);
}
public static JdbcSharedStateManager usingConnectionUrl(String holderId,
int networkTimeout,
Executor networkTimeoutExecutor,
long locksExpirationMillis,
String jdbcConnectionUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout);
sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl);
sharedStateManager.setJdbcDriverClass(jdbcDriverClass);
sharedStateManager.setSqlProvider(provider);
sharedStateManager.setUser(user);
sharedStateManager.setPassword(password);
try {
sharedStateManager.start();
return sharedStateManager;
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
@Override @Override
protected void createSchema() { protected void createSchema() {
try { try {
@ -135,28 +73,28 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
} }
static JdbcLeaseLock createLiveLock(String holderId, static JdbcLeaseLock createLiveLock(String holderId,
Connection connection, JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider, SQLProvider sqlProvider,
long expirationMillis) throws SQLException { long expirationMillis) {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE"); return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE");
} }
static JdbcLeaseLock createBackupLock(String holderId, static JdbcLeaseLock createBackupLock(String holderId,
Connection connection, JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider, SQLProvider sqlProvider,
long expirationMillis) throws SQLException { long expirationMillis) {
return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP"); return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP");
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() {
this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis); this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis); this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); this.readNodeId = sqlProvider.readNodeIdSQL();
this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); this.writeNodeId = sqlProvider.writeNodeIdSQL();
this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL()); this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL()); this.writeState = sqlProvider.writeStateSQL();
this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); this.readState = sqlProvider.readStateSQL();
} }
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) { private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
@ -174,17 +112,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
return this.backupLock; return this.backupLock;
} }
private UUID rawReadNodeId() throws SQLException { private UUID rawReadNodeId(Connection connection) throws SQLException {
final PreparedStatement preparedStatement = this.readNodeId; try (PreparedStatement preparedStatement = connection.prepareStatement(this.readNodeId)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) { if (!resultSet.next()) {
return null;
} else {
final String nodeId = resultSet.getString(1);
if (nodeId != null) {
return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
} else {
return null; return null;
} else {
final String nodeId = resultSet.getString(1);
if (nodeId != null) {
return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId));
} else {
return null;
}
} }
} }
} }
@ -192,65 +131,71 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
@Override @Override
public UUID readNodeId() { public UUID readNodeId() {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit(); final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true); connection.setAutoCommit(true);
try { try {
return rawReadNodeId(); return rawReadNodeId(connection);
} finally { } finally {
connection.setAutoCommit(autoCommit); connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@Override @Override
public void writeNodeId(UUID nodeId) { public void writeNodeId(UUID nodeId) {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
final boolean autoCommit = connection.getAutoCommit(); final boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(true); connection.setAutoCommit(true);
try { try {
rawWriteNodeId(nodeId); rawWriteNodeId(connection, nodeId);
} finally { } finally {
connection.setAutoCommit(autoCommit); connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
private void rawWriteNodeId(UUID nodeId) throws SQLException { private void rawWriteNodeId(Connection connection, UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.writeNodeId; try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeNodeId)) {
preparedStatement.setString(1, nodeId.toString()); preparedStatement.setString(1, nodeId.toString());
if (preparedStatement.executeUpdate() != 1) { if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!"); throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!");
}
} }
} }
private boolean rawInitializeNodeId(UUID nodeId) throws SQLException { private boolean rawInitializeNodeId(Connection connection, UUID nodeId) throws SQLException {
final PreparedStatement preparedStatement = this.initializeNodeId; try (PreparedStatement preparedStatement = connection.prepareStatement(this.initializeNodeId)) {
preparedStatement.setString(1, nodeId.toString()); preparedStatement.setString(1, nodeId.toString());
final int rows = preparedStatement.executeUpdate(); final int rows = preparedStatement.executeUpdate();
assert rows <= 1; assert rows <= 1;
return rows > 0; return rows > 0;
}
} }
@Override @Override
public UUID setup(Supplier<? extends UUID> nodeIdFactory) { public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
SQLException lastError = null; SQLException lastError = null;
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
final UUID newNodeId = nodeIdFactory.get(); final UUID newNodeId = nodeIdFactory.get();
for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) { for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) {
lastError = null; lastError = null;
try { try {
final UUID nodeId = initializeOrReadNodeId(newNodeId); final UUID nodeId = initializeOrReadNodeId(connection, newNodeId);
if (nodeId != null) { if (nodeId != null) {
return nodeId; return nodeId;
} }
@ -259,6 +204,8 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
lastError = e; lastError = e;
} }
} }
} catch (SQLException e) {
lastError = e;
} }
if (lastError != null) { if (lastError != null) {
logger.error("Unable to setup a NodeId on the JDBC shared state", lastError); logger.error("Unable to setup a NodeId on the JDBC shared state", lastError);
@ -268,7 +215,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId"); throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId");
} }
private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException { private UUID initializeOrReadNodeId(Connection connection, final UUID newNodeId) throws SQLException {
synchronized (connection) { synchronized (connection) {
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
final boolean autoCommit = connection.getAutoCommit(); final boolean autoCommit = connection.getAutoCommit();
@ -276,10 +223,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
try { try {
final UUID nodeId; final UUID nodeId;
//optimistic try to initialize nodeId //optimistic try to initialize nodeId
if (rawInitializeNodeId(newNodeId)) { if (rawInitializeNodeId(connection, newNodeId)) {
nodeId = newNodeId; nodeId = newNodeId;
} else { } else {
nodeId = rawReadNodeId(); nodeId = rawReadNodeId(connection);
} }
if (nodeId != null) { if (nodeId != null) {
connection.commit(); connection.commit();
@ -335,76 +282,65 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
@Override @Override
public State readState() { public State readState() {
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit();
final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false);
connection.setAutoCommit(false); final State state;
final State state; try (PreparedStatement preparedStatement = connection.prepareStatement(this.readState)) {
try { try (ResultSet resultSet = preparedStatement.executeQuery()) {
final PreparedStatement preparedStatement = this.readState; if (!resultSet.next()) {
try (ResultSet resultSet = preparedStatement.executeQuery()) { state = State.FIRST_TIME_START;
if (!resultSet.next()) { } else {
state = State.FIRST_TIME_START; state = decodeState(resultSet.getString(1));
} else {
state = decodeState(resultSet.getString(1));
}
} }
connection.commit();
return state;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { connection.commit();
throw new IllegalStateException(e); return state;
} catch (SQLException ie) {
connection.rollback();
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@Override @Override
public void writeState(State state) { public void writeState(State state) {
final String encodedState = encodeState(state); final String encodedState = encodeState(state);
synchronized (connection) { try (Connection connection = connectionProvider.getConnection()) {
try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit();
final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false);
connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeState)) {
try { preparedStatement.setString(1, encodedState);
final PreparedStatement preparedStatement = this.writeState; if (preparedStatement.executeUpdate() != 1) {
preparedStatement.setString(1, encodedState); throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
if (preparedStatement.executeUpdate() != 1) {
throw new IllegalStateException("can't write state to the JDBC Node Manager Store!");
}
connection.commit();
} catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) { connection.commit();
throw new IllegalStateException(e); } catch (SQLException ie) {
connection.rollback();
connection.setAutoCommit(true);
throw new IllegalStateException(ie);
} finally {
connection.setAutoCommit(autoCommit);
} }
} catch (SQLException e) {
throw new IllegalStateException(e);
} }
} }
@Override @Override
public void stop() throws SQLException { public void stop() throws SQLException {
//release all the managed resources inside the connection lock //release all the managed resources inside the connection lock
synchronized (connection) { //synchronized (connection) {
this.readNodeId.close(); this.liveLock.close();
this.writeNodeId.close(); this.backupLock.close();
this.initializeNodeId.close(); super.stop();
this.readState.close(); //}
this.writeState.close();
this.liveLock.close();
this.backupLock.close();
super.stop();
}
} }
@Override @Override

View File

@ -2361,14 +2361,14 @@
<xsd:complexType name="databaseStoreType"> <xsd:complexType name="databaseStoreType">
<xsd:all> <xsd:all>
<xsd:element name="jdbc-driver-class-name" type="xsd:string" minOccurs="1" maxOccurs="1"> <xsd:element name="jdbc-driver-class-name" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
The JDBC Driver class name The JDBC Driver class name
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="1" maxOccurs="1"> <xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/ The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/
@ -2391,6 +2391,31 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="data-source-class-name" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The DataSource class name
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="data-source-properties" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
A list of options for the DataSource
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="data-source-property" type="dataSourcePropertyType" minOccurs="1" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
A key-value pair option for the DataSource
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1"> <xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
@ -2458,6 +2483,23 @@
<xsd:attributeGroup ref="xml:specialAttrs"/> <xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="dataSourcePropertyType">
<xsd:attribute name="key" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Configuration option key
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="value" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Configuration option value
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="haPolicyType"> <xsd:complexType name="haPolicyType">
<xsd:choice> <xsd:choice>
<xsd:element name="live-only" type="haLiveOnlyPolicyType" minOccurs="0" maxOccurs="1"> <xsd:element name="live-only" type="haLiveOnlyPolicyType" minOccurs="0" maxOccurs="1">

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.server.impl.jdbc; package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -67,10 +66,10 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
return JdbcSharedStateManager return JdbcSharedStateManager
.createLiveLock( .createLiveLock(
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
jdbcSharedStateManager.getConnection(), jdbcSharedStateManager.getJdbcConnectionProvider(),
sqlProvider, sqlProvider,
acquireMillis); acquireMillis);
} catch (SQLException e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
@ -85,20 +84,18 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
if (withExistingTable) { if (withExistingTable) {
TestJDBCDriver testDriver = TestJDBCDriver TestJDBCDriver testDriver = TestJDBCDriver
.usingConnectionUrl( .usingDbConf(
dbConf.getJdbcConnectionUrl(), dbConf,
dbConf.getJdbcDriverClassName(),
sqlProvider); sqlProvider);
testDriver.start(); testDriver.start();
testDriver.stop(); testDriver.stop();
} }
jdbcSharedStateManager = JdbcSharedStateManager jdbcSharedStateManager = JdbcSharedStateManager
.usingConnectionUrl( .usingConnectionProvider(
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(), dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcConnectionUrl(), dbConf.getConnectionProvider(),
dbConf.getJdbcDriverClassName(),
sqlProvider); sqlProvider);
} }

View File

@ -41,19 +41,17 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
} }
private TestJDBCDriver createFakeDriver(boolean initializeTable) { private TestJDBCDriver createFakeDriver(boolean initializeTable) {
return TestJDBCDriver.usingConnectionUrl( return TestJDBCDriver.usingDbConf(
dbConf.getJdbcConnectionUrl(), dbConf,
dbConf.getJdbcDriverClassName(),
sqlProvider, sqlProvider,
initializeTable); initializeTable);
} }
private JdbcSharedStateManager createSharedStateManager() { private JdbcSharedStateManager createSharedStateManager() {
return JdbcSharedStateManager.usingConnectionUrl( return JdbcSharedStateManager.usingConnectionProvider(
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(), dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcConnectionUrl(), dbConf.getConnectionProvider(),
dbConf.getJdbcDriverClassName(),
sqlProvider); sqlProvider);
} }

View File

@ -16,28 +16,28 @@
*/ */
package org.apache.activemq.artemis.core.server.impl.jdbc; package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.junit.Assert; import org.junit.Assert;
public class TestJDBCDriver extends AbstractJDBCDriver { public class TestJDBCDriver extends AbstractJDBCDriver {
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl, public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf,
String jdbcDriverClass, SQLProvider provider) {
SQLProvider provider) { return usingDbConf(dbConf, provider, false);
return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
} }
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl, public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf,
String jdbcDriverClass, SQLProvider provider,
SQLProvider provider, boolean initialize) {
boolean initialize) {
TestJDBCDriver driver = new TestJDBCDriver(initialize); TestJDBCDriver driver = new TestJDBCDriver(initialize);
driver.setSqlProvider(provider); driver.setSqlProvider(provider);
driver.setJdbcConnectionUrl(jdbcConnectionUrl); driver.setJdbcConnectionProvider(dbConf.getConnectionProvider());
driver.setJdbcDriverClass(jdbcDriverClass);
return driver; return driver;
} }
@ -48,12 +48,11 @@ public class TestJDBCDriver extends AbstractJDBCDriver {
} }
@Override @Override
protected void prepareStatements() throws SQLException { protected void prepareStatements() { }
}
@Override @Override
protected void createSchema() throws SQLException { protected void createSchema() {
try { try (Connection connection = getJdbcConnectionProvider().getConnection()) {
connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL()); connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
if (initialize) { if (initialize) {
connection.createStatement().execute(sqlProvider.createNodeIdSQL()); connection.createStatement().execute(sqlProvider.createNodeIdSQL());

View File

@ -747,6 +747,13 @@
<version>2.7.2</version> <version>2.7.2</version>
</dependency> </dependency>
<!-- needed by artemis-jdbc-store -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Needed for Micrometer --> <!-- Needed for Micrometer -->
<dependency> <dependency>
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>

View File

@ -123,7 +123,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL); SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
scheduledExecutorService = new ScheduledThreadPoolExecutor(5); scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor(); executorService = Executors.newSingleThreadExecutor();
journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), getJdbcUser(), getJdbcPassword(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() { journal = new JDBCJournalImpl(dbConf.getConnectionProvider(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override @Override
public void onIOException(Throwable code, String message, SequentialFile file) { public void onIOException(Throwable code, String message, SequentialFile file) {
@ -145,10 +145,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
public void testConcurrentEmptyJournal() throws SQLException { public void testConcurrentEmptyJournal() throws SQLException {
Assert.assertTrue(journal.isStarted()); Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords()); Assert.assertEquals(0, journal.getNumberOfRecords());
final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getConnectionProvider(),
getJdbcUser(),
getJdbcPassword(),
dbConf.getJdbcDriverClassName(),
sqlProvider, scheduledExecutorService, sqlProvider, scheduledExecutorService,
executorService, (code, message, file) -> { executorService, (code, message, file) -> {
Assert.fail(message); Assert.fail(message);