AMQ-6370 - move cleanup lock to connection allocation to avoid contention with between store and connection pool. pool connection grant or block is now guarded by store lock which lasts till connection is closed

This commit is contained in:
gtully 2016-07-21 10:54:26 +01:00
parent 9f7d70ba0d
commit 2a815c2e08
8 changed files with 506 additions and 139 deletions

View File

@ -297,6 +297,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
if (isCreateTablesOnStartup()) {
TransactionContext transactionContext = getTransactionContext();
transactionContext.getExclusiveConnection();
transactionContext.begin();
try {
try {
@ -344,6 +345,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
try {
LOG.debug("Cleaning up old messages.");
c = getTransactionContext();
c.getExclusiveConnection();
getAdapter().doDeleteOldMessages(c);
} catch (IOException e) {
LOG.warn("Old message cleanup failed due to: " + e, e);
@ -549,6 +551,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
@Override
public void deleteAllMessages() throws IOException {
TransactionContext c = getTransactionContext();
c.getExclusiveConnection();
try {
getAdapter().doDropTables(c);
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());

View File

@ -17,12 +17,13 @@
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.sql.DataSource;
@ -47,14 +48,24 @@ public class TransactionContext {
// a cheap dirty level that we can live with
private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
private LinkedList<Runnable> completions = new LinkedList<Runnable>();
private ReentrantReadWriteLock exclusiveConnectionLock = new ReentrantReadWriteLock();
public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
this.persistenceAdapter = persistenceAdapter;
this.dataSource = persistenceAdapter.getDataSource();
}
public Connection getExclusiveConnection() throws IOException {
return lockAndWrapped(exclusiveConnectionLock.writeLock());
}
public Connection getConnection() throws IOException {
return lockAndWrapped(exclusiveConnectionLock.readLock());
}
private Connection lockAndWrapped(Lock toLock) throws IOException {
if (connection == null) {
toLock.lock();
try {
connection = dataSource.getConnection();
if (persistenceAdapter.isChangeAutoCommitAllowed()) {
@ -64,9 +75,15 @@ public class TransactionContext {
connection.setAutoCommit(autoCommit);
}
}
connection = new UnlockOnCloseConnection(connection, toLock);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
inTx = false;
try {
toLock.unlock();
} catch (IllegalMonitorStateException oops) {
LOG.error("Thread does not hold the context lock on close of:" + connection, oops);
}
close();
IOException ioe = IOExceptionSupport.create(e);
if (persistenceAdapter.getBrokerService() != null) {
@ -260,4 +277,290 @@ public class TransactionContext {
public void onCompletion(Runnable runnable) {
completions.add(runnable);
}
final private class UnlockOnCloseConnection implements Connection {
private final Connection delegate;
private final Lock lock;
UnlockOnCloseConnection(Connection delegate, Lock toUnlockOnClose) {
this.delegate = delegate;
this.lock = toUnlockOnClose;
}
@Override
public void close() throws SQLException {
try {
delegate.close();
} finally {
lock.unlock();
}
}
// simple delegate for the rest of the impl..
@Override
public Statement createStatement() throws SQLException {
return delegate.createStatement();
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return delegate.prepareStatement(sql);
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return delegate.prepareCall(sql);
}
@Override
public String nativeSQL(String sql) throws SQLException {
return delegate.nativeSQL(sql);
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
delegate.setAutoCommit(autoCommit);
}
@Override
public boolean getAutoCommit() throws SQLException {
return delegate.getAutoCommit();
}
@Override
public void commit() throws SQLException {
delegate.commit();
}
@Override
public void rollback() throws SQLException {
delegate.rollback();
}
@Override
public boolean isClosed() throws SQLException {
return delegate.isClosed();
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
return delegate.getMetaData();
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
delegate.setReadOnly(readOnly);
}
@Override
public boolean isReadOnly() throws SQLException {
return delegate.isReadOnly();
}
@Override
public void setCatalog(String catalog) throws SQLException {
delegate.setCatalog(catalog);
}
@Override
public String getCatalog() throws SQLException {
return delegate.getCatalog();
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
delegate.setTransactionIsolation(level);
}
@Override
public int getTransactionIsolation() throws SQLException {
return delegate.getTransactionIsolation();
}
@Override
public SQLWarning getWarnings() throws SQLException {
return delegate.getWarnings();
}
@Override
public void clearWarnings() throws SQLException {
delegate.clearWarnings();
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return delegate.createStatement(resultSetType, resultSetConcurrency);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return delegate.getTypeMap();
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
delegate.setTypeMap(map);
}
@Override
public void setHoldability(int holdability) throws SQLException {
delegate.setHoldability(holdability);
}
@Override
public int getHoldability() throws SQLException {
return delegate.getHoldability();
}
@Override
public Savepoint setSavepoint() throws SQLException {
return delegate.setSavepoint();
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return delegate.setSavepoint(name);
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
delegate.rollback(savepoint);
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
delegate.releaseSavepoint(savepoint);
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return delegate.prepareStatement(sql, autoGeneratedKeys);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return delegate.prepareStatement(sql, columnIndexes);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return delegate.prepareStatement(sql, columnNames);
}
@Override
public Clob createClob() throws SQLException {
return delegate.createClob();
}
@Override
public Blob createBlob() throws SQLException {
return delegate.createBlob();
}
@Override
public NClob createNClob() throws SQLException {
return delegate.createNClob();
}
@Override
public SQLXML createSQLXML() throws SQLException {
return delegate.createSQLXML();
}
@Override
public boolean isValid(int timeout) throws SQLException {
return delegate.isValid(timeout);
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
delegate.setClientInfo(name, value);
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
delegate.setClientInfo(properties);
}
@Override
public String getClientInfo(String name) throws SQLException {
return delegate.getClientInfo(name);
}
@Override
public Properties getClientInfo() throws SQLException {
return delegate.getClientInfo();
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return delegate.createArrayOf(typeName, elements);
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return delegate.createStruct(typeName, attributes);
}
@Override
public void setSchema(String schema) throws SQLException {
delegate.setSchema(schema);
}
@Override
public String getSchema() throws SQLException {
return delegate.getSchema();
}
@Override
public void abort(Executor executor) throws SQLException {
delegate.abort(executor);
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
delegate.setNetworkTimeout(executor, milliseconds);
}
@Override
public int getNetworkTimeout() throws SQLException {
return delegate.getNetworkTimeout();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return delegate.unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return delegate.isWrapperFor(iface);
}
}
}

View File

@ -69,7 +69,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
// Add the Blob record.
s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
@ -94,7 +93,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -127,7 +125,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
@ -149,7 +146,6 @@ public class BlobJDBCAdapter extends DefaultJDBCAdapter {
return os.toByteArray();
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}

View File

@ -70,7 +70,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
//This is deprecated and should be removed in a future release
protected boolean batchStatments = true;
protected boolean prioritizedMessages;
protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
protected int maxRows = MAX_ROWS;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
@ -83,20 +82,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException {
cleanupExclusiveLock.writeLock().lock();
try {
// Check to see if the table already exists. If it does, then don't log warnings during startup.
// Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
// Check to see if the table already exists. If it does, then don't log warnings during startup.
// Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
for (String createStatement : this.statements.getCreateSchemaStatements()) {
// This will fail usually since the tables will be
// created already.
executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
}
} finally {
cleanupExclusiveLock.writeLock().unlock();
for (String createStatement : this.statements.getCreateSchemaStatements()) {
// This will fail usually since the tables will be
// created already.
executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
}
}
@ -150,7 +143,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doDropTables(TransactionContext c) throws SQLException, IOException {
Statement s = null;
cleanupExclusiveLock.writeLock().lock();
try {
s = c.getConnection().createStatement();
String[] dropStatments = this.statements.getDropSchemaStatements();
@ -169,7 +161,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
commitIfAutoCommitIsDisabled(c);
} finally {
cleanupExclusiveLock.writeLock().unlock();
try {
s.close();
} catch (Throwable e) {
@ -181,7 +172,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
rs = s.executeQuery();
@ -200,7 +190,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
long seq = Math.max(seq1, seq2);
return seq;
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -210,7 +199,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(
this.statements.getFindMessageByIdStatement());
@ -221,7 +209,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
return getBinaryData(rs, 1);
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -235,7 +222,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@ -264,7 +250,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new SQLException("Failed add a message");
}
} finally {
cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatements) {
if (s != null) {
s.close();
@ -276,7 +261,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
setBinaryData(s, 1, data);
@ -287,7 +271,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new IOException("Could not update message: " + id + " in " + destination);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -297,7 +280,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
long expirationTime, String messageRef) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
@ -317,7 +299,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new SQLException("Failed add a message");
}
} finally {
cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatements) {
s.close();
}
@ -328,7 +309,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString());
@ -340,7 +320,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
return new long[]{rs.getLong(1), rs.getLong(2)};
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -350,7 +329,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setString(1, id.getProducerId().toString());
@ -361,7 +339,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
return getBinaryData(rs, 1);
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -371,7 +348,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
s.setLong(1, seq);
@ -381,7 +357,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
return rs.getString(1);
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -393,7 +368,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
PreparedStatement s = c.getRemovedMessageStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(xid == null ?
@ -417,7 +391,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new SQLException("Failed to remove message seq: " + seq);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatements && s != null) {
s.close();
}
@ -429,7 +402,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
s.setString(1, destination.getQualifiedName());
@ -448,7 +420,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -459,7 +430,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
JDBCMessageIdScanListener listener) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
s.setMaxRows(limit);
@ -476,7 +446,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
listener.messageId(id);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -486,7 +455,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
String subscriptionName, long seq, long priority) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(xid == null ?
@ -513,7 +481,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatements) {
close(s);
}
@ -525,7 +492,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
String subscriptionName, long seq, long priority) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement();
cleanupExclusiveLock.readLock().lock();
try {
if (s == null) {
s = c.getConnection().prepareStatement(xid == null ?
@ -553,7 +519,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
+ seq + ", for sub: " + subscriptionName);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
if (!this.batchStatements) {
close(s);
}
@ -573,7 +538,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
s.setString(1, destination.getQualifiedName());
@ -584,7 +548,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -596,7 +559,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
// destination.getQualifiedName(),clientId,subscriptionName);
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName());
@ -617,7 +579,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -629,7 +590,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
s.setMaxRows(Math.min(maxReturned * 2, maxRows));
@ -653,7 +613,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -665,7 +624,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
s.setMaxRows(Math.min(maxReturned * 2, maxRows));
@ -690,7 +648,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -702,7 +659,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
cleanupExclusiveLock.readLock().lock();
try {
if (isPrioritizedMessages) {
s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
@ -717,7 +673,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
result = rs.getInt(1);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -737,7 +692,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
// dumpTables(c, destination.getQualifiedName(), clientId,
// subscriptionName);
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
long lastMessageId = -1;
if (!retroactive) {
@ -774,7 +728,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -784,7 +737,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
String clientId, String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
s.setString(1, destination.getQualifiedName());
@ -803,7 +755,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
ActiveMQDestination.QUEUE_TYPE));
return subscription;
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -814,7 +765,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
s.setString(1, destination.getQualifiedName());
@ -832,7 +782,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
return rc.toArray(new SubscriptionInfo[rc.size()]);
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -842,7 +791,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
s.setString(1, destinationName.getQualifiedName());
@ -852,7 +800,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -861,7 +808,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
s.setString(1, destination.getQualifiedName());
@ -869,7 +815,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
s.setString(3, subscriptionName);
s.executeUpdate();
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -878,17 +823,15 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.writeLock().lock();
try {
LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
s = c.getExclusiveConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
int priority = priorityIterator++%10;
s.setInt(1, priority);
s.setInt(2, priority);
int i = s.executeUpdate();
LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
} finally {
cleanupExclusiveLock.writeLock().unlock();
close(s);
}
}
@ -899,7 +842,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
long result = -1;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
@ -913,7 +855,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -939,7 +880,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
rs = s.executeQuery();
@ -947,7 +887,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -1024,7 +963,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
s.setString(1, destination.getQualifiedName());
@ -1039,7 +977,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new IOException("Could not create ack record for destination: " + destination);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -1048,7 +985,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
rs = s.executeQuery();
@ -1080,7 +1016,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
} finally {
close(rs);
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -1088,7 +1023,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
@Override
public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
s.setLong(1, sequence);
@ -1097,7 +1031,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
@ -1109,7 +1042,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
int result = 0;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
s.setString(1, destination.getQualifiedName());
@ -1118,7 +1050,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
result = rs.getInt(1);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -1130,7 +1061,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
if (isPrioritizedMessages) {
s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesByPriorityStatement()));
@ -1172,7 +1102,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} catch (Exception e) {
LOG.warn("Exception recovering next messages", e);
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}
@ -1183,7 +1112,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
s.setString(1, id.toString());
@ -1194,7 +1122,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
return seq;
} finally {
cleanupExclusiveLock.readLock().unlock();
close(rs);
close(s);
}

View File

@ -30,8 +30,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
@ -61,12 +59,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
private List<LoggingEvent> loggingEvents = new ArrayList<>();
@Mock
private ReadWriteLock readWriteLock;
@Mock
private Lock lock;
@Mock
private TransactionContext transactionContext;
@ -96,7 +88,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
defaultJDBCAdapter = new DefaultJDBCAdapter();
defaultJDBCAdapter.cleanupExclusiveLock = readWriteLock;
defaultJDBCAdapter.statements = statements;
when(statements.getCreateSchemaStatements()).thenReturn(CREATE_STATEMENTS);
@ -104,7 +95,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
when(connection.getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),new String[] { "TABLE" })).thenReturn(resultSet);
when(connection.createStatement()).thenReturn(statement1, statement2);
when(connection.getAutoCommit()).thenReturn(true);
when(readWriteLock.writeLock()).thenReturn(lock);
}
@After
@ -119,8 +109,7 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
defaultJDBCAdapter.doCreateTables(transactionContext);
InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
inOrder.verify(lock).lock();
InOrder inOrder = inOrder(resultSet, connection, statement1, statement2);
inOrder.verify(resultSet).next();
inOrder.verify(resultSet).close();
inOrder.verify(connection).createStatement();
@ -129,7 +118,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
inOrder.verify(connection).createStatement();
inOrder.verify(statement2).execute(CREATE_STATEMENT2);
inOrder.verify(statement2).close();
inOrder.verify(lock).unlock();
assertEquals(4, loggingEvents.size());
assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
@ -145,8 +133,7 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
defaultJDBCAdapter.doCreateTables(transactionContext);
InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
inOrder.verify(lock).lock();
InOrder inOrder = inOrder(resultSet, connection, statement1, statement2);
inOrder.verify(resultSet).next();
inOrder.verify(resultSet).close();
inOrder.verify(connection).createStatement();
@ -155,7 +142,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
inOrder.verify(connection).createStatement();
inOrder.verify(statement2).execute(CREATE_STATEMENT2);
inOrder.verify(statement2).close();
inOrder.verify(lock).unlock();
assertEquals(3, loggingEvents.size());
assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
@ -170,8 +156,7 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
defaultJDBCAdapter.doCreateTables(transactionContext);
InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
inOrder.verify(lock).lock();
InOrder inOrder = inOrder(resultSet, connection, statement1, statement2);
inOrder.verify(resultSet).next();
inOrder.verify(resultSet).close();
inOrder.verify(connection).createStatement();
@ -182,7 +167,6 @@ public class DefaultJDBCAdapterDoCreateTablesTest {
inOrder.verify(statement2).execute(CREATE_STATEMENT2);
inOrder.verify(connection).commit();
inOrder.verify(statement2).close();
inOrder.verify(lock).unlock();
assertEquals(2, loggingEvents.size());
assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.MessageProducer;
import javax.jms.XASession;
import javax.sql.DataSource;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertTrue;
import static org.apache.activemq.util.TestUtils.createXid;
public class JDBCCleanupLimitedPoolTest {
BrokerService broker;
JDBCPersistenceAdapter jdbcPersistenceAdapter;
BasicDataSource pool;
EmbeddedDataSource derby;
@Before
public void setUp() throws Exception {
System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
derby = new EmbeddedDataSource();
derby.setDatabaseName("derbyDb");
derby.setCreateDatabase("create");
broker = createBroker();
broker.start();
broker.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
broker.stop();
pool.close();
DataSourceServiceSupport.shutdownDefaultDataSource(derby);
}
protected BrokerService createBroker() throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
jdbcPersistenceAdapter.deleteAllMessages();
jdbcPersistenceAdapter.setCleanupPeriod(0);
jdbcPersistenceAdapter.setUseLock(false);
pool = new BasicDataSource();
pool.setDriverClassName(EmbeddedDriver.class.getCanonicalName());
pool.setUrl("jdbc:derby:derbyDb;create=false");
pool.setUsername("uid");
pool.setPassword("pwd");
pool.setMaxTotal(2);
jdbcPersistenceAdapter.setDataSource(pool);
broker.setPersistenceAdapter(jdbcPersistenceAdapter);
broker.addConnector("tcp://0.0.0.0:0");
return broker;
}
@Test
public void testNoDeadlockOnXaPoolExhaustion() throws Exception {
final CountDownLatch done = new CountDownLatch(1);
final CountDownLatch doneCommit = new CountDownLatch(2000);
final ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
ExecutorService executorService = Executors.newCachedThreadPool();
// some contention over pool of 2
for (int i = 0; i < 3; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
ActiveMQXAConnection conn = (ActiveMQXAConnection) factory.createXAConnection();
conn.start();
XASession sess = conn.createXASession();
while (done.getCount() > 0 && doneCommit.getCount() > 0) {
Xid xid = createXid();
sess.getXAResource().start(xid, XAResource.TMNOFLAGS);
MessageProducer producer = sess.createProducer(sess.createQueue("test"));
producer.send(sess.createTextMessage("test"));
sess.getXAResource().end(xid, XAResource.TMSUCCESS);
sess.getXAResource().prepare(xid);
sess.getXAResource().commit(xid, false);
doneCommit.countDown();
}
conn.close();
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
}
executorService.execute(new Runnable() {
@Override
public void run() {
try {
while (!done.await(10, TimeUnit.MILLISECONDS) && doneCommit.getCount() > 0) {
jdbcPersistenceAdapter.cleanup();
}
} catch (Exception ignored) {
}
}
});
executorService.shutdown();
boolean allComplete = executorService.awaitTermination(20, TimeUnit.SECONDS);
done.countDown();
assertTrue("all complete", allComplete);
executorService.shutdownNow();
assertTrue("xa tx done", doneCommit.await(10, TimeUnit.SECONDS));
}
}

View File

@ -34,12 +34,12 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.util.TestUtils.createXid;
// https://issues.apache.org/activemq/browse/AMQ-2880
public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
private static final Logger LOG = LoggerFactory.getLogger(JDBCXACommitExceptionTest.class);
private long txGenerator = System.currentTimeMillis();
protected ActiveMQXAConnectionFactory factory;
boolean onePhase = true;
@ -128,32 +128,5 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
return messagesReceived;
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
};
}
}

View File

@ -16,13 +16,15 @@
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.io.*;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ServerSocketFactory;
import javax.transaction.xa.Xid;
public class TestUtils {
@ -65,4 +67,32 @@ public class TestUtils {
return ports;
}
private static AtomicLong txGenerator = new AtomicLong(System.currentTimeMillis());
public static Xid createXid() throws IOException {
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(txGenerator.incrementAndGet());
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
@Override
public int getFormatId() {
return 86;
}
@Override
public byte[] getGlobalTransactionId() {
return bs;
}
@Override
public byte[] getBranchQualifier() {
return bs;
}
};
}
}