From b4532d944d04ad91ff09d9ecc429fa8b4fcd475e Mon Sep 17 00:00:00 2001 From: franz1981 Date: Fri, 11 Sep 2020 00:24:12 +0200 Subject: [PATCH] ARTEMIS-2823 Use datasource with JDBC store db connections fixes It add additional required fixes: - Fixed uncommitted deleted tx records - Fixed JDBC authorization on test - Using property-based version for commons-dbcp2 - stopping thread pool after activation to allow JDBC lease locks to release the lock - centralize JDBC network timeout configuration and save repeating it - adding dbcp2 as the default pooled DataSource to be used --- .../src/main/assembly/dep.xml | 2 + .../src/main/resources/features.xml | 2 + artemis-jdbc-store/pom.xml | 1 - .../store/drivers/JDBCConnectionProvider.java | 30 ++-- .../jdbc/store/journal/JDBCJournalImpl.java | 142 +++++++++--------- .../storage/DatabaseStorageConfiguration.java | 19 ++- .../impl/FileConfigurationParser.java | 1 - .../journal/JDBCJournalStorageManager.java | 4 - .../core/server/impl/ActiveMQServerImpl.java | 40 ++++- pom.xml | 12 +- .../jdbc/store/journal/JDBCJournalTest.java | 2 + 11 files changed, 153 insertions(+), 102 deletions(-) diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 92196c4355..b9b24962d0 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -83,6 +83,8 @@ org.jboss.logging:jboss-logging org.jboss.slf4j:slf4j-jboss-logmanager org.jctools:jctools-core + org.apache.commons:commons-dbcp2 + org.apache.commons:commons-pool2 io.netty:netty-all org.apache.qpid:proton-j org.apache.activemq:activemq-client diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index 96aaa1a19e..a67cfb5966 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -72,6 +72,8 @@ mvn:org.apache.commons/commons-lang3/${commons.lang.version} mvn:org.jctools/jctools-core/${jctools.version} mvn:com.google.guava/guava/${guava.version} + mvn:org.apache.commons/commons-dbcp2/${commons.dbcp2.version} + mvn:org.apache.commons/commons-pool2/${commons.dbcp2.version} diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml index fe439201ca..52ab7b20bc 100644 --- a/artemis-jdbc-store/pom.xml +++ b/artemis-jdbc-store/pom.xml @@ -85,7 +85,6 @@ org.apache.commons commons-dbcp2 - 2.1.1 diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java index 6d2c9f2ae0..72e971adc2 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/JDBCConnectionProvider.java @@ -16,10 +16,6 @@ */ package org.apache.activemq.artemis.jdbc.store.drivers; -import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; -import org.jboss.logging.Logger; - import javax.sql.DataSource; import java.sql.Connection; import java.sql.DriverManager; @@ -27,17 +23,23 @@ import java.sql.SQLException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; + public class JDBCConnectionProvider { private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class); private DataSource dataSource; private Executor networkTimeoutExecutor; private int networkTimeoutMillis; + private boolean supportNetworkTimeout; public JDBCConnectionProvider(DataSource dataSource) { this.dataSource = dataSource; this.networkTimeoutExecutor = null; this.networkTimeoutMillis = -1; + this.supportNetworkTimeout = true; addDerbyShutdownHook(); } @@ -58,14 +60,18 @@ public class JDBCConnectionProvider { } if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) { - try { - connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis); - } catch (SQLException e) { - logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); - ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection"); - } catch (Throwable throwable) { - //it included SecurityExceptions and UnsupportedOperationException - logger.warn("Unable to set a network timeout on the JDBC connection", throwable); + if (supportNetworkTimeout) { + try { + connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis); + } catch (SQLException e) { + supportNetworkTimeout = false; + logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); + ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection: won't retry again in the future"); + } catch (Throwable throwable) { + supportNetworkTimeout = false; + //it included SecurityExceptions and UnsupportedOperationException + logger.warn("Unable to set a network timeout on the JDBC connection: won't retry again in the future", throwable); + } } } return connection; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index a15377ec99..fe341e4797 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -189,70 +189,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { TransactionHolder holder; - try (Connection connection = connectionProvider.getConnection()) { + try (Connection connection = connectionProvider.getConnection(); + PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords); + PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords); + PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) { - try (PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords); - PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords); - PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) { + connection.setAutoCommit(false); - connection.setAutoCommit(false); + for (JDBCJournalRecord record : recordRef) { - for (JDBCJournalRecord record : recordRef) { - - if (logger.isTraceEnabled()) { - logger.trace("sync::preparing JDBC statement for " + record); - } - - - switch (record.getRecordType()) { - case JDBCJournalRecord.DELETE_RECORD: - // Standard SQL Delete Record, Non transactional delete - deletedRecords.add(record.getId()); - record.writeDeleteRecord(deleteJournalRecords); - break; - case JDBCJournalRecord.ROLLBACK_RECORD: - // Roll back we remove all records associated with this TX ID. This query is always performed last. - deleteJournalTxRecords.setLong(1, record.getTxId()); - deleteJournalTxRecords.addBatch(); - break; - case JDBCJournalRecord.COMMIT_RECORD: - // We perform all the deletes and add the commit record in the same Database TX - holder = transactions.get(record.getTxId()); - for (RecordInfo info : holder.recordsToDelete) { - deletedRecords.add(record.getId()); - deletedRecords.add(info.id); - deleteJournalRecords.setLong(1, info.id); - deleteJournalRecords.addBatch(); - } - record.writeRecord(insertJournalRecords); - committedTransactions.add(record.getTxId()); - break; - default: - // Default we add a new record to the DB - record.writeRecord(insertJournalRecords); - break; - } - } - - insertJournalRecords.executeBatch(); - deleteJournalRecords.executeBatch(); - deleteJournalTxRecords.executeBatch(); - - connection.commit(); if (logger.isTraceEnabled()) { - logger.trace("JDBC commit worked"); + logger.trace("sync::preparing JDBC statement for " + record); } - if (cleanupTxRecords(deletedRecords, committedTransactions)) { - deleteJournalTxRecords.executeBatch(); - connection.commit(); - logger.trace("JDBC commit worked on cleanupTxRecords"); + + switch (record.getRecordType()) { + case JDBCJournalRecord.DELETE_RECORD: + // Standard SQL Delete Record, Non transactional delete + deletedRecords.add(record.getId()); + record.writeDeleteRecord(deleteJournalRecords); + break; + case JDBCJournalRecord.ROLLBACK_RECORD: + // Roll back we remove all records associated with this TX ID. This query is always performed last. + deleteJournalTxRecords.setLong(1, record.getTxId()); + deleteJournalTxRecords.addBatch(); + break; + case JDBCJournalRecord.COMMIT_RECORD: + // We perform all the deletes and add the commit record in the same Database TX + holder = transactions.get(record.getTxId()); + for (RecordInfo info : holder.recordsToDelete) { + deletedRecords.add(record.getId()); + deletedRecords.add(info.id); + deleteJournalRecords.setLong(1, info.id); + deleteJournalRecords.addBatch(); + } + record.writeRecord(insertJournalRecords); + committedTransactions.add(record.getTxId()); + break; + default: + // Default we add a new record to the DB + record.writeRecord(insertJournalRecords); + break; } - executeCallbacks(recordRef, true); - - return recordRef.size(); - } + + insertJournalRecords.executeBatch(); + deleteJournalRecords.executeBatch(); + deleteJournalTxRecords.executeBatch(); + + connection.commit(); + if (logger.isTraceEnabled()) { + logger.trace("JDBC commit worked"); + } + + if (cleanupTxRecords(deletedRecords, committedTransactions, deleteJournalTxRecords)) { + deleteJournalTxRecords.executeBatch(); + connection.commit(); + logger.trace("JDBC commit worked on cleanupTxRecords"); + } + executeCallbacks(recordRef, true); + + return recordRef.size(); } catch (Exception e) { handleException(recordRef, e); return 0; @@ -276,7 +273,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ - private synchronized boolean cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { + private synchronized boolean cleanupTxRecords(List deletedRecords, List committedTx, + PreparedStatement deleteJournalTxRecords) throws SQLException { List iterableCopy; List iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); @@ -285,27 +283,23 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { transactions.get(txId).committed = true; } boolean hasDeletedJournalTxRecords = false; + // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop + for (TransactionHolder h : iterableCopyTx) { - try (Connection connection = connectionProvider.getConnection(); - PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords)) { - // TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop - for (TransactionHolder h : iterableCopyTx) { + iterableCopy = new ArrayList<>(); + iterableCopy.addAll(h.recordInfos); - iterableCopy = new ArrayList<>(); - iterableCopy.addAll(h.recordInfos); - - for (RecordInfo info : iterableCopy) { - if (deletedRecords.contains(info.id)) { - h.recordInfos.remove(info); - } + for (RecordInfo info : iterableCopy) { + if (deletedRecords.contains(info.id)) { + h.recordInfos.remove(info); } + } - if (h.recordInfos.isEmpty() && h.committed) { - deleteJournalTxRecords.setLong(1, h.transactionID); - deleteJournalTxRecords.addBatch(); - hasDeletedJournalTxRecords = true; - transactions.remove(h.transactionID); - } + if (h.recordInfos.isEmpty() && h.committed) { + deleteJournalTxRecords.setLong(1, h.transactionID); + deleteJournalTxRecords.addBatch(); + hasDeletedJournalTxRecords = true; + transactions.remove(h.transactionID); } } return hasDeletedJournalTxRecords; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 3a059f07b1..76d8e4e21b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -17,6 +17,9 @@ package org.apache.activemq.artemis.core.config.storage; import javax.sql.DataSource; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; @@ -24,9 +27,6 @@ import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; -import java.util.HashMap; -import java.util.Map; - public class DatabaseStorageConfiguration implements StoreConfiguration { private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName(); @@ -183,6 +183,19 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { } return connectionProvider; } + + public DatabaseStorageConfiguration setConnectionProviderNetworkTimeout(Executor executor, int ms) { + getConnectionProvider().setNetworkTimeout(executor, ms); + return this; + } + + public DatabaseStorageConfiguration clearConnectionProviderNetworkTimeout() { + if (connectionProvider != null) { + connectionProvider.setNetworkTimeout(null, -1); + } + return this; + } + public void addDataSourceProperty(String key, String value) { if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) { dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase())); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index ad01a0e148..23a465232b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1746,7 +1746,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue()); } } - //conf.initDataSource(); return conf; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index fc9b3a24b3..1ab90c0af7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -57,10 +57,6 @@ public class JDBCJournalStorageManager extends JournalStorageManager { try { final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider(); - final int networkTimeout = dbConf.getJdbcNetworkTimeout(); - if (networkTimeout >= 0) { - connectionProvider.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout); - } final JDBCJournalImpl bindingsJournal; final JDBCJournalImpl messageJournal; final JDBCSequentialFileFactory largeMessagesFactory; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 0fc6944252..2ef94f8013 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -490,6 +490,24 @@ public class ActiveMQServerImpl implements ActiveMQServer { return this; } + private void configureJdbcNetworkTimeout() { + if (configuration.isPersistenceEnabled()) { + if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { + DatabaseStorageConfiguration databaseStorageConfiguration = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); + databaseStorageConfiguration.setConnectionProviderNetworkTimeout(threadPool, databaseStorageConfiguration.getJdbcNetworkTimeout()); + } + } + } + + private void clearJdbcNetworkTimeout() { + if (configuration.isPersistenceEnabled()) { + if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { + DatabaseStorageConfiguration databaseStorageConfiguration = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); + databaseStorageConfiguration.clearConnectionProviderNetworkTimeout(); + } + } + } + /* * Can be overridden for tests */ @@ -581,6 +599,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { try { checkJournalDirectory(); + // this would create the connection provider while setting the JDBC global network timeout + configureJdbcNetworkTimeout(); + nodeManager = createNodeManager(configuration.getNodeManagerLockLocation(), false); nodeManager.start(); @@ -1220,16 +1241,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { securitySettingPlugin.stop(); } - if (threadPool != null && !threadPoolSupplied) { - shutdownPool(threadPool); - } - if (ioExecutorPool != null) { shutdownPool(ioExecutorPool); } - if (!threadPoolSupplied) - threadPool = null; if (!scheduledPoolSupplied) scheduledPool = null; @@ -1268,6 +1283,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } + // JDBC journal can use this thread pool to configure the network timeout on a pooled connection: + // better to stop it after closing activation (and JDBC node manager on it) + final ExecutorService threadPool = this.threadPool; + if (threadPool != null && !threadPoolSupplied) { + shutdownPool(threadPool); + } + if (!threadPoolSupplied) { + this.threadPool = null; + } + + // given that threadPool can be garbage collected, need to clear anything that would make it leaks + clearJdbcNetworkTimeout(); + if (activationThread != null) { try { activationThread.join(30000); diff --git a/pom.xml b/pom.xml index f7eeec39a9..a5ba7124b6 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ 5.14.5 10.11.1.1 1.9.4 + 2.7.0 3.2.2 1.8 1.16 @@ -751,7 +752,16 @@ org.apache.commons commons-dbcp2 - 2.1.1 + ${commons.dbcp2.version} + + + + + org.apache.commons + commons-pool2 + ${commons.dbcp2.version} + + runtime diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index 02159537e6..74031516d2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -116,6 +116,8 @@ public class JDBCJournalTest extends ActiveMQTestBase { if (useAuthentication) { System.setProperty("derby.connection.requireAuthentication", "true"); System.setProperty("derby.user." + getJdbcUser(), getJdbcPassword()); + dbConf.setJdbcUser(getJdbcUser()); + dbConf.setJdbcPassword(getJdbcPassword()); } sqlProvider = JDBCUtils.getSQLProvider( dbConf.getJdbcDriverClassName(),