From 2faafec737b10f68aa38925825029b7136f23ee1 Mon Sep 17 00:00:00 2001 From: Mikko Uoti Date: Thu, 28 May 2020 09:42:26 +0300 Subject: [PATCH] ARTEMIS-2823 Use datasource with JDBC store db connections Replaces direct jdbc connections with dbcp2 datasource. Adds configuration options to use alternative datasources and to alter the parameters. While adding slight overhead, this vastly improves the management and pooling capabilities with db connections. --- .../config/ActiveMQDefaultConfiguration.java | 7 + artemis-jdbc-store/pom.xml | 7 + .../store/drivers/AbstractJDBCDriver.java | 395 +++++------------- .../store/drivers/JDBCConnectionProvider.java | 103 +++++ .../store/drivers/JDBCDataSourceUtils.java | 48 +++ .../artemis/jdbc/store/drivers/JDBCUtils.java | 8 + .../store/file/Db2SequentialFileDriver.java | 40 +- .../jdbc/store/file/JDBCFileUtils.java | 48 +-- .../store/file/JDBCSequentialFileFactory.java | 46 +- .../file/JDBCSequentialFileFactoryDriver.java | 256 ++++++------ .../file/PostgresLargeObjectManager.java | 39 +- ...ostgresSequentialSequentialFileDriver.java | 100 +++-- .../jdbc/store/journal/JDBCJournalImpl.java | 289 ++++++------- .../jdbc/store/sql/PropertySQLProvider.java | 29 +- .../file/JDBCSequentialFileFactoryTest.java | 14 +- .../file/PostgresLargeObjectManagerTest.java | 10 +- .../storage/DatabaseStorageConfiguration.java | 55 ++- .../impl/FileConfigurationParser.java | 10 + .../impl/PagingStoreFactoryDatabase.java | 51 +-- .../journal/JDBCJournalStorageManager.java | 53 ++- .../core/server/impl/jdbc/JdbcLeaseLock.java | 323 +++++++------- .../server/impl/jdbc/JdbcNodeManager.java | 91 +--- .../impl/jdbc/JdbcSharedStateManager.java | 272 +++++------- .../schema/artemis-configuration.xsd | 46 +- .../server/impl/jdbc/JdbcLeaseLockTest.java | 15 +- .../impl/jdbc/JdbcSharedStateManagerTest.java | 10 +- .../core/server/impl/jdbc/TestJDBCDriver.java | 27 +- pom.xml | 7 + .../jdbc/store/journal/JDBCJournalTest.java | 7 +- 29 files changed, 1142 insertions(+), 1264 deletions(-) create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 12892ca572..cb6bfec78d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -455,6 +455,9 @@ public final class ActiveMQDefaultConfiguration { // Default JDBC Driver class name, derby by default just for demo purposes private static String DEFAULT_JDBC_DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + // Default JDBC Driver class name. DBCP2 BasicDataSource is used by default. + private static String DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME = "org.apache.commons.dbcp2.BasicDataSource"; + // Default message table name, used with Database storage type private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES"; @@ -1392,6 +1395,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_JDBC_DRIVER_CLASS_NAME; } + public static String getDefaultDataSourceClassName() { + return DEFAULT_JDBC_DATA_SOURCE_CLASS_NAME; + } + public static String getDefaultLargeMessagesTableName() { return DEFAULT_LARGE_MESSAGES_TABLE_NAME; } diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml index bd5a3b07fc..fe439201ca 100644 --- a/artemis-jdbc-store/pom.xml +++ b/artemis-jdbc-store/pom.xml @@ -81,6 +81,13 @@ ${project.version} + + + org.apache.commons + commons-dbcp2 + 2.1.1 + + org.apache.derby diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java index 7168cf025a..ab89fc43db 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java @@ -16,23 +16,15 @@ */ package org.apache.activemq.artemis.jdbc.store.drivers; -import javax.sql.DataSource; import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; import java.sql.Statement; import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; -import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.jboss.logging.Logger; /** @@ -43,80 +35,27 @@ public abstract class AbstractJDBCDriver { private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class); - protected Connection connection; - protected SQLProvider sqlProvider; - private String jdbcConnectionUrl; + protected JDBCConnectionProvider connectionProvider; - private String jdbcDriverClass; + public AbstractJDBCDriver() { } - private DataSource dataSource; - - private Executor networkTimeoutExecutor; - - private int networkTimeoutMillis; - - private String user; - - private String password; - - public AbstractJDBCDriver() { - this.networkTimeoutExecutor = null; - this.networkTimeoutMillis = -1; - } - - public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String user, String password, String jdbcDriverClass) { - this.jdbcConnectionUrl = jdbcConnectionUrl; - this.user = user; - this.password = password; - this.jdbcDriverClass = jdbcDriverClass; - this.sqlProvider = sqlProvider; - this.networkTimeoutExecutor = null; - this.networkTimeoutMillis = -1; - } - - public AbstractJDBCDriver(DataSource dataSource, SQLProvider provider) { - this.dataSource = dataSource; + public AbstractJDBCDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) { + this.connectionProvider = connectionProvider; this.sqlProvider = provider; - this.networkTimeoutExecutor = null; - this.networkTimeoutMillis = -1; } public void start() throws SQLException { - connect(); - synchronized (connection) { - createSchema(); - prepareStatements(); - } - } - - public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) { - if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) { - this.connection = new LoggingConnection(connection, logger); - } else { - this.connection = connection; - } - this.sqlProvider = sqlProvider; - this.networkTimeoutExecutor = null; - this.networkTimeoutMillis = -1; + createSchema(); + prepareStatements(); } public void stop() throws SQLException { - synchronized (connection) { - if (sqlProvider.closeConnectionOnShutdown()) { - try { - connection.setAutoCommit(true); - connection.close(); - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - throw e; - } - } - } + } - protected abstract void prepareStatements() throws SQLException; + protected abstract void prepareStatements(); protected abstract void createSchema() throws SQLException; @@ -124,217 +63,116 @@ public abstract class AbstractJDBCDriver { createTableIfNotExists(sqlProvider.getTableName(), schemaSqls); } - private void connect() throws SQLException { - if (connection == null) { - if (dataSource != null) { - try { - connection = dataSource.getConnection(); - - if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) { - this.connection = new LoggingConnection(connection, logger); - } - - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - throw e; - } - } else { - try { - if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) { - throw new IllegalStateException("jdbcDriverClass is null or empty!"); - } - if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) { - throw new IllegalStateException("jdbcConnectionUrl is null or empty!"); - } - final Driver dbDriver = getDriver(jdbcDriverClass); - Properties properties = new Properties(); - if (user != null) { - properties.setProperty("user", user); - properties.setProperty("password", password); - } - connection = dbDriver.connect(jdbcConnectionUrl, properties); - - if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) { - this.connection = new LoggingConnection(connection, logger); - } - - if (connection == null) { - throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl); - } - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl); - throw e; - } - } - if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) { - logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null"); - } - if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) { - try { - connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis); - } catch (SQLException e) { - logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection"); - } catch (Throwable throwable) { - //it included SecurityExceptions and UnsupportedOperationException - logger.warn("Unable to set a network timeout on the JDBC connection", throwable); - } - } - } - } - public void destroy() throws Exception { final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName(); - try { - connection.setAutoCommit(false); - try (Statement statement = connection.createStatement()) { - statement.executeUpdate(dropTableSql); - } - connection.commit(); - } catch (SQLException e) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql)); + try (Connection connection = connectionProvider.getConnection()) { try { - connection.rollback(); - } catch (SQLException rollbackEx) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql)); - throw rollbackEx; + connection.setAutoCommit(false); + try (Statement statement = connection.createStatement()) { + statement.executeUpdate(dropTableSql); + } + connection.commit(); + } catch (SQLException e) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, dropTableSql)); + try { + connection.rollback(); + } catch (SQLException rollbackEx) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, dropTableSql)); + throw rollbackEx; + } + throw e; } - throw e; } } private void createTableIfNotExists(String tableName, String... sqls) throws SQLException { logger.tracef("Validating if table %s didn't exist before creating", tableName); - try { - connection.setAutoCommit(false); - final boolean tableExists; - try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) { - if (rs == null || !rs.next()) { - tableExists = false; - if (logger.isTraceEnabled()) { - logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls)); - } - if (rs != null) { - final SQLWarning sqlWarning = rs.getWarnings(); - if (sqlWarning != null) { - logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning)); + try (Connection connection = connectionProvider.getConnection()) { + try { + connection.setAutoCommit(false); + final boolean tableExists; + try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) { + if (rs == null || !rs.next()) { + tableExists = false; + if (logger.isTraceEnabled()) { + logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls)); } - } - } else { - tableExists = true; - } - } - if (tableExists) { - logger.tracef("Validating if the existing table %s is initialized or not", tableName); - try (Statement statement = connection.createStatement(); - ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) { - logger.tracef("Validation of the existing table %s initialization is started", tableName); - int rows; - if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) { - logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows); - if (logger.isDebugEnabled()) { - final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count(); - if (rows < expectedRows) { - logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows."); + if (rs != null) { + final SQLWarning sqlWarning = rs.getWarnings(); + if (sqlWarning != null) { + logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning)); } } - connection.commit(); - return; } else { - sqls = Stream.of(sqls).filter(sql -> { - final String upperCaseSql = sql.toUpperCase(); - return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX")); - }).toArray(String[]::new); - if (sqls.length > 0) { - logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName); + tableExists = true; + } + } + if (tableExists) { + logger.tracef("Validating if the existing table %s is initialized or not", tableName); + try (Statement statement = connection.createStatement(); + ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) { + logger.tracef("Validation of the existing table %s initialization is started", tableName); + int rows; + if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) { + logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows); + if (logger.isDebugEnabled()) { + final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count(); + if (rows < expectedRows) { + logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows."); + } + } + connection.commit(); + return; } else { - logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName); + sqls = Stream.of(sqls).filter(sql -> { + final String upperCaseSql = sql.toUpperCase(); + return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX")); + }).toArray(String[]::new); + if (sqls.length > 0) { + logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName); + } else { + logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName); + } + } + } catch (SQLException e) { + //that's not a real issue and do not deserve any user-level log: + //some DBMS just return stale information about table existence + //and can fail on later attempts to access them + if (logger.isTraceEnabled()) { + logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL())); + } + try { + connection.rollback(); + } catch (SQLException rollbackEx) { + logger.debug("Rollback failed while validating initialization of a table", rollbackEx); + } + connection.setAutoCommit(false); + logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName); + } + } + if (sqls.length > 0) { + try (Statement statement = connection.createStatement()) { + for (String sql : sqls) { + statement.executeUpdate(sql); + final SQLWarning statementSqlWarning = statement.getWarnings(); + if (statementSqlWarning != null) { + logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql)); + } } } - } catch (SQLException e) { - //that's not a real issue and do not deserve any user-level log: - //some DBMS just return stale information about table existence - //and can fail on later attempts to access them - if (logger.isTraceEnabled()) { - logger.trace(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL())); - } - try { - connection.rollback(); - } catch (SQLException rollbackEx) { - logger.debug("Rollback failed while validating initialization of a table", rollbackEx); - } - connection.setAutoCommit(false); - logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName); + + connection.commit(); } - } - if (sqls.length > 0) { - try (Statement statement = connection.createStatement()) { - for (String sql : sqls) { - statement.executeUpdate(sql); - final SQLWarning statementSqlWarning = statement.getWarnings(); - if (statementSqlWarning != null) { - logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql)); - } - } + } catch (SQLException e) { + final String sqlStatements = String.join("\n", sqls); + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements)); + try { + connection.rollback(); + } catch (SQLException rollbackEx) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements)); + throw rollbackEx; } - - connection.commit(); - } - } catch (SQLException e) { - final String sqlStatements = String.join("\n", sqls); - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements)); - try { - connection.rollback(); - } catch (SQLException rollbackEx) { - logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), rollbackEx, sqlStatements)); - throw rollbackEx; - } - throw e; - } - } - - private static AtomicBoolean shutAdded = new AtomicBoolean(false); - - private static class ShutdownDerby extends Thread { - @Override - public void run() { - try { - DriverManager.getConnection("jdbc:derby:;shutdown=true"); - } catch (Exception e) { - } - } - - } - - private Driver getDriver(String className) { - try { - Driver driver = (Driver) Class.forName(className).newInstance(); - - // Shutdown the derby if using the derby embedded driver. - if (className.equals("org.apache.derby.jdbc.EmbeddedDriver")) { - if (shutAdded.compareAndSet(false, true)) { - Runtime.getRuntime().addShutdownHook(new ShutdownDerby()); - } - } - return driver; - } catch (ClassNotFoundException cnfe) { - throw new RuntimeException("Could not find class: " + className); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate driver class: ", e); - } - } - - public Connection getConnection() { - return connection; - } - - public final void setConnection(Connection connection) { - if (this.connection == null) { - if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) { - this.connection = new LoggingConnection(connection, logger); - } else { - this.connection = connection; + throw e; } } } @@ -343,37 +181,12 @@ public abstract class AbstractJDBCDriver { this.sqlProvider = sqlProvider; } - public void setJdbcConnectionUrl(String jdbcConnectionUrl) { - this.jdbcConnectionUrl = jdbcConnectionUrl; + public void setJdbcConnectionProvider(JDBCConnectionProvider connectionProvider) { + this.connectionProvider = connectionProvider; } - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public void setJdbcDriverClass(String jdbcDriverClass) { - this.jdbcDriverClass = jdbcDriverClass; - } - - public void setDataSource(DataSource dataSource) { - this.dataSource = dataSource; - } - - public void setNetworkTimeout(Executor executor, int milliseconds) { - this.networkTimeoutExecutor = executor; - this.networkTimeoutMillis = milliseconds; + public JDBCConnectionProvider getJdbcConnectionProvider() { + return this.connectionProvider; } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java new file mode 100644 index 0000000000..6d2c9f2ae0 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.drivers; + +import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +public class JDBCConnectionProvider { + + private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class); + private DataSource dataSource; + private Executor networkTimeoutExecutor; + private int networkTimeoutMillis; + + public JDBCConnectionProvider(DataSource dataSource) { + this.dataSource = dataSource; + this.networkTimeoutExecutor = null; + this.networkTimeoutMillis = -1; + addDerbyShutdownHook(); + } + + public synchronized Connection getConnection() throws SQLException { + Connection connection; + try { + connection = dataSource.getConnection(); + if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) { + connection = new LoggingConnection(connection, logger); + } + } catch (SQLException e) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + throw e; + } + + if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor == null) { + logger.warn("Unable to set a network timeout on the JDBC connection: networkTimeoutExecutor is null"); + } + + if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) { + try { + connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis); + } catch (SQLException e) { + logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection"); + } catch (Throwable throwable) { + //it included SecurityExceptions and UnsupportedOperationException + logger.warn("Unable to set a network timeout on the JDBC connection", throwable); + } + } + return connection; + } + + private static AtomicBoolean shutAdded = new AtomicBoolean(false); + + private static class ShutdownDerby extends Thread { + @Override + public void run() { + try { + DriverManager.getConnection("jdbc:derby:;shutdown=true"); + } catch (Exception e) { } + } + + } + + public void addDerbyShutdownHook() { + // Shutdown the derby if using the derby embedded driver. + try (Connection connection = getConnection()) { + if (connection.getMetaData().getDriverName().equals("org.apache.derby.jdbc.EmbeddedDriver")) { + if (shutAdded.compareAndSet(false, true)) { + Runtime.getRuntime().addShutdownHook(new ShutdownDerby()); + } + } + } catch (SQLException e) { + logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + } + } + + public void setNetworkTimeout(Executor executor, int milliseconds) { + this.networkTimeoutExecutor = executor; + this.networkTimeoutMillis = milliseconds; + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java new file mode 100644 index 0000000000..5e0c3d8309 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCDataSourceUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.drivers; + +import org.apache.commons.beanutils.PropertyUtils; +import org.jboss.logging.Logger; + +import javax.sql.DataSource; +import java.util.Map; +import java.util.stream.Collectors; + +public class JDBCDataSourceUtils { + + private static final Logger logger = Logger.getLogger(JDBCDataSourceUtils.class); + + public static DataSource getDataSource(String dataSourceClassName, Map dataSourceProperties) { + logger.info(new StringBuilder("Initialising JDBC data source: ").append(dataSourceClassName).append(" ") + .append(dataSourceProperties.keySet().stream() + .map(key -> key + "=" + dataSourceProperties.get(key)) + .collect(Collectors.joining(", ", "{", "}")))); + try { + DataSource dataSource = (DataSource) Class.forName(dataSourceClassName).newInstance(); + for (Map.Entry entry : dataSourceProperties.entrySet()) { + PropertyUtils.setProperty(dataSource, entry.getKey(), entry.getValue()); + } + return dataSource; + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException("Could not find class: " + dataSourceClassName); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate DataSource", e); + } + } + +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java index ce6dc6f192..37c1e97d89 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCUtils.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jdbc.store.drivers; import java.sql.SQLException; +import java.util.Map; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; @@ -39,6 +40,13 @@ public class JDBCUtils { return factory.create(tableName, storeType); } + public static SQLProvider getSQLProvider(Map dataSourceProperties, String tableName, SQLProvider.DatabaseStoreType storeType) { + PropertySQLProvider.Factory.SQLDialect dialect = PropertySQLProvider.Factory.investigateDialect(dataSourceProperties); + logger.tracef("getSQLProvider Returning SQL provider for dialect %s, tableName::%s", dialect, tableName); + PropertySQLProvider.Factory factory = new PropertySQLProvider.Factory(dialect); + return factory.create(tableName, storeType); + } + /** * Append to {@code errorMessage} a detailed description of the provided {@link SQLException}.
* The information appended are: diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java index c0066ab495..12da3d2661 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/Db2SequentialFileDriver.java @@ -17,41 +17,38 @@ package org.apache.activemq.artemis.jdbc.store.file; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; @SuppressWarnings("SynchronizeOnNonFinalField") public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriver { - private PreparedStatement replaceLargeObject; + private String replaceLargeObject; public Db2SequentialFileDriver() { super(); } - public Db2SequentialFileDriver(DataSource dataSource, SQLProvider provider) { - super(dataSource, provider); - } - - public Db2SequentialFileDriver(Connection connection, SQLProvider provider) { - super(connection, provider); + public Db2SequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) { + super(connectionProvider, provider); } @Override - protected void prepareStatements() throws SQLException { - this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); - this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[]{"ID"}); - this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); - this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); - this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); - this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); - this.replaceLargeObject = connection.prepareStatement(sqlProvider.getReplaceLargeObjectSQL()); - this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL()); - this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); + protected void prepareStatements() { + this.deleteFile = sqlProvider.getDeleteFileSQL(); + this.createFile = sqlProvider.getInsertFileSQL(); + this.createFileColumnNames = new String[]{"ID"}; + this.selectFileByFileName = sqlProvider.getSelectFileByFileName(); + this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL(); + this.renameFile = sqlProvider.getUpdateFileNameByIdSQL(); + this.readLargeObject = sqlProvider.getReadLargeObjectSQL(); + this.replaceLargeObject = sqlProvider.getReplaceLargeObjectSQL(); + this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL(); + this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL(); } @Override @@ -59,9 +56,8 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv if (data == null || data.length == 0) { return 0; } - final PreparedStatement largeObjectStatement = append ? appendToLargeObject : replaceLargeObject; - synchronized (connection) { - try { + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement largeObjectStatement = connection.prepareStatement(append ? appendToLargeObject : replaceLargeObject)) { connection.setAutoCommit(false); int bytesWritten; largeObjectStatement.setBytes(1, data); @@ -81,4 +77,4 @@ public final class Db2SequentialFileDriver extends JDBCSequentialFileFactoryDriv } } } -} +} \ No newline at end of file diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java index 58ac5b9b5d..1d1ea62ef8 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileUtils.java @@ -17,10 +17,10 @@ package org.apache.activemq.artemis.jdbc.store.file; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; @@ -29,54 +29,18 @@ import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Fac class JDBCFileUtils { - static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass, - String jdbcConnectionUrl, - String user, - String password, - SQLProvider provider) throws SQLException { - final JDBCSequentialFileFactoryDriver dbDriver; - final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.identifyDialect(driverClass); - if (POSTGRESQL.equals(sqlDialect)) { - dbDriver = new PostgresSequentialSequentialFileDriver(); - } else if (DB2.equals(sqlDialect)) { - dbDriver = new Db2SequentialFileDriver(); - } else { - dbDriver = new JDBCSequentialFileFactoryDriver(); - } - dbDriver.setSqlProvider(provider); - dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); - dbDriver.setJdbcDriverClass(driverClass); - dbDriver.setUser(user); - dbDriver.setPassword(password); - return dbDriver; - } - - static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException { + static JDBCSequentialFileFactoryDriver getDBFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) throws SQLException { final JDBCSequentialFileFactoryDriver dbDriver; final PropertySQLProvider.Factory.SQLDialect sqlDialect; - try (Connection connection = dataSource.getConnection()) { + try (Connection connection = connectionProvider.getConnection()) { sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection); } if (POSTGRESQL.equals(sqlDialect)) { - dbDriver = new PostgresSequentialSequentialFileDriver(dataSource, provider); + dbDriver = new PostgresSequentialSequentialFileDriver(connectionProvider, provider); } else if (DB2.equals(sqlDialect)) { - dbDriver = new Db2SequentialFileDriver(dataSource, provider); + dbDriver = new Db2SequentialFileDriver(connectionProvider, provider); } else { - dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider); - } - return dbDriver; - } - - static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException { - JDBCSequentialFileFactoryDriver dbDriver; - final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.investigateDialect(connection); - if (POSTGRESQL.equals(sqlDialect)) { - dbDriver = new PostgresSequentialSequentialFileDriver(connection, provider); - dbDriver.setConnection(connection); - } else if (DB2.equals(sqlDialect)) { - dbDriver = new Db2SequentialFileDriver(connection, provider); - } else { - dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider); + dbDriver = new JDBCSequentialFileFactoryDriver(connectionProvider, provider); } return dbDriver; } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index 4cd19fd5ff..c9169c52e8 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -16,10 +16,8 @@ */ package org.apache.activemq.artemis.jdbc.store.file; -import javax.sql.DataSource; import java.io.File; import java.nio.ByteBuffer; -import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -32,6 +30,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; @@ -53,7 +52,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM private final IOCriticalErrorListener criticalErrorListener; - public JDBCSequentialFileFactory(final DataSource dataSource, + public JDBCSequentialFileFactory(final JDBCConnectionProvider connectionProvider, final SQLProvider sqlProvider, Executor executor, IOCriticalErrorListener criticalErrorListener) throws Exception { @@ -62,38 +61,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM this.criticalErrorListener = criticalErrorListener; try { - this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); - } catch (SQLException e) { - criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); - } - } - - public JDBCSequentialFileFactory(final String connectionUrl, - String userName, - String password, - final String className, - final SQLProvider sqlProvider, - Executor executor, - IOCriticalErrorListener criticalErrorListener) throws Exception { - this.executor = executor; - this.criticalErrorListener = criticalErrorListener; - try { - this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, userName, password, sqlProvider); - } catch (SQLException e) { - criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); - } - - } - - public JDBCSequentialFileFactory(final Connection connection, - final SQLProvider sqlProvider, - final Executor executor, - final IOCriticalErrorListener criticalErrorListener) throws Exception { - this.executor = executor; - this.criticalErrorListener = criticalErrorListener; - - try { - this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + this.dbDriver = JDBCFileUtils.getDBFileDriver(connectionProvider, sqlProvider); } catch (SQLException e) { criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); } @@ -103,14 +71,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM return dbDriver; } - /** - * @see Connection#setNetworkTimeout(Executor, int) - **/ - public JDBCSequentialFileFactory setNetworkTimeout(Executor executor, int milliseconds) { - this.dbDriver.setNetworkTimeout(executor, milliseconds); - return this; - } - @Override public SequentialFileFactory setDatasync(boolean enabled) { return this; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index 7d9aded43d..ab6906c3cb 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.jdbc.store.file; -import javax.sql.DataSource; import java.nio.ByteBuffer; import java.sql.Blob; import java.sql.Connection; @@ -28,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.jboss.logging.Logger; @@ -36,32 +36,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class); - protected PreparedStatement deleteFile; - - protected PreparedStatement createFile; - - protected PreparedStatement selectFileByFileName; - - protected PreparedStatement copyFileRecord; - - protected PreparedStatement renameFile; - - protected PreparedStatement readLargeObject; - - protected PreparedStatement appendToLargeObject; - - protected PreparedStatement selectFileNamesByExtension; + protected String deleteFile; + protected String createFile; + protected String[] createFileColumnNames; + protected int createFileAutogeneratedKeys; + protected String selectFileByFileName; + protected String copyFileRecord; + protected String renameFile; + protected String readLargeObject; + protected String appendToLargeObject; + protected Integer appendToLargeObjectResultSetType; + protected Integer appendToLargeObjectResultSetConcurrency; + protected String selectFileNamesByExtension; JDBCSequentialFileFactoryDriver() { super(); } - JDBCSequentialFileFactoryDriver(DataSource dataSource, SQLProvider provider) { - super(dataSource, provider); - } - - JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) { - super(connection, sqlProvider); + JDBCSequentialFileFactoryDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) { + super(connectionProvider, provider); } @Override @@ -70,22 +63,25 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { } @Override - protected void prepareStatements() throws SQLException { - this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); - this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), new String[] {"ID"}); - this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); - this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); - this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); - this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); - this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); - this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); + protected void prepareStatements() { + this.deleteFile = sqlProvider.getDeleteFileSQL(); + this.createFile = sqlProvider.getInsertFileSQL(); + this.createFileColumnNames = new String[] {"ID"}; + this.selectFileByFileName = sqlProvider.getSelectFileByFileName(); + this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL(); + this.renameFile = sqlProvider.getUpdateFileNameByIdSQL(); + this.readLargeObject = sqlProvider.getReadLargeObjectSQL(); + this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL(); + this.appendToLargeObjectResultSetType = ResultSet.TYPE_FORWARD_ONLY; + this.appendToLargeObjectResultSetConcurrency = ResultSet.CONCUR_UPDATABLE; + this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL(); } public List listFiles(String extension) throws Exception { - synchronized (connection) { - List fileNames = new ArrayList<>(); - try { - connection.setAutoCommit(false); + List fileNames = new ArrayList<>(); + try (Connection connection = connectionProvider.getConnection()) { + connection.setAutoCommit(false); + try (PreparedStatement selectFileNamesByExtension = connection.prepareStatement(this.selectFileNamesByExtension)) { selectFileNamesByExtension.setString(1, extension); try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { while (rs.next()) { @@ -97,8 +93,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { connection.rollback(); throw e; } - return fileNames; } + return fileNames; } /** @@ -108,14 +104,12 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public void openFile(JDBCSequentialFile file) throws SQLException { - synchronized (connection) { - final long fileId = fileExists(file); - if (fileId < 0) { - createFile(file); - } else { - file.setId(fileId); - loadFile(file); - } + final long fileId = fileExists(file); + if (fileId < 0) { + createFile(file); + } else { + file.setId(fileId); + loadFile(file); } } @@ -131,18 +125,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public long fileExists(JDBCSequentialFile file) throws SQLException { - try { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement selectFileByFileName = connection.prepareStatement(this.selectFileByFileName)) { connection.setAutoCommit(false); selectFileByFileName.setString(1, file.getFileName()); try (ResultSet rs = selectFileByFileName.executeQuery()) { final long id = rs.next() ? rs.getLong(1) : -1; connection.commit(); return id; - } catch (Exception e) { - connection.rollback(); - throw e; } + } catch (Exception e) { + connection.rollback(); + throw e; } } catch (NullPointerException npe) { npe.printStackTrace(); @@ -157,20 +151,22 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public void loadFile(JDBCSequentialFile file) throws SQLException { - synchronized (connection) { - connection.setAutoCommit(false); - readLargeObject.setLong(1, file.getId()); + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) { + connection.setAutoCommit(false); + readLargeObject.setLong(1, file.getId()); - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - Blob blob = rs.getBlob(1); - if (blob != null) { - file.setWritePosition(blob.length()); - } else { - logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId()); + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + Blob blob = rs.getBlob(1); + if (blob != null) { + file.setWritePosition(blob.length()); + } else { + logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId()); + } } + connection.commit(); } - connection.commit(); } catch (SQLException e) { connection.rollback(); throw e; @@ -185,18 +181,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public void createFile(JDBCSequentialFile file) throws SQLException { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { try { connection.setAutoCommit(false); - createFile.setString(1, file.getFileName()); - createFile.setString(2, file.getExtension()); - createFile.setBytes(3, new byte[0]); - createFile.executeUpdate(); - try (ResultSet keys = createFile.getGeneratedKeys()) { - keys.next(); - file.setId(keys.getLong(1)); + try (PreparedStatement createFile = + createFileColumnNames != null ? + connection.prepareStatement(this.createFile, this.createFileColumnNames) : + connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) { + createFile.setString(1, file.getFileName()); + createFile.setString(2, file.getExtension()); + createFile.setBytes(3, new byte[0]); + createFile.executeUpdate(); + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + file.setId(keys.getLong(1)); + } + connection.commit(); } - connection.commit(); } catch (SQLException e) { connection.rollback(); throw e; @@ -212,9 +213,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException { - synchronized (connection) { - try { - connection.setAutoCommit(false); + try (Connection connection = connectionProvider.getConnection()) { + connection.setAutoCommit(false); + try (PreparedStatement renameFile = connection.prepareStatement(this.renameFile)) { renameFile.setString(1, newFileName); renameFile.setLong(2, file.getId()); renameFile.executeUpdate(); @@ -233,8 +234,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public void deleteFile(JDBCSequentialFile file) throws SQLException { - synchronized (connection) { - try { + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement deleteFile = connection.prepareStatement(this.deleteFile)) { connection.setAutoCommit(false); deleteFile.setLong(1, file.getId()); deleteFile.executeUpdate(); @@ -259,31 +260,36 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { connection.setAutoCommit(false); - appendToLargeObject.setLong(1, file.getId()); + try (PreparedStatement appendToLargeObject = + this.appendToLargeObjectResultSetType != null && this.appendToLargeObjectResultSetConcurrency != null ? + connection.prepareStatement(this.appendToLargeObject, this.appendToLargeObjectResultSetType, this.appendToLargeObjectResultSetConcurrency) : + connection.prepareStatement(this.appendToLargeObject)) { + appendToLargeObject.setLong(1, file.getId()); - int bytesWritten = 0; - try (ResultSet rs = appendToLargeObject.executeQuery()) { - if (rs.next()) { - Blob blob = rs.getBlob(1); - if (blob == null) { - blob = connection.createBlob(); + int bytesWritten = 0; + try (ResultSet rs = appendToLargeObject.executeQuery()) { + if (rs.next()) { + Blob blob = rs.getBlob(1); + if (blob == null) { + blob = connection.createBlob(); + } + if (append) { + bytesWritten = blob.setBytes(blob.length() + 1, data); + } else { + blob.truncate(0); + bytesWritten = blob.setBytes(1, data); + } + rs.updateBlob(1, blob); + rs.updateRow(); } - if (append) { - bytesWritten = blob.setBytes(blob.length() + 1, data); - } else { - blob.truncate(0); - bytesWritten = blob.setBytes(1, data); - } - rs.updateBlob(1, blob); - rs.updateRow(); + connection.commit(); + return bytesWritten; + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - return bytesWritten; - } catch (SQLException e) { - connection.rollback(); - throw e; } } } @@ -297,35 +303,37 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { connection.setAutoCommit(false); - readLargeObject.setLong(1, file.getId()); - int readLength = 0; - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - final Blob blob = rs.getBlob(1); - if (blob != null) { - final long blobLength = blob.length(); - final int bytesRemaining = bytes.remaining(); - final long filePosition = file.position(); - readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition); - if (logger.isDebugEnabled()) { - logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d", - readLength, blobLength, bytesRemaining, filePosition); - } - if (readLength < 0) { - readLength = -1; - } else if (readLength > 0) { - byte[] data = blob.getBytes(file.position() + 1, readLength); - bytes.put(data); + try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) { + readLargeObject.setLong(1, file.getId()); + int readLength = 0; + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + final Blob blob = rs.getBlob(1); + if (blob != null) { + final long blobLength = blob.length(); + final int bytesRemaining = bytes.remaining(); + final long filePosition = file.position(); + readLength = (int) calculateReadLength(blobLength, bytesRemaining, filePosition); + if (logger.isDebugEnabled()) { + logger.debugf("trying read %d bytes: blobLength = %d bytesRemaining = %d filePosition = %d", + readLength, blobLength, bytesRemaining, filePosition); + } + if (readLength < 0) { + readLength = -1; + } else if (readLength > 0) { + byte[] data = blob.getBytes(file.position() + 1, readLength); + bytes.put(data); + } } } + connection.commit(); + return readLength; + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - return readLength; - } catch (SQLException e) { - connection.rollback(); - throw e; } } } @@ -338,9 +346,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { * @throws SQLException */ public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException { - synchronized (connection) { - try { - connection.setAutoCommit(false); + try (Connection connection = connectionProvider.getConnection()) { + connection.setAutoCommit(false); + try (PreparedStatement copyFileRecord = connection.prepareStatement(this.copyFileRecord)) { copyFileRecord.setLong(1, fileFrom.getId()); copyFileRecord.setLong(2, fileTo.getId()); copyFileRecord.executeUpdate(); @@ -357,7 +365,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { */ @Override public void destroy() throws SQLException { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { try { connection.setAutoCommit(false); try (Statement statement = connection.createStatement()) { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java index 07212c40d0..65d2d1e695 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java @@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.sql.Connection; import java.sql.SQLException; + import org.postgresql.PGConnection; import org.postgresql.largeobject.LargeObject; @@ -43,11 +44,10 @@ public class PostgresLargeObjectManager { */ public static final int READWRITE = READ | WRITE; - private final Connection realConnection; private boolean shouldUseReflection; - public PostgresLargeObjectManager(Connection connection) throws SQLException { - this.realConnection = unwrap(connection); + + public PostgresLargeObjectManager() { try { this.getClass().getClassLoader().loadClass("org.postgresql.PGConnection"); shouldUseReflection = false; @@ -56,9 +56,9 @@ public class PostgresLargeObjectManager { } } - public final Long createLO() throws SQLException { + public final Long createLO(Connection connection) throws SQLException { if (shouldUseReflection) { - Object largeObjectManager = getLargeObjectManager(); + Object largeObjectManager = getLargeObjectManager(connection); try { Method method = largeObjectManager.getClass().getMethod("createLO"); return (Long) method.invoke(largeObjectManager); @@ -66,13 +66,13 @@ public class PostgresLargeObjectManager { throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex); } } else { - return ((PGConnection) realConnection).getLargeObjectAPI().createLO(); + return ((PGConnection) unwrap(connection)).getLargeObjectAPI().createLO(); } } - public Object open(long oid, int mode) throws SQLException { + public Object open(Connection connection, long oid, int mode) throws SQLException { if (shouldUseReflection) { - Object largeObjectManager = getLargeObjectManager(); + Object largeObjectManager = getLargeObjectManager(connection); try { Method method = largeObjectManager.getClass().getMethod("open", long.class, int.class); return method.invoke(largeObjectManager, oid, mode); @@ -80,7 +80,7 @@ public class PostgresLargeObjectManager { throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex); } } else { - return ((PGConnection) realConnection).getLargeObjectAPI().open(oid, mode); + return ((PGConnection) unwrap(connection)).getLargeObjectAPI().open(oid, mode); } } @@ -162,22 +162,22 @@ public class PostgresLargeObjectManager { } } - private Object getLargeObjectManager() throws SQLException { + private Object getLargeObjectManager(Connection connection) throws SQLException { if (shouldUseReflection) { try { - Method method = realConnection.getClass().getMethod("getLargeObjectAPI"); - return method.invoke(realConnection); + Connection conn = unwrap(connection); + Method method = conn.getClass().getMethod("getLargeObjectAPI"); + return method.invoke(conn); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex); } } else { - return ((PGConnection) realConnection).getLargeObjectAPI(); + return ((PGConnection) unwrap(connection)).getLargeObjectAPI(); } } public final Connection unwrap(Connection connection) throws SQLException { - Connection conn = connection.unwrap(Connection.class); - return unwrapIronJacamar(unwrapDbcp(unwrapSpring(conn))); + return unwrapIronJacamar(unwrapDbcp(unwrapDbcp2(unwrapSpring(connection.unwrap(Connection.class))))); } private Connection unwrapIronJacamar(Connection conn) { @@ -198,6 +198,15 @@ public class PostgresLargeObjectManager { } } + private Connection unwrapDbcp2(Connection conn) { + try { + Method method = conn.getClass().getMethod("getInnermostDelegateInternal"); + return (Connection) method.invoke(conn); + } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { + return conn; + } + } + private Connection unwrapSpring(Connection conn) { try { Method method = conn.getClass().getMethod("getTargetConnection"); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java index 7d439daec7..7bfadcd780 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java @@ -18,14 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.file; import java.nio.ByteBuffer; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; -import javax.sql.DataSource; - @SuppressWarnings("SynchronizeOnNonFinalField") public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver { @@ -36,37 +36,32 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential super(); } - public PostgresSequentialSequentialFileDriver(DataSource dataSource, SQLProvider provider) { + public PostgresSequentialSequentialFileDriver(JDBCConnectionProvider connectionProvider, SQLProvider provider) { super(); - this.setDataSource(dataSource); - this.setSqlProvider(provider); - } - - public PostgresSequentialSequentialFileDriver(Connection connection, SQLProvider provider) { - super(); - this.setConnection(connection); + this.setJdbcConnectionProvider(connectionProvider); this.setSqlProvider(provider); } @Override - protected void prepareStatements() throws SQLException { - this.largeObjectManager = new PostgresLargeObjectManager(connection); - this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); - this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS); - this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); - this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); - this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); - this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); - this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL()); - this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); + protected void prepareStatements() { + this.largeObjectManager = new PostgresLargeObjectManager(); + this.deleteFile = sqlProvider.getDeleteFileSQL(); + this.createFile = sqlProvider.getInsertFileSQL(); + this.createFileAutogeneratedKeys = Statement.RETURN_GENERATED_KEYS; + this.selectFileByFileName = sqlProvider.getSelectFileByFileName(); + this.copyFileRecord = sqlProvider.getCopyFileRecordByIdSQL(); + this.renameFile = sqlProvider.getUpdateFileNameByIdSQL(); + this.readLargeObject = sqlProvider.getReadLargeObjectSQL(); + this.appendToLargeObject = sqlProvider.getAppendToLargeObjectSQL(); + this.selectFileNamesByExtension = sqlProvider.getSelectFileNamesByExtensionSQL(); } @Override public void createFile(JDBCSequentialFile file) throws SQLException { - synchronized (connection) { - try { + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement createFile = connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) { connection.setAutoCommit(false); - Long oid = largeObjectManager.createLO(); + Long oid = largeObjectManager.createLO(connection); createFile.setString(1, file.getFileName()); createFile.setString(2, file.getExtension()); @@ -87,31 +82,31 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential @Override public void loadFile(JDBCSequentialFile file) throws SQLException { - synchronized (connection) { - connection.setAutoCommit(false); - readLargeObject.setLong(1, file.getId()); + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) { + connection.setAutoCommit(false); + readLargeObject.setLong(1, file.getId()); - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - file.setWritePosition(getPostGresLargeObjectSize(file)); + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.setWritePosition(getPostGresLargeObjectSize(file)); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; } } } @Override public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException { - synchronized (connection) { - Object largeObject = null; - + try (Connection connection = connectionProvider.getConnection()) { Long oid = getOID(file); try { connection.setAutoCommit(false); - largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.WRITE); + Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.WRITE); if (append) { largeObjectManager.seek(largeObject, largeObjectManager.size(largeObject)); } else { @@ -130,12 +125,11 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential @Override public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { - Object largeObject = null; long oid = getOID(file); - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { try { connection.setAutoCommit(false); - largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ); + Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ); int readLength = (int) calculateReadLength(largeObjectManager.size(largeObject), bytes.remaining(), file.position()); if (readLength > 0) { @@ -160,17 +154,19 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential private Long getOID(JDBCSequentialFile file) throws SQLException { Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY); if (oid == null) { - synchronized (connection) { - connection.setAutoCommit(false); - readLargeObject.setLong(1, file.getId()); - try (ResultSet rs = readLargeObject.executeQuery()) { - if (rs.next()) { - file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1)); + try (Connection connection = connectionProvider.getConnection()) { + try (PreparedStatement readLargeObject = connection.prepareStatement(this.readLargeObject)) { + connection.setAutoCommit(false); + readLargeObject.setLong(1, file.getId()); + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1)); + } + connection.commit(); + } catch (SQLException e) { + connection.rollback(); + throw e; } - connection.commit(); - } catch (SQLException e) { - connection.rollback(); - throw e; } } } @@ -184,10 +180,10 @@ public final class PostgresSequentialSequentialFileDriver extends JDBCSequential int size = 0; Long oid = getOID(file); if (oid != null) { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { try { connection.setAutoCommit(false); - Object largeObject = largeObjectManager.open(oid, PostgresLargeObjectManager.READ); + Object largeObject = largeObjectManager.open(connection, oid, PostgresLargeObjectManager.READ); size = largeObjectManager.size(largeObject); largeObjectManager.close(largeObject); connection.commit(); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index f87d7a7f3b..a15377ec99 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.jdbc.store.journal; -import javax.sql.DataSource; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.jboss.logging.Logger; @@ -67,15 +68,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private final List records; - private PreparedStatement insertJournalRecords; + private String insertJournalRecords; - private PreparedStatement selectJournalRecords; + private String selectJournalRecords; - private PreparedStatement countJournalRecords; + private String countJournalRecords; - private PreparedStatement deleteJournalRecords; + private String deleteJournalRecords; - private PreparedStatement deleteJournalTxRecords; + private String deleteJournalTxRecords; private boolean started; @@ -95,30 +96,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private final IOCriticalErrorListener criticalIOErrorListener; - public JDBCJournalImpl(DataSource dataSource, + public JDBCJournalImpl(JDBCConnectionProvider connectionProvider, SQLProvider provider, ScheduledExecutorService scheduledExecutorService, Executor completeExecutor, IOCriticalErrorListener criticalIOErrorListener, long syncDelay) { - super(dataSource, provider); - records = new ArrayList<>(); - this.scheduledExecutorService = scheduledExecutorService; - this.completeExecutor = completeExecutor; - this.criticalIOErrorListener = criticalIOErrorListener; - this.syncDelay = syncDelay; - } - - public JDBCJournalImpl(String jdbcUrl, - String user, - String password, - String jdbcDriverClass, - SQLProvider sqlProvider, - ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor, - IOCriticalErrorListener criticalIOErrorListener, - long syncDelay) { - super(sqlProvider, jdbcUrl, user, password, jdbcDriverClass); + super(connectionProvider, provider); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; @@ -153,13 +137,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - protected void prepareStatements() throws SQLException { + protected void prepareStatements() { logger.tracef("preparing statements"); - insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL()); - selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL()); - countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL()); - deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL()); - deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL()); + insertJournalRecords = sqlProvider.getInsertJournalRecordsSQL(); + selectJournalRecords = sqlProvider.getSelectJournalRecordsSQL(); + countJournalRecords = sqlProvider.getCountJournalRecordsSQL(); + deleteJournalRecords = sqlProvider.getDeleteJournalRecordsSQL(); + deleteJournalTxRecords = sqlProvider.getDeleteJournalTxRecordsSQL(); } @Override @@ -205,65 +189,70 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { TransactionHolder holder; - try { - connection.setAutoCommit(false); + try (Connection connection = connectionProvider.getConnection()) { - for (JDBCJournalRecord record : recordRef) { + try (PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords); + PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords); + PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) { - if (logger.isTraceEnabled()) { - logger.trace("sync::preparing JDBC statement for " + record); - } + connection.setAutoCommit(false); + + for (JDBCJournalRecord record : recordRef) { + + if (logger.isTraceEnabled()) { + logger.trace("sync::preparing JDBC statement for " + record); + } - - switch (record.getRecordType()) { - case JDBCJournalRecord.DELETE_RECORD: - // Standard SQL Delete Record, Non transactional delete - deletedRecords.add(record.getId()); - record.writeDeleteRecord(deleteJournalRecords); - break; - case JDBCJournalRecord.ROLLBACK_RECORD: - // Roll back we remove all records associated with this TX ID. This query is always performed last. - deleteJournalTxRecords.setLong(1, record.getTxId()); - deleteJournalTxRecords.addBatch(); - break; - case JDBCJournalRecord.COMMIT_RECORD: - // We perform all the deletes and add the commit record in the same Database TX - holder = transactions.get(record.getTxId()); - for (RecordInfo info : holder.recordsToDelete) { + switch (record.getRecordType()) { + case JDBCJournalRecord.DELETE_RECORD: + // Standard SQL Delete Record, Non transactional delete deletedRecords.add(record.getId()); - deletedRecords.add(info.id); - deleteJournalRecords.setLong(1, info.id); - deleteJournalRecords.addBatch(); - } - record.writeRecord(insertJournalRecords); - committedTransactions.add(record.getTxId()); - break; - default: - // Default we add a new record to the DB - record.writeRecord(insertJournalRecords); - break; + record.writeDeleteRecord(deleteJournalRecords); + break; + case JDBCJournalRecord.ROLLBACK_RECORD: + // Roll back we remove all records associated with this TX ID. This query is always performed last. + deleteJournalTxRecords.setLong(1, record.getTxId()); + deleteJournalTxRecords.addBatch(); + break; + case JDBCJournalRecord.COMMIT_RECORD: + // We perform all the deletes and add the commit record in the same Database TX + holder = transactions.get(record.getTxId()); + for (RecordInfo info : holder.recordsToDelete) { + deletedRecords.add(record.getId()); + deletedRecords.add(info.id); + deleteJournalRecords.setLong(1, info.id); + deleteJournalRecords.addBatch(); + } + record.writeRecord(insertJournalRecords); + committedTransactions.add(record.getTxId()); + break; + default: + // Default we add a new record to the DB + record.writeRecord(insertJournalRecords); + break; + } } - } - insertJournalRecords.executeBatch(); - deleteJournalRecords.executeBatch(); - deleteJournalTxRecords.executeBatch(); - - connection.commit(); - if (logger.isTraceEnabled()) { - logger.trace("JDBC commit worked"); - } - - if (cleanupTxRecords(deletedRecords, committedTransactions)) { + insertJournalRecords.executeBatch(); + deleteJournalRecords.executeBatch(); deleteJournalTxRecords.executeBatch(); + connection.commit(); - logger.trace("JDBC commit worked on cleanupTxRecords"); + if (logger.isTraceEnabled()) { + logger.trace("JDBC commit worked"); + } + + if (cleanupTxRecords(deletedRecords, committedTransactions)) { + deleteJournalTxRecords.executeBatch(); + connection.commit(); + logger.trace("JDBC commit worked on cleanupTxRecords"); + } + executeCallbacks(recordRef, true); + + return recordRef.size(); + } - executeCallbacks(recordRef, true); - - return recordRef.size(); - } catch (Exception e) { handleException(recordRef, e); return 0; @@ -280,18 +269,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { logger.trace("Rolling back Transaction, just in case"); } - try { - connection.rollback(); - } catch (Throwable rollback) { - logger.warn(rollback); - } - - try { - connection.close(); - } catch (Throwable rollback) { - logger.warn(rollback); - } - if (recordRef != null) { executeCallbacks(recordRef, false); } @@ -308,23 +285,27 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { transactions.get(txId).committed = true; } boolean hasDeletedJournalTxRecords = false; - // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop - for (TransactionHolder h : iterableCopyTx) { - iterableCopy = new ArrayList<>(); - iterableCopy.addAll(h.recordInfos); + try (Connection connection = connectionProvider.getConnection(); + PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords)) { + // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop + for (TransactionHolder h : iterableCopyTx) { - for (RecordInfo info : iterableCopy) { - if (deletedRecords.contains(info.id)) { - h.recordInfos.remove(info); + iterableCopy = new ArrayList<>(); + iterableCopy.addAll(h.recordInfos); + + for (RecordInfo info : iterableCopy) { + if (deletedRecords.contains(info.id)) { + h.recordInfos.remove(info); + } } - } - if (h.recordInfos.isEmpty() && h.committed) { - deleteJournalTxRecords.setLong(1, h.transactionID); - deleteJournalTxRecords.addBatch(); - hasDeletedJournalTxRecords = true; - transactions.remove(h.transactionID); + if (h.recordInfos.isEmpty() && h.committed) { + deleteJournalTxRecords.setLong(1, h.transactionID); + deleteJournalTxRecords.addBatch(); + hasDeletedJournalTxRecords = true; + transactions.remove(h.transactionID); + } } } return hasDeletedJournalTxRecords; @@ -868,51 +849,54 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager); JDBCJournalRecord r; - try (ResultSet rs = selectJournalRecords.executeQuery()) { - int noRecords = 0; - while (rs.next()) { - r = JDBCJournalRecord.readRecord(rs); - switch (r.getRecordType()) { - case JDBCJournalRecord.ADD_RECORD: - jrc.onReadAddRecord(r.toRecordInfo()); - break; - case JDBCJournalRecord.UPDATE_RECORD: - jrc.onReadUpdateRecord(r.toRecordInfo()); - break; - case JDBCJournalRecord.DELETE_RECORD: - jrc.onReadDeleteRecord(r.getId()); - break; - case JDBCJournalRecord.ADD_RECORD_TX: - jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo()); - break; - case JDBCJournalRecord.UPDATE_RECORD_TX: - jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo()); - break; - case JDBCJournalRecord.DELETE_RECORD_TX: - jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo()); - break; - case JDBCJournalRecord.PREPARE_RECORD: - jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords()); - break; - case JDBCJournalRecord.COMMIT_RECORD: - jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords()); - break; - case JDBCJournalRecord.ROLLBACK_RECORD: - jrc.onReadRollbackRecord(r.getTxId()); - break; - default: - throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType()); + try (Connection connection = connectionProvider.getConnection(); + PreparedStatement selectJournalRecords = connection.prepareStatement(this.selectJournalRecords)) { + try (ResultSet rs = selectJournalRecords.executeQuery()) { + int noRecords = 0; + while (rs.next()) { + r = JDBCJournalRecord.readRecord(rs); + switch (r.getRecordType()) { + case JDBCJournalRecord.ADD_RECORD: + jrc.onReadAddRecord(r.toRecordInfo()); + break; + case JDBCJournalRecord.UPDATE_RECORD: + jrc.onReadUpdateRecord(r.toRecordInfo()); + break; + case JDBCJournalRecord.DELETE_RECORD: + jrc.onReadDeleteRecord(r.getId()); + break; + case JDBCJournalRecord.ADD_RECORD_TX: + jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo()); + break; + case JDBCJournalRecord.UPDATE_RECORD_TX: + jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo()); + break; + case JDBCJournalRecord.DELETE_RECORD_TX: + jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo()); + break; + case JDBCJournalRecord.PREPARE_RECORD: + jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords()); + break; + case JDBCJournalRecord.COMMIT_RECORD: + jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords()); + break; + case JDBCJournalRecord.ROLLBACK_RECORD: + jrc.onReadRollbackRecord(r.getTxId()); + break; + default: + throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType()); + } + noRecords++; + if (r.getSeq() > seq.longValue()) { + seq.set(r.getSeq()); + } } - noRecords++; - if (r.getSeq() > seq.longValue()) { - seq.set(r.getSeq()); - } - } - jrc.checkPreparedTx(); + jrc.checkPreparedTx(); - jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); - jli.setNumberOfRecords(noRecords); - transactions = jrc.getTransactions(); + jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId()); + jli.setNumberOfRecords(noRecords); + transactions = jrc.getTransactions(); + } } catch (Throwable e) { handleException(null, e); } @@ -962,9 +946,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public int getNumberOfRecords() { int count = 0; - try (ResultSet rs = countJournalRecords.executeQuery()) { - rs.next(); - count = rs.getInt(1); + try (Connection connection = connectionProvider.getConnection(); + PreparedStatement countJournalRecords = connection.prepareStatement(this.countJournalRecords)) { + try (ResultSet rs = countJournalRecords.executeQuery()) { + rs.next(); + count = rs.getInt(1); + } } catch (SQLException e) { logger.warn(e.getMessage(), e); return -1; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java index b120ca2838..d7399b67ff 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/PropertySQLProvider.java @@ -22,10 +22,12 @@ import java.io.IOException; import java.io.InputStream; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.util.Map; import java.util.Properties; import java.util.function.Function; import java.util.stream.Stream; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.jboss.logging.Logger; @@ -363,7 +365,15 @@ public class PropertySQLProvider implements SQLProvider { } public Factory(DataSource dataSource) { - this(investigateDialect(dataSource)); + this(new JDBCConnectionProvider(dataSource)); + } + + public Factory(Map dataSourceProperties) { + this(investigateDialect(dataSourceProperties)); + } + + public Factory(JDBCConnectionProvider connectionProvider) { + this(investigateDialect(connectionProvider)); } public static SQLDialect investigateDialect(Connection connection) { @@ -388,8 +398,21 @@ public class PropertySQLProvider implements SQLProvider { return dialect; } - private static SQLDialect investigateDialect(DataSource dataSource) { - try (Connection connection = dataSource.getConnection()) { + public static SQLDialect investigateDialect(Map dataSourceProperties) { + SQLDialect dialect = null; + for (Object entry : dataSourceProperties.values()) { + if (entry instanceof String) { + dialect = identifyDialect((String) entry); + if (dialect != null) { + return dialect; + } + } + } + return dialect; + } + + private static SQLDialect investigateDialect(JDBCConnectionProvider connectionProvider) { + try (Connection connection = connectionProvider.getConnection()) { return investigateDialect(connection); } catch (Exception e) { logger.debug("Unable to read JDBC metadata.", e); diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java index fef665c641..40f848b0d5 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryTest.java @@ -21,9 +21,11 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -31,11 +33,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; @@ -79,15 +84,20 @@ public class JDBCSequentialFileFactoryTest { @Before public void setup() throws Exception { executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + Map dataSourceProperties = new HashMap<>(); if (useAuthentication) { user = "testuser"; password = "testpassword"; System.setProperty("derby.connection.requireAuthentication", "true"); System.setProperty("derby.user." + user, password); + dataSourceProperties.put("username", user); + dataSourceProperties.put("password", password); } - String connectionUrl = "jdbc:derby:target/data;create=true"; + dataSourceProperties.put("url", "jdbc:derby:target/data;create=true"); + dataSourceProperties.put("driverClassName", className); String tableName = "FILES"; - factory = new JDBCSequentialFileFactory(connectionUrl, user, password, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() { + String jdbcDatasourceClass = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName(); + factory = new JDBCSequentialFileFactory(new JDBCConnectionProvider(JDBCDataSourceUtils.getDataSource(jdbcDatasourceClass, dataSourceProperties)), JDBCUtils.getSQLProvider(dataSourceProperties, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() { @Override public void onIOException(Throwable code, String message, SequentialFile file) { } diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java index 886ce105e8..e799e060b6 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManagerTest.java @@ -44,21 +44,21 @@ public class PostgresLargeObjectManagerTest { @Test public void testShouldNotUseReflection() throws SQLException { - PostgresLargeObjectManager manager = new PostgresLargeObjectManager(new MockConnection()); + PostgresLargeObjectManager manager = new PostgresLargeObjectManager(); try { - manager.createLO(); + manager.createLO(new MockConnection()); fail("Shouldn't be using reflection"); } catch (ClassCastException ex) { } } @Test - public void testShouldUseReflection() throws SQLException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { + public void testShouldUseReflection() throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { ClassLoader loader = new FunkyClassLoader(); Class funkyClass = loader.loadClass("org.apache.activemq.artemis.jdbc.store.file.PostgresLargeObjectManager"); - Object manager = funkyClass.getConstructor(Connection.class).newInstance(new MockConnection()); + Object manager = funkyClass.getConstructor().newInstance(); try { - funkyClass.getMethod("createLO").invoke(manager); + funkyClass.getMethod("createLO", Connection.class).invoke(manager, new MockConnection()); fail("Shouldn't be using reflection"); } catch (java.lang.reflect.InvocationTargetException ex) { assertEquals("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex.getCause().getMessage()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 47621819b5..3a059f07b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -20,8 +20,13 @@ import javax.sql.DataSource; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import java.util.HashMap; +import java.util.Map; + public class DatabaseStorageConfiguration implements StoreConfiguration { private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName(); @@ -44,6 +49,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private DataSource dataSource; + private String dataSourceClassName = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName(); + + private Map dataSourceProperties = new HashMap(); + + private JDBCConnectionProvider connectionProvider; + private SQLProvider.Factory sqlProviderFactory; private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout(); @@ -138,7 +149,22 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { * * @return the DataSource used to store Artemis data in the JDBC data store. */ - public DataSource getDataSource() { + private DataSource getDataSource() { + if (dataSource == null) { + if (dataSourceProperties.isEmpty()) { + addDataSourceProperty("driverClassName", jdbcDriverClassName); + addDataSourceProperty("url", jdbcConnectionUrl); + if (jdbcUser != null) { + addDataSourceProperty("username", jdbcUser); + } + if (jdbcPassword != null) { + addDataSourceProperty("password", jdbcPassword); + } + // Let the pool to have unbounded number of connections by default to prevent connection starvation + addDataSourceProperty("maxTotal", "-1"); + } + dataSource = JDBCDataSourceUtils.getDataSource(dataSourceClassName, dataSourceProperties); + } return dataSource; } @@ -151,6 +177,33 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { this.dataSource = dataSource; } + public JDBCConnectionProvider getConnectionProvider() { + if (connectionProvider == null) { + connectionProvider = new JDBCConnectionProvider(getDataSource()); + } + return connectionProvider; + } + public void addDataSourceProperty(String key, String value) { + if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) { + dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase())); + } else { + try { + int i = Integer.parseInt(value); + dataSourceProperties.put(key, i); + } catch (NumberFormatException nfe) { + dataSourceProperties.put(key, value); + } + } + } + + public String getDataSourceClassName() { + return dataSourceClassName; + } + + public void setDataSourceClassName(String dataSourceClassName) { + this.dataSourceClassName = dataSourceClassName; + } + /** * The {@link SQLProvider.Factory} used to communicate with the JDBC data store. * It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory}} will be used, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6213d75809..ad01a0e148 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1738,6 +1738,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { password = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), password, mainConfig.getPasswordCodec()); } conf.setJdbcPassword(password); + conf.setDataSourceClassName(getString(storeNode, "data-source-class-name", conf.getDataSourceClassName(), Validators.NO_CHECK)); + if (parameterExists(storeNode, "data-source-properties")) { + NodeList propertyNodeList = storeNode.getElementsByTagName("data-source-property"); + for (int i = 0; i < propertyNodeList.getLength(); i++) { + Element propertyNode = (Element) propertyNodeList.item(i); + conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue()); + } + } + //conf.initDataSource(); + return conf; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 2c6dbdddb1..4d164a542f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -40,10 +40,8 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; -import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -72,16 +70,12 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { protected final StorageManager storageManager; - private JDBCSequentialFileFactoryDriver dbDriver; - private DatabaseStorageConfiguration dbConf; private ExecutorFactory executorFactory; private JDBCSequentialFileFactory pagingFactoryFileFactory; - private JDBCSequentialFile directoryList; - private final boolean readWholePage; @Override @@ -106,8 +100,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { final ScheduledExecutorService scheduledExecutor, final ExecutorFactory executorFactory, final boolean syncNonTransactional, - final IOCriticalErrorListener critialErrorListener) throws Exception { - this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false); + final IOCriticalErrorListener criticalErrorListener) throws Exception { + this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, criticalErrorListener, false); } public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, @@ -116,7 +110,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { final ScheduledExecutorService scheduledExecutor, final ExecutorFactory executorFactory, final boolean syncNonTransactional, - final IOCriticalErrorListener critialErrorListener, + final IOCriticalErrorListener criticalErrorListener, final boolean readWholePage) throws Exception { this.storageManager = storageManager; this.executorFactory = executorFactory; @@ -124,7 +118,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { this.scheduledExecutor = scheduledExecutor; this.syncTimeout = syncTimeout; this.dbConf = dbConf; - this.criticalErrorListener = critialErrorListener; + this.criticalErrorListener = criticalErrorListener; this.factoryToTableName = new HashMap<>(); this.readWholePage = readWholePage; start(); @@ -137,20 +131,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { if (pageStoreTableNamePrefix.length() > 10) { throw new IllegalStateException("The maximum name size for the page store table prefix is 10 characters: THE PAGING STORE CAN'T START"); } - if (dbConf.getDataSource() != null) { - SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); - if (sqlProviderFactory == null) { - sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource()); - } - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); - } else { - String driverClassName = dbConf.getJdbcDriverClassName(); - pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); - } - final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout(); - if (jdbcNetworkTimeout >= 0) { - pagingFactoryFileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout); + SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); + if (sqlProviderFactory == null) { + sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider()); } + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory.start(); started = true; } @@ -278,22 +263,14 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { directoryList.close(); final SQLProvider sqlProvider; - if (dbConf.getDataSource() != null) { - final SQLProvider.Factory sqlProviderFactory; - if (dbConf.getSqlProviderFactory() != null) { - sqlProviderFactory = dbConf.getSqlProviderFactory(); - } else { - sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource()); - } - sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); + final SQLProvider.Factory sqlProviderFactory; + if (dbConf.getSqlProviderFactory() != null) { + sqlProviderFactory = dbConf.getSqlProviderFactory(); } else { - sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); - } - final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener); - final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout(); - if (jdbcNetworkTimeout >= 0) { - fileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout); + sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getConnectionProvider()); } + sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); + final JDBCSequentialFileFactory fileFactory = new JDBCSequentialFileFactory(dbConf.getConnectionProvider(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener); factoryToTableName.put(fileFactory, directoryName); return fileFactory; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 629405b8a7..fc9b3a24b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import java.nio.ByteBuffer; -import java.sql.Connection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -26,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; @@ -36,8 +35,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; public class JDBCJournalStorageManager extends JournalStorageManager { - private Connection connection; - public JDBCJournalStorageManager(Configuration config, CriticalAnalyzer analyzer, ExecutorFactory executorFactory, @@ -59,33 +56,35 @@ public class JDBCJournalStorageManager extends JournalStorageManager { protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { try { final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); + final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider(); + final int networkTimeout = dbConf.getJdbcNetworkTimeout(); + if (networkTimeout >= 0) { + connectionProvider.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout); + } final JDBCJournalImpl bindingsJournal; final JDBCJournalImpl messageJournal; final JDBCSequentialFileFactory largeMessagesFactory; - if (dbConf.getDataSource() != null) { - SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); - if (sqlProviderFactory == null) { - sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource()); - } - bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis()); - messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener); - } else { - String driverClassName = dbConf.getJdbcDriverClassName(); - bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis()); - messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener); - } - final int networkTimeout = dbConf.getJdbcNetworkTimeout(); - if (networkTimeout >= 0) { - bindingsJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout); - } - if (networkTimeout >= 0) { - messageJournal.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout); - } - if (networkTimeout >= 0) { - largeMessagesFactory.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout); + SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); + if (sqlProviderFactory == null) { + sqlProviderFactory = new PropertySQLProvider.Factory(connectionProvider); } + bindingsJournal = new JDBCJournalImpl( + connectionProvider, + sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), + scheduledExecutorService, + executorFactory.getExecutor(), + criticalErrorListener,dbConf.getJdbcJournalSyncPeriodMillis()); + messageJournal = new JDBCJournalImpl( + connectionProvider, + sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), + scheduledExecutorService, executorFactory.getExecutor(), + criticalErrorListener, + dbConf.getJdbcJournalSyncPeriodMillis()); + largeMessagesFactory = new JDBCSequentialFileFactory( + connectionProvider, + sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), + executorFactory.getExecutor(), + criticalErrorListener); this.bindingsJournal = bindingsJournal; this.messageJournal = messageJournal; this.largeMessagesFactory = largeMessagesFactory; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java index 11c1aabddc..3b7124eeaa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.jboss.logging.Logger; /** @@ -35,13 +36,13 @@ final class JdbcLeaseLock implements LeaseLock { private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class); private static final int MAX_HOLDER_ID_LENGTH = 128; - private final Connection connection; + private final JDBCConnectionProvider connectionProvider; private final String holderId; - private final PreparedStatement tryAcquireLock; - private final PreparedStatement tryReleaseLock; - private final PreparedStatement renewLock; - private final PreparedStatement isLocked; - private final PreparedStatement currentDateTime; + private final String tryAcquireLock; + private final String tryReleaseLock; + private final String renewLock; + private final String isLocked; + private final String currentDateTime; private final long expirationMillis; private boolean maybeAcquired; private final String lockName; @@ -51,12 +52,12 @@ final class JdbcLeaseLock implements LeaseLock { * whose life cycle will be managed externally. */ JdbcLeaseLock(String holderId, - Connection connection, - PreparedStatement tryAcquireLock, - PreparedStatement tryReleaseLock, - PreparedStatement renewLock, - PreparedStatement isLocked, - PreparedStatement currentDateTime, + JDBCConnectionProvider connectionProvider, + String tryAcquireLock, + String tryReleaseLock, + String renewLock, + String isLocked, + String currentDateTime, long expirationMIllis, String lockName) { if (holderId.length() > MAX_HOLDER_ID_LENGTH) { @@ -70,7 +71,7 @@ final class JdbcLeaseLock implements LeaseLock { this.currentDateTime = currentDateTime; this.expirationMillis = expirationMIllis; this.maybeAcquired = false; - this.connection = connection; + this.connectionProvider = connectionProvider; this.lockName = lockName; } @@ -84,13 +85,12 @@ final class JdbcLeaseLock implements LeaseLock { } private String readableLockStatus() { - try { + try (Connection connection = connectionProvider.getConnection()) { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); - try { + try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) { final String lockStatus; - final PreparedStatement preparedStatement = this.isLocked; try (ResultSet resultSet = preparedStatement.executeQuery()) { if (!resultSet.next()) { lockStatus = null; @@ -114,100 +114,96 @@ final class JdbcLeaseLock implements LeaseLock { } } - private long dbCurrentTimeMillis() throws SQLException { + private long dbCurrentTimeMillis(Connection connection) throws SQLException { final long start = System.nanoTime(); - try (ResultSet resultSet = currentDateTime.executeQuery()) { - resultSet.next(); - final Timestamp currentTimestamp = resultSet.getTimestamp(1); - final long elapsedTime = System.nanoTime() - start; - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms", - lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime)); + try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) { + try (ResultSet resultSet = currentDateTime.executeQuery()) { + resultSet.next(); + final Timestamp currentTimestamp = resultSet.getTimestamp(1); + final long elapsedTime = System.nanoTime() - start; + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms", + lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime)); + } + return currentTimestamp.getTime(); } - return currentTimestamp.getTime(); } } @Override public boolean renew() { - synchronized (connection) { - try { - connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - final boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - try { - final PreparedStatement preparedStatement = this.renewLock; - final long now = dbCurrentTimeMillis(); - final Timestamp expirationTime = new Timestamp(now + expirationMillis); - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s", - lockName, holderId, expirationTime); - } - preparedStatement.setTimestamp(1, expirationTime); - preparedStatement.setString(2, holderId); - preparedStatement.setTimestamp(3, expirationTime); - preparedStatement.setTimestamp(4, expirationTime); - final int updatedRows = preparedStatement.executeUpdate(); - final boolean renewed = updatedRows == 1; - connection.commit(); - if (!renewed) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }", - lockName, holderId, readableLockStatus()); - } - } else { - LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId); - } - return renewed; - } catch (SQLException ie) { - connection.rollback(); - throw new IllegalStateException(ie); - } finally { - connection.setAutoCommit(autoCommit); + try (Connection connection = connectionProvider.getConnection()) { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) { + final long now = dbCurrentTimeMillis(connection); + final Timestamp expirationTime = new Timestamp(now + expirationMillis); + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s", + lockName, holderId, expirationTime); } - } catch (SQLException e) { - throw new IllegalStateException(e); + preparedStatement.setTimestamp(1, expirationTime); + preparedStatement.setString(2, holderId); + preparedStatement.setTimestamp(3, expirationTime); + preparedStatement.setTimestamp(4, expirationTime); + final int updatedRows = preparedStatement.executeUpdate(); + final boolean renewed = updatedRows == 1; + connection.commit(); + if (!renewed) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }", + lockName, holderId, readableLockStatus()); + } + } else { + LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId); + } + return renewed; + } catch (SQLException ie) { + connection.rollback(); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @Override public boolean tryAcquire() { - synchronized (connection) { - try { - connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - final boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - try { - final PreparedStatement preparedStatement = tryAcquireLock; - final long now = dbCurrentTimeMillis(); - preparedStatement.setString(1, holderId); - final Timestamp expirationTime = new Timestamp(now + expirationMillis); - preparedStatement.setTimestamp(2, expirationTime); - preparedStatement.setTimestamp(3, expirationTime); - LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s", - lockName, holderId, expirationTime); - final boolean acquired = preparedStatement.executeUpdate() == 1; - connection.commit(); - if (acquired) { - this.maybeAcquired = true; - LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId); - } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }", - lockName, holderId, readableLockStatus()); - } + try (Connection connection = connectionProvider.getConnection()) { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) { + final long now = dbCurrentTimeMillis(connection); + preparedStatement.setString(1, holderId); + final Timestamp expirationTime = new Timestamp(now + expirationMillis); + preparedStatement.setTimestamp(2, expirationTime); + preparedStatement.setTimestamp(3, expirationTime); + LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s", + lockName, holderId, expirationTime); + final boolean acquired = preparedStatement.executeUpdate() == 1; + connection.commit(); + if (acquired) { + this.maybeAcquired = true; + LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId); + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has failed to acquire lock: lock status = { %s }", + lockName, holderId, readableLockStatus()); } - return acquired; - } catch (SQLException ie) { - connection.rollback(); - throw new IllegalStateException(ie); - } finally { - connection.setAutoCommit(autoCommit); } - } catch (SQLException e) { - throw new IllegalStateException(e); + return acquired; + } catch (SQLException ie) { + connection.rollback(); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @@ -222,104 +218,85 @@ final class JdbcLeaseLock implements LeaseLock { } private boolean checkValidHolderId(Predicate holderIdFilter) { - synchronized (connection) { - try { - connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - final boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - try { - boolean result; - final PreparedStatement preparedStatement = this.isLocked; - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (!resultSet.next()) { - result = false; - } else { - final String currentHolderId = resultSet.getString(1); - result = holderIdFilter.test(currentHolderId); - final Timestamp expirationTime = resultSet.getTimestamp(2); - final Timestamp currentTimestamp = resultSet.getTimestamp(3); - final long currentTimestampMillis = currentTimestamp.getTime(); - boolean zombie = false; - if (expirationTime != null) { - final long lockExpirationTime = expirationTime.getTime(); - final long expiredBy = currentTimestampMillis - lockExpirationTime; - if (expiredBy > 0) { - result = false; - zombie = true; - } - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s", - lockName, holderId, zombie ? "zombie lock" : "lock", - currentHolderId, expirationTime, currentTimestamp); + try (Connection connection = connectionProvider.getConnection()) { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(this.isLocked)) { + boolean result; + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + result = false; + } else { + final String currentHolderId = resultSet.getString(1); + result = holderIdFilter.test(currentHolderId); + final Timestamp expirationTime = resultSet.getTimestamp(2); + final Timestamp currentTimestamp = resultSet.getTimestamp(3); + final long currentTimestampMillis = currentTimestamp.getTime(); + boolean zombie = false; + if (expirationTime != null) { + final long lockExpirationTime = expirationTime.getTime(); + final long expiredBy = currentTimestampMillis - lockExpirationTime; + if (expiredBy > 0) { + result = false; + zombie = true; } } + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has found %s with holderId = %s expirationTime = %s currentTimestamp = %s", + lockName, holderId, zombie ? "zombie lock" : "lock", + currentHolderId, expirationTime, currentTimestamp); + } } - connection.commit(); - return result; - } catch (SQLException ie) { - connection.rollback(); - throw new IllegalStateException(ie); - } finally { - connection.setAutoCommit(autoCommit); } - } catch (SQLException e) { - throw new IllegalStateException(e); + connection.commit(); + return result; + } catch (SQLException ie) { + connection.rollback(); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @Override public void release() { - synchronized (connection) { - try { - connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - final boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - try { - final PreparedStatement preparedStatement = this.tryReleaseLock; - preparedStatement.setString(1, holderId); - final boolean released = preparedStatement.executeUpdate() == 1; - //consider it as released to avoid on finalize to be reclaimed - this.maybeAcquired = false; - connection.commit(); - if (!released) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }", - lockName, holderId, readableLockStatus()); - } - } else { - LOGGER.debugf("[%s] %s has released lock", lockName, holderId); + try (Connection connection = connectionProvider.getConnection()) { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryReleaseLock)) { + preparedStatement.setString(1, holderId); + final boolean released = preparedStatement.executeUpdate() == 1; + //consider it as released to avoid on finalize to be reclaimed + this.maybeAcquired = false; + connection.commit(); + if (!released) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("[%s] %s has failed to release lock: lock status = { %s }", + lockName, holderId, readableLockStatus()); } - } catch (SQLException ie) { - connection.rollback(); - throw new IllegalStateException(ie); - } finally { - connection.setAutoCommit(autoCommit); + } else { + LOGGER.debugf("[%s] %s has released lock", lockName, holderId); } - } catch (SQLException e) { - throw new IllegalStateException(e); + } catch (SQLException ie) { + connection.rollback(); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @Override public void close() throws SQLException { - synchronized (connection) { - //to avoid being called if not needed - if (!this.tryReleaseLock.isClosed()) { - try { - if (this.maybeAcquired) { - release(); - } - } finally { - this.tryReleaseLock.close(); - this.tryAcquireLock.close(); - this.renewLock.close(); - this.isLocked.close(); - this.currentDateTime.close(); - } - } + if (this.maybeAcquired) { + release(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java index 6fd4b86efb..212e6e12e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.server.impl.jdbc; -import javax.sql.DataSource; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; -import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -61,59 +60,37 @@ public final class JdbcNodeManager extends NodeManager { ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { validateTimeoutConfiguration(configuration); - if (configuration.getDataSource() != null) { - final SQLProvider.Factory sqlProviderFactory; - if (configuration.getSqlProviderFactory() != null) { - sqlProviderFactory = configuration.getSqlProviderFactory(); - } else { - sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource()); - } - final String brokerId = java.util.UUID.randomUUID().toString(); - return usingDataSource(brokerId, - configuration.getJdbcNetworkTimeout(), - configuration.getJdbcLockExpirationMillis(), - configuration.getJdbcLockRenewPeriodMillis(), - configuration.getJdbcLockAcquisitionTimeoutMillis(), - configuration.getDataSource(), - sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), - scheduledExecutorService, - executorFactory, - ioCriticalErrorListener); + final SQLProvider.Factory sqlProviderFactory; + if (configuration.getSqlProviderFactory() != null) { + sqlProviderFactory = configuration.getSqlProviderFactory(); } else { - final SQLProvider sqlProvider = JDBCUtils.getSQLProvider(configuration.getJdbcDriverClassName(), configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); - final String brokerId = java.util.UUID.randomUUID().toString(); - return usingConnectionUrl(brokerId, - configuration.getJdbcNetworkTimeout(), - configuration.getJdbcLockExpirationMillis(), - configuration.getJdbcLockRenewPeriodMillis(), - configuration.getJdbcLockAcquisitionTimeoutMillis(), - configuration.getJdbcConnectionUrl(), - configuration.getJdbcUser(), - configuration.getJdbcPassword(), - configuration.getJdbcDriverClassName(), - sqlProvider, - scheduledExecutorService, - executorFactory, - ioCriticalErrorListener); + sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getConnectionProvider()); } + final String brokerId = java.util.UUID.randomUUID().toString(); + return usingConnectionProvider(brokerId, + configuration.getJdbcLockExpirationMillis(), + configuration.getJdbcLockRenewPeriodMillis(), + configuration.getJdbcLockAcquisitionTimeoutMillis(), + configuration.getConnectionProvider(), + sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), + scheduledExecutorService, + executorFactory, + ioCriticalErrorListener); } - private static JdbcNodeManager usingDataSource(String brokerId, - int networkTimeoutMillis, + private static JdbcNodeManager usingConnectionProvider(String brokerId, long lockExpirationMillis, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, - DataSource dataSource, + JDBCConnectionProvider connectionProvider, SQLProvider provider, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory, IOCriticalErrorListener ioCriticalErrorListener) { return new JdbcNodeManager( - () -> JdbcSharedStateManager.usingDataSource(brokerId, - networkTimeoutMillis, - executorFactory == null ? null : executorFactory.getExecutor(), + () -> JdbcSharedStateManager.usingConnectionProvider(brokerId, lockExpirationMillis, - dataSource, + connectionProvider, provider), lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, @@ -122,36 +99,6 @@ public final class JdbcNodeManager extends NodeManager { ioCriticalErrorListener); } - private static JdbcNodeManager usingConnectionUrl(String brokerId, - int networkTimeoutMillis, - long lockExpirationMillis, - long lockRenewPeriodMillis, - long lockAcquisitionTimeoutMillis, - String jdbcUrl, - String user, - String password, - String driverClass, - SQLProvider provider, - ScheduledExecutorService scheduledExecutorService, - ExecutorFactory executorFactory, - IOCriticalErrorListener ioCriticalErrorListener) { - return new JdbcNodeManager( - () -> JdbcSharedStateManager.usingConnectionUrl(brokerId, - networkTimeoutMillis, - executorFactory == null ? null : executorFactory.getExecutor(), - lockExpirationMillis, - jdbcUrl, - user, - password, - driverClass, - provider), - lockRenewPeriodMillis, - lockAcquisitionTimeoutMillis, - scheduledExecutorService, - executorFactory, - ioCriticalErrorListener); - } - private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) { final long lockExpiration = configuration.getJdbcLockExpirationMillis(); if (lockExpiration <= 0) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java index 8ad6f1e2c5..06f7e2a9b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java @@ -17,15 +17,14 @@ package org.apache.activemq.artemis.core.server.impl.jdbc; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.concurrent.Executor; import java.util.function.Supplier; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.utils.UUID; import org.jboss.logging.Logger; @@ -42,21 +41,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS private final long lockExpirationMillis; private JdbcLeaseLock liveLock; private JdbcLeaseLock backupLock; - private PreparedStatement readNodeId; - private PreparedStatement writeNodeId; - private PreparedStatement initializeNodeId; - private PreparedStatement readState; - private PreparedStatement writeState; + private String readNodeId; + private String writeNodeId; + private String initializeNodeId; + private String readState; + private String writeState; - public static JdbcSharedStateManager usingDataSource(String holderId, - int networkTimeout, - Executor networkTimeoutExecutor, + public static JdbcSharedStateManager usingConnectionProvider(String holderId, long locksExpirationMillis, - DataSource dataSource, + JDBCConnectionProvider connectionProvider, SQLProvider provider) { final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); - sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); - sharedStateManager.setDataSource(dataSource); + sharedStateManager.setJdbcConnectionProvider(connectionProvider); sharedStateManager.setSqlProvider(provider); try { sharedStateManager.start(); @@ -66,64 +62,6 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS } } - public static JdbcSharedStateManager usingConnectionUrl(String holderId, - long locksExpirationMillis, - String jdbcConnectionUrl, - String jdbcDriverClass, - SQLProvider provider) { - return JdbcSharedStateManager.usingConnectionUrl(holderId, - -1, - null, - locksExpirationMillis, - jdbcConnectionUrl, - null, - null, - jdbcDriverClass, - provider); - } - - public static JdbcSharedStateManager usingConnectionUrl(String holderId, - long locksExpirationMillis, - String jdbcConnectionUrl, - String user, - String password, - String jdbcDriverClass, - SQLProvider provider) { - return JdbcSharedStateManager.usingConnectionUrl(holderId, - -1, - null, - locksExpirationMillis, - jdbcConnectionUrl, - user, - password, - jdbcDriverClass, - provider); - } - - public static JdbcSharedStateManager usingConnectionUrl(String holderId, - int networkTimeout, - Executor networkTimeoutExecutor, - long locksExpirationMillis, - String jdbcConnectionUrl, - String user, - String password, - String jdbcDriverClass, - SQLProvider provider) { - final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); - sharedStateManager.setNetworkTimeout(networkTimeoutExecutor, networkTimeout); - sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl); - sharedStateManager.setJdbcDriverClass(jdbcDriverClass); - sharedStateManager.setSqlProvider(provider); - sharedStateManager.setUser(user); - sharedStateManager.setPassword(password); - try { - sharedStateManager.start(); - return sharedStateManager; - } catch (SQLException e) { - throw new IllegalStateException(e); - } - } - @Override protected void createSchema() { try { @@ -135,28 +73,28 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS } static JdbcLeaseLock createLiveLock(String holderId, - Connection connection, + JDBCConnectionProvider connectionProvider, SQLProvider sqlProvider, - long expirationMillis) throws SQLException { - return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "LIVE"); + long expirationMillis) { + return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE"); } static JdbcLeaseLock createBackupLock(String holderId, - Connection connection, + JDBCConnectionProvider connectionProvider, SQLProvider sqlProvider, - long expirationMillis) throws SQLException { - return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, "BACKUP"); + long expirationMillis) { + return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP"); } @Override - protected void prepareStatements() throws SQLException { - this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis); - this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis); - this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); - this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); - this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL()); - this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL()); - this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); + protected void prepareStatements() { + this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis); + this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis); + this.readNodeId = sqlProvider.readNodeIdSQL(); + this.writeNodeId = sqlProvider.writeNodeIdSQL(); + this.initializeNodeId = sqlProvider.initializeNodeIdSQL(); + this.writeState = sqlProvider.writeStateSQL(); + this.readState = sqlProvider.readStateSQL(); } private JdbcSharedStateManager(String holderId, long lockExpirationMillis) { @@ -174,17 +112,18 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS return this.backupLock; } - private UUID rawReadNodeId() throws SQLException { - final PreparedStatement preparedStatement = this.readNodeId; - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (!resultSet.next()) { - return null; - } else { - final String nodeId = resultSet.getString(1); - if (nodeId != null) { - return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId)); - } else { + private UUID rawReadNodeId(Connection connection) throws SQLException { + try (PreparedStatement preparedStatement = connection.prepareStatement(this.readNodeId)) { + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { return null; + } else { + final String nodeId = resultSet.getString(1); + if (nodeId != null) { + return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId)); + } else { + return null; + } } } } @@ -192,65 +131,71 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS @Override public UUID readNodeId() { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); try { - return rawReadNodeId(); + return rawReadNodeId(connection); } finally { connection.setAutoCommit(autoCommit); } } catch (SQLException e) { throw new IllegalStateException(e); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @Override public void writeNodeId(UUID nodeId) { - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { try { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); final boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(true); try { - rawWriteNodeId(nodeId); + rawWriteNodeId(connection, nodeId); } finally { connection.setAutoCommit(autoCommit); } } catch (SQLException e) { throw new IllegalStateException(e); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } - private void rawWriteNodeId(UUID nodeId) throws SQLException { - final PreparedStatement preparedStatement = this.writeNodeId; - preparedStatement.setString(1, nodeId.toString()); - if (preparedStatement.executeUpdate() != 1) { - throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!"); + private void rawWriteNodeId(Connection connection, UUID nodeId) throws SQLException { + try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeNodeId)) { + preparedStatement.setString(1, nodeId.toString()); + if (preparedStatement.executeUpdate() != 1) { + throw new IllegalStateException("can't write NodeId on the JDBC Node Manager Store!"); + } } } - private boolean rawInitializeNodeId(UUID nodeId) throws SQLException { - final PreparedStatement preparedStatement = this.initializeNodeId; - preparedStatement.setString(1, nodeId.toString()); - final int rows = preparedStatement.executeUpdate(); - assert rows <= 1; - return rows > 0; + private boolean rawInitializeNodeId(Connection connection, UUID nodeId) throws SQLException { + try (PreparedStatement preparedStatement = connection.prepareStatement(this.initializeNodeId)) { + preparedStatement.setString(1, nodeId.toString()); + final int rows = preparedStatement.executeUpdate(); + assert rows <= 1; + return rows > 0; + } } @Override public UUID setup(Supplier nodeIdFactory) { SQLException lastError = null; - synchronized (connection) { + try (Connection connection = connectionProvider.getConnection()) { final UUID newNodeId = nodeIdFactory.get(); for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) { lastError = null; try { - final UUID nodeId = initializeOrReadNodeId(newNodeId); + final UUID nodeId = initializeOrReadNodeId(connection, newNodeId); if (nodeId != null) { return nodeId; } @@ -259,6 +204,8 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS lastError = e; } } + } catch (SQLException e) { + lastError = e; } if (lastError != null) { logger.error("Unable to setup a NodeId on the JDBC shared state", lastError); @@ -268,7 +215,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId"); } - private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException { + private UUID initializeOrReadNodeId(Connection connection, final UUID newNodeId) throws SQLException { synchronized (connection) { connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); final boolean autoCommit = connection.getAutoCommit(); @@ -276,10 +223,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS try { final UUID nodeId; //optimistic try to initialize nodeId - if (rawInitializeNodeId(newNodeId)) { + if (rawInitializeNodeId(connection, newNodeId)) { nodeId = newNodeId; } else { - nodeId = rawReadNodeId(); + nodeId = rawReadNodeId(connection); } if (nodeId != null) { connection.commit(); @@ -335,76 +282,65 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS @Override public State readState() { - synchronized (connection) { - try { - connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - final boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - final State state; - try { - final PreparedStatement preparedStatement = this.readState; - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (!resultSet.next()) { - state = State.FIRST_TIME_START; - } else { - state = decodeState(resultSet.getString(1)); - } + try (Connection connection = connectionProvider.getConnection()) { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + final State state; + try (PreparedStatement preparedStatement = connection.prepareStatement(this.readState)) { + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + state = State.FIRST_TIME_START; + } else { + state = decodeState(resultSet.getString(1)); } - connection.commit(); - return state; - } catch (SQLException ie) { - connection.rollback(); - throw new IllegalStateException(ie); - } finally { - connection.setAutoCommit(autoCommit); } - } catch (SQLException e) { - throw new IllegalStateException(e); + connection.commit(); + return state; + } catch (SQLException ie) { + connection.rollback(); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @Override public void writeState(State state) { final String encodedState = encodeState(state); - synchronized (connection) { - try { - connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); - final boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - try { - final PreparedStatement preparedStatement = this.writeState; - preparedStatement.setString(1, encodedState); - if (preparedStatement.executeUpdate() != 1) { - throw new IllegalStateException("can't write state to the JDBC Node Manager Store!"); - } - connection.commit(); - } catch (SQLException ie) { - connection.rollback(); - connection.setAutoCommit(true); - throw new IllegalStateException(ie); - } finally { - connection.setAutoCommit(autoCommit); + try (Connection connection = connectionProvider.getConnection()) { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + final boolean autoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = connection.prepareStatement(this.writeState)) { + preparedStatement.setString(1, encodedState); + if (preparedStatement.executeUpdate() != 1) { + throw new IllegalStateException("can't write state to the JDBC Node Manager Store!"); } - } catch (SQLException e) { - throw new IllegalStateException(e); + connection.commit(); + } catch (SQLException ie) { + connection.rollback(); + connection.setAutoCommit(true); + throw new IllegalStateException(ie); + } finally { + connection.setAutoCommit(autoCommit); } + } catch (SQLException e) { + throw new IllegalStateException(e); } } @Override public void stop() throws SQLException { //release all the managed resources inside the connection lock - synchronized (connection) { - this.readNodeId.close(); - this.writeNodeId.close(); - this.initializeNodeId.close(); - this.readState.close(); - this.writeState.close(); - this.liveLock.close(); - this.backupLock.close(); - super.stop(); - } + //synchronized (connection) { + this.liveLock.close(); + this.backupLock.close(); + super.stop(); + //} } @Override diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 0e0ea15d50..d03b466f8d 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2361,14 +2361,14 @@ - + The JDBC Driver class name - + The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/ @@ -2391,6 +2391,31 @@ + + + + The DataSource class name + + + + + + + A list of options for the DataSource + + + + + + + + A key-value pair option for the DataSource + + + + + + @@ -2458,6 +2483,23 @@ + + + + + Configuration option key + + + + + + + Configuration option value + + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java index 2ecddbf3b9..77f488e551 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.server.impl.jdbc; -import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -67,10 +66,10 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { return JdbcSharedStateManager .createLiveLock( UUID.randomUUID().toString(), - jdbcSharedStateManager.getConnection(), + jdbcSharedStateManager.getJdbcConnectionProvider(), sqlProvider, acquireMillis); - } catch (SQLException e) { + } catch (Exception e) { throw new IllegalStateException(e); } } @@ -85,20 +84,18 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { if (withExistingTable) { TestJDBCDriver testDriver = TestJDBCDriver - .usingConnectionUrl( - dbConf.getJdbcConnectionUrl(), - dbConf.getJdbcDriverClassName(), + .usingDbConf( + dbConf, sqlProvider); testDriver.start(); testDriver.stop(); } jdbcSharedStateManager = JdbcSharedStateManager - .usingConnectionUrl( + .usingConnectionProvider( UUID.randomUUID().toString(), dbConf.getJdbcLockExpirationMillis(), - dbConf.getJdbcConnectionUrl(), - dbConf.getJdbcDriverClassName(), + dbConf.getConnectionProvider(), sqlProvider); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java index 7340026768..db146e19c1 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java @@ -41,19 +41,17 @@ public class JdbcSharedStateManagerTest extends ActiveMQTestBase { } private TestJDBCDriver createFakeDriver(boolean initializeTable) { - return TestJDBCDriver.usingConnectionUrl( - dbConf.getJdbcConnectionUrl(), - dbConf.getJdbcDriverClassName(), + return TestJDBCDriver.usingDbConf( + dbConf, sqlProvider, initializeTable); } private JdbcSharedStateManager createSharedStateManager() { - return JdbcSharedStateManager.usingConnectionUrl( + return JdbcSharedStateManager.usingConnectionProvider( UUID.randomUUID().toString(), dbConf.getJdbcLockExpirationMillis(), - dbConf.getJdbcConnectionUrl(), - dbConf.getJdbcDriverClassName(), + dbConf.getConnectionProvider(), sqlProvider); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java index 2df62747d2..061a5d5ae6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java @@ -16,28 +16,28 @@ */ package org.apache.activemq.artemis.core.server.impl.jdbc; +import java.sql.Connection; import java.sql.SQLException; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.junit.Assert; public class TestJDBCDriver extends AbstractJDBCDriver { - public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl, - String jdbcDriverClass, - SQLProvider provider) { - return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false); + public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf, + SQLProvider provider) { + return usingDbConf(dbConf, provider, false); } - public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl, - String jdbcDriverClass, - SQLProvider provider, - boolean initialize) { + public static TestJDBCDriver usingDbConf(DatabaseStorageConfiguration dbConf, + SQLProvider provider, + boolean initialize) { + TestJDBCDriver driver = new TestJDBCDriver(initialize); driver.setSqlProvider(provider); - driver.setJdbcConnectionUrl(jdbcConnectionUrl); - driver.setJdbcDriverClass(jdbcDriverClass); + driver.setJdbcConnectionProvider(dbConf.getConnectionProvider()); return driver; } @@ -48,12 +48,11 @@ public class TestJDBCDriver extends AbstractJDBCDriver { } @Override - protected void prepareStatements() throws SQLException { - } + protected void prepareStatements() { } @Override - protected void createSchema() throws SQLException { - try { + protected void createSchema() { + try (Connection connection = getJdbcConnectionProvider().getConnection()) { connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL()); if (initialize) { connection.createStatement().execute(sqlProvider.createNodeIdSQL()); diff --git a/pom.xml b/pom.xml index afeac1921b..f7eeec39a9 100644 --- a/pom.xml +++ b/pom.xml @@ -747,6 +747,13 @@ 2.7.2
+ + + org.apache.commons + commons-dbcp2 + 2.1.1 + + io.micrometer diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index 46a102752b..02159537e6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -123,7 +123,7 @@ public class JDBCJournalTest extends ActiveMQTestBase { SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL); scheduledExecutorService = new ScheduledThreadPoolExecutor(5); executorService = Executors.newSingleThreadExecutor(); - journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), getJdbcUser(), getJdbcPassword(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() { + journal = new JDBCJournalImpl(dbConf.getConnectionProvider(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() { @Override public void onIOException(Throwable code, String message, SequentialFile file) { @@ -145,10 +145,7 @@ public class JDBCJournalTest extends ActiveMQTestBase { public void testConcurrentEmptyJournal() throws SQLException { Assert.assertTrue(journal.isStarted()); Assert.assertEquals(0, journal.getNumberOfRecords()); - final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), - getJdbcUser(), - getJdbcPassword(), - dbConf.getJdbcDriverClassName(), + final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getConnectionProvider(), sqlProvider, scheduledExecutorService, executorService, (code, message, file) -> { Assert.fail(message);