ARTEMIS-2823 Use datasource with JDBC store db connections fixes

It adds the additional required fixes:
- Fixed uncommitted deleted tx records
- Fixed JDBC authorization on test
- Using property-based version for commons-dbcp2
- stopping thread pool after activation to allow JDBC lease locks to release the lock
- centralizing JDBC network timeout configuration to avoid repeating it
- adding dbcp2 as the default pooled DataSource to be used
This commit is contained in:
franz1981 2020-09-11 00:24:12 +02:00
parent 4979262f2d
commit b4532d944d
11 changed files with 153 additions and 102 deletions

View File

@ -83,6 +83,8 @@
<include>org.jboss.logging:jboss-logging</include> <include>org.jboss.logging:jboss-logging</include>
<include>org.jboss.slf4j:slf4j-jboss-logmanager</include> <include>org.jboss.slf4j:slf4j-jboss-logmanager</include>
<include>org.jctools:jctools-core</include> <include>org.jctools:jctools-core</include>
<include>org.apache.commons:commons-dbcp2</include>
<include>org.apache.commons:commons-pool2</include>
<include>io.netty:netty-all</include> <include>io.netty:netty-all</include>
<include>org.apache.qpid:proton-j</include> <include>org.apache.qpid:proton-j</include>
<include>org.apache.activemq:activemq-client</include> <include>org.apache.activemq:activemq-client</include>

View File

@ -72,6 +72,8 @@
<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle> <bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
<bundle dependency="true">mvn:org.jctools/jctools-core/${jctools.version}</bundle> <bundle dependency="true">mvn:org.jctools/jctools-core/${jctools.version}</bundle>
<bundle dependency="true">mvn:com.google.guava/guava/${guava.version}</bundle> <bundle dependency="true">mvn:com.google.guava/guava/${guava.version}</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-dbcp2/${commons.dbcp2.version}</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-pool2/${commons.dbcp2.version}</bundle>
<!-- Micrometer can't be included until it supports OSGi. It is currently an "optional" Maven dependency. --> <!-- Micrometer can't be included until it supports OSGi. It is currently an "optional" Maven dependency. -->
<!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle--> <!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle-->

View File

@ -85,7 +85,6 @@
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId> <artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency> </dependency>
<!-- Database driver support --> <!-- Database driver support -->

View File

@ -16,10 +16,6 @@
*/ */
package org.apache.activemq.artemis.jdbc.store.drivers; package org.apache.activemq.artemis.jdbc.store.drivers;
import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
@ -27,17 +23,23 @@ import java.sql.SQLException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.jdbc.store.logging.LoggingConnection;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
public class JDBCConnectionProvider { public class JDBCConnectionProvider {
private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class); private static final Logger logger = Logger.getLogger(JDBCConnectionProvider.class);
private DataSource dataSource; private DataSource dataSource;
private Executor networkTimeoutExecutor; private Executor networkTimeoutExecutor;
private int networkTimeoutMillis; private int networkTimeoutMillis;
private boolean supportNetworkTimeout;
public JDBCConnectionProvider(DataSource dataSource) { public JDBCConnectionProvider(DataSource dataSource) {
this.dataSource = dataSource; this.dataSource = dataSource;
this.networkTimeoutExecutor = null; this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1; this.networkTimeoutMillis = -1;
this.supportNetworkTimeout = true;
addDerbyShutdownHook(); addDerbyShutdownHook();
} }
@ -58,14 +60,18 @@ public class JDBCConnectionProvider {
} }
if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) { if (this.networkTimeoutMillis >= 0 && this.networkTimeoutExecutor != null) {
try { if (supportNetworkTimeout) {
connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis); try {
} catch (SQLException e) { connection.setNetworkTimeout(this.networkTimeoutExecutor, this.networkTimeoutMillis);
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); } catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection"); supportNetworkTimeout = false;
} catch (Throwable throwable) { logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
//it included SecurityExceptions and UnsupportedOperationException ActiveMQJournalLogger.LOGGER.warn("Unable to set a network timeout on the JDBC connection: won't retry again in the future");
logger.warn("Unable to set a network timeout on the JDBC connection", throwable); } catch (Throwable throwable) {
supportNetworkTimeout = false;
//it included SecurityExceptions and UnsupportedOperationException
logger.warn("Unable to set a network timeout on the JDBC connection: won't retry again in the future", throwable);
}
} }
} }
return connection; return connection;

View File

@ -189,70 +189,67 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
TransactionHolder holder; TransactionHolder holder;
try (Connection connection = connectionProvider.getConnection()) { try (Connection connection = connectionProvider.getConnection();
PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords);
PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords);
PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) {
try (PreparedStatement deleteJournalRecords = connection.prepareStatement(this.deleteJournalRecords); connection.setAutoCommit(false);
PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords);
PreparedStatement insertJournalRecords = connection.prepareStatement(this.insertJournalRecords)) {
connection.setAutoCommit(false); for (JDBCJournalRecord record : recordRef) {
for (JDBCJournalRecord record : recordRef) {
if (logger.isTraceEnabled()) {
logger.trace("sync::preparing JDBC statement for " + record);
}
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
// Standard SQL Delete Record, Non transactional delete
deletedRecords.add(record.getId());
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last.
deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch();
break;
case JDBCJournalRecord.COMMIT_RECORD:
// We perform all the deletes and add the commit record in the same Database TX
holder = transactions.get(record.getTxId());
for (RecordInfo info : holder.recordsToDelete) {
deletedRecords.add(record.getId());
deletedRecords.add(info.id);
deleteJournalRecords.setLong(1, info.id);
deleteJournalRecords.addBatch();
}
record.writeRecord(insertJournalRecords);
committedTransactions.add(record.getTxId());
break;
default:
// Default we add a new record to the DB
record.writeRecord(insertJournalRecords);
break;
}
}
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch();
connection.commit();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("JDBC commit worked"); logger.trace("sync::preparing JDBC statement for " + record);
} }
if (cleanupTxRecords(deletedRecords, committedTransactions)) {
deleteJournalTxRecords.executeBatch(); switch (record.getRecordType()) {
connection.commit(); case JDBCJournalRecord.DELETE_RECORD:
logger.trace("JDBC commit worked on cleanupTxRecords"); // Standard SQL Delete Record, Non transactional delete
deletedRecords.add(record.getId());
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
// Roll back we remove all records associated with this TX ID. This query is always performed last.
deleteJournalTxRecords.setLong(1, record.getTxId());
deleteJournalTxRecords.addBatch();
break;
case JDBCJournalRecord.COMMIT_RECORD:
// We perform all the deletes and add the commit record in the same Database TX
holder = transactions.get(record.getTxId());
for (RecordInfo info : holder.recordsToDelete) {
deletedRecords.add(record.getId());
deletedRecords.add(info.id);
deleteJournalRecords.setLong(1, info.id);
deleteJournalRecords.addBatch();
}
record.writeRecord(insertJournalRecords);
committedTransactions.add(record.getTxId());
break;
default:
// Default we add a new record to the DB
record.writeRecord(insertJournalRecords);
break;
} }
executeCallbacks(recordRef, true);
return recordRef.size();
} }
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteJournalTxRecords.executeBatch();
connection.commit();
if (logger.isTraceEnabled()) {
logger.trace("JDBC commit worked");
}
if (cleanupTxRecords(deletedRecords, committedTransactions, deleteJournalTxRecords)) {
deleteJournalTxRecords.executeBatch();
connection.commit();
logger.trace("JDBC commit worked on cleanupTxRecords");
}
executeCallbacks(recordRef, true);
return recordRef.size();
} catch (Exception e) { } catch (Exception e) {
handleException(recordRef, e); handleException(recordRef, e);
return 0; return 0;
@ -276,7 +273,8 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
we remove the Tx Records (i.e. PREPARE, COMMIT). */ we remove the Tx Records (i.e. PREPARE, COMMIT). */
private synchronized boolean cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException { private synchronized boolean cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx,
PreparedStatement deleteJournalTxRecords) throws SQLException {
List<RecordInfo> iterableCopy; List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>(); List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values()); iterableCopyTx.addAll(transactions.values());
@ -285,27 +283,23 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
transactions.get(txId).committed = true; transactions.get(txId).committed = true;
} }
boolean hasDeletedJournalTxRecords = false; boolean hasDeletedJournalTxRecords = false;
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
try (Connection connection = connectionProvider.getConnection(); iterableCopy = new ArrayList<>();
PreparedStatement deleteJournalTxRecords = connection.prepareStatement(this.deleteJournalTxRecords)) { iterableCopy.addAll(h.recordInfos);
// TODO (mtaylor) perhaps we could store a reverse mapping of IDs to prevent this O(n) loop
for (TransactionHolder h : iterableCopyTx) {
iterableCopy = new ArrayList<>(); for (RecordInfo info : iterableCopy) {
iterableCopy.addAll(h.recordInfos); if (deletedRecords.contains(info.id)) {
h.recordInfos.remove(info);
for (RecordInfo info : iterableCopy) {
if (deletedRecords.contains(info.id)) {
h.recordInfos.remove(info);
}
} }
}
if (h.recordInfos.isEmpty() && h.committed) { if (h.recordInfos.isEmpty() && h.committed) {
deleteJournalTxRecords.setLong(1, h.transactionID); deleteJournalTxRecords.setLong(1, h.transactionID);
deleteJournalTxRecords.addBatch(); deleteJournalTxRecords.addBatch();
hasDeletedJournalTxRecords = true; hasDeletedJournalTxRecords = true;
transactions.remove(h.transactionID); transactions.remove(h.transactionID);
}
} }
} }
return hasDeletedJournalTxRecords; return hasDeletedJournalTxRecords;

View File

@ -17,6 +17,9 @@
package org.apache.activemq.artemis.core.config.storage; package org.apache.activemq.artemis.core.config.storage;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
@ -24,9 +27,6 @@ import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import java.util.HashMap;
import java.util.Map;
public class DatabaseStorageConfiguration implements StoreConfiguration { public class DatabaseStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName(); private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
@ -183,6 +183,19 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
} }
return connectionProvider; return connectionProvider;
} }
public DatabaseStorageConfiguration setConnectionProviderNetworkTimeout(Executor executor, int ms) {
getConnectionProvider().setNetworkTimeout(executor, ms);
return this;
}
public DatabaseStorageConfiguration clearConnectionProviderNetworkTimeout() {
if (connectionProvider != null) {
connectionProvider.setNetworkTimeout(null, -1);
}
return this;
}
public void addDataSourceProperty(String key, String value) { public void addDataSourceProperty(String key, String value) {
if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) { if (value.toLowerCase().equals("true") || value.toLowerCase().equals("false")) {
dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase())); dataSourceProperties.put(key, Boolean.parseBoolean(value.toLowerCase()));

View File

@ -1746,7 +1746,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue()); conf.addDataSourceProperty(propertyNode.getAttributeNode("key").getValue(), propertyNode.getAttributeNode("value").getValue());
} }
} }
//conf.initDataSource();
return conf; return conf;
} }

View File

@ -57,10 +57,6 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
try { try {
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider(); final JDBCConnectionProvider connectionProvider = dbConf.getConnectionProvider();
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {
connectionProvider.setNetworkTimeout(executorFactory.getExecutor(), networkTimeout);
}
final JDBCJournalImpl bindingsJournal; final JDBCJournalImpl bindingsJournal;
final JDBCJournalImpl messageJournal; final JDBCJournalImpl messageJournal;
final JDBCSequentialFileFactory largeMessagesFactory; final JDBCSequentialFileFactory largeMessagesFactory;

View File

@ -490,6 +490,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return this; return this;
} }
private void configureJdbcNetworkTimeout() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration databaseStorageConfiguration = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
databaseStorageConfiguration.setConnectionProviderNetworkTimeout(threadPool, databaseStorageConfiguration.getJdbcNetworkTimeout());
}
}
}
private void clearJdbcNetworkTimeout() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration databaseStorageConfiguration = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
databaseStorageConfiguration.clearConnectionProviderNetworkTimeout();
}
}
}
/* /*
* Can be overridden for tests * Can be overridden for tests
*/ */
@ -581,6 +599,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
try { try {
checkJournalDirectory(); checkJournalDirectory();
// this would create the connection provider while setting the JDBC global network timeout
configureJdbcNetworkTimeout();
nodeManager = createNodeManager(configuration.getNodeManagerLockLocation(), false); nodeManager = createNodeManager(configuration.getNodeManagerLockLocation(), false);
nodeManager.start(); nodeManager.start();
@ -1220,16 +1241,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
securitySettingPlugin.stop(); securitySettingPlugin.stop();
} }
if (threadPool != null && !threadPoolSupplied) {
shutdownPool(threadPool);
}
if (ioExecutorPool != null) { if (ioExecutorPool != null) {
shutdownPool(ioExecutorPool); shutdownPool(ioExecutorPool);
} }
if (!threadPoolSupplied)
threadPool = null;
if (!scheduledPoolSupplied) if (!scheduledPoolSupplied)
scheduledPool = null; scheduledPool = null;
@ -1268,6 +1283,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
// JDBC journal can use this thread pool to configure the network timeout on a pooled connection:
// better to stop it after closing activation (and JDBC node manager on it)
final ExecutorService threadPool = this.threadPool;
if (threadPool != null && !threadPoolSupplied) {
shutdownPool(threadPool);
}
if (!threadPoolSupplied) {
this.threadPool = null;
}
// given that threadPool can be garbage collected, need to clear anything that would make it leaks
clearJdbcNetworkTimeout();
if (activationThread != null) { if (activationThread != null) {
try { try {
activationThread.join(30000); activationThread.join(30000);

12
pom.xml
View File

@ -87,6 +87,7 @@
<activemq5-version>5.14.5</activemq5-version> <activemq5-version>5.14.5</activemq5-version>
<apache.derby.version>10.11.1.1</apache.derby.version> <apache.derby.version>10.11.1.1</apache.derby.version>
<commons.beanutils.version>1.9.4</commons.beanutils.version> <commons.beanutils.version>1.9.4</commons.beanutils.version>
<commons.dbcp2.version>2.7.0</commons.dbcp2.version>
<commons.collections.version>3.2.2</commons.collections.version> <commons.collections.version>3.2.2</commons.collections.version>
<commons.text.version>1.8</commons.text.version> <commons.text.version>1.8</commons.text.version>
<fuse.mqtt.client.version>1.16</fuse.mqtt.client.version> <fuse.mqtt.client.version>1.16</fuse.mqtt.client.version>
@ -751,7 +752,16 @@
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId> <artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version> <version>${commons.dbcp2.version}</version>
<!-- license Apache 2 -->
</dependency>
<!-- used by commons-dbcp2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons.dbcp2.version}</version>
<!-- license Apache 2 -->
<scope>runtime</scope>
</dependency> </dependency>
<!-- Needed for Micrometer --> <!-- Needed for Micrometer -->

View File

@ -116,6 +116,8 @@ public class JDBCJournalTest extends ActiveMQTestBase {
if (useAuthentication) { if (useAuthentication) {
System.setProperty("derby.connection.requireAuthentication", "true"); System.setProperty("derby.connection.requireAuthentication", "true");
System.setProperty("derby.user." + getJdbcUser(), getJdbcPassword()); System.setProperty("derby.user." + getJdbcUser(), getJdbcPassword());
dbConf.setJdbcUser(getJdbcUser());
dbConf.setJdbcPassword(getJdbcPassword());
} }
sqlProvider = JDBCUtils.getSQLProvider( sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(), dbConf.getJdbcDriverClassName(),