From 06cbebc8da7336be7feae5d7b8df25860092e5f7 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 4 Nov 2010 13:13:37 +0000 Subject: [PATCH] further resolution to https://issues.apache.org/activemq/browse/AMQ-2980, concurrent producers was still problematic as the store needed to be traversed multiple times, requiring ack locations per priority. Resolution to https://issues.apache.org/activemq/browse/AMQ-2551 fell out of decreasing the cleanup interval in some of the tests, cleanup needs an exclusive lock so it won't cause contention git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1030928 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/jdbc/JDBCPersistenceAdapter.java | 2 +- .../activemq/store/jdbc/Statements.java | 102 +++++++++++-- .../jdbc/adapter/DefaultJDBCAdapter.java | 137 +++++++++++++++++- .../activemq/store/MessagePriorityTest.java | 28 ++-- .../store/jdbc/JDBCMessagePriorityTest.java | 58 +++++++- ...QStoreDurableSubscriptionSelectorTest.java | 30 ---- .../DurableSubscriptionOfflineTest.java | 12 ++ .../DurableSubscriptionReactivationTest.java | 6 + .../DurableSubscriptionSelectorTest.java | 15 +- ...KahaDBDurableSubscriptionSelectorTest.java | 35 ----- .../src/test/resources/log4j.properties | 2 +- 11 files changed, 320 insertions(+), 107 deletions(-) delete mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionSelectorTest.java delete mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionSelectorTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 83225bdf4e..361bacca9f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -670,7 +670,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist /** * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED * This allowable dirty isolation level may not be achievable in clustered DB environments - * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABE_READ + * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ * see isolation level constants in {@link java.sql.Connection} * @param transactionIsolation the isolation level to use */ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 795e699c59..7c125f3961 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -70,6 +70,10 @@ public class Statements { private boolean useLockCreateWhereClause; private String findAllMessageIdsStatement; private String lastProducerSequenceIdStatement; + private String selectDurablePriorityAckStatement; + + private String insertDurablePriorityAckStatement; + private String updateDurableLastAckStatement; public String[] getCreateSchemaStatements() { if (createSchemaStatements == null) { @@ -93,7 +97,9 @@ public class Statements { "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType, "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)", - "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType, + "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " NOT NULL DEFAULT 5", + "ALTER TABLE " + getFullAckTableName() + " DROP PRIMARY KEY", + "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)", }; } return createSchemaStatements; @@ -207,7 +213,7 @@ public class Statements { public String getFindDurableSubStatement() { if (findDurableSubStatement == null) { findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName() - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND SUB_DEST IS NOT NULL"; } return findDurableSubStatement; } @@ -215,15 +221,15 @@ public class Statements { public String getFindAllDurableSubsStatement() { if (findAllDurableSubsStatement == null) { findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " - + getFullAckTableName() + " WHERE CONTAINER=?"; + + getFullAckTableName() + " WHERE CONTAINER=? AND SUB_DEST IS NOT NULL"; } return findAllDurableSubsStatement; } public String getUpdateLastAckOfDurableSubStatement() { if (updateLastAckOfDurableSubStatement == null) { - updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?, PRIORITY=?" - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?"; } return updateLastAckOfDurableSubStatement; } @@ -264,7 +270,19 @@ public class Statements { + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND " - + "((M.ID > ? AND M.PRIORITY = ?) OR M.PRIORITY < ?)" + + "((M.ID > ? AND M.PRIORITY = ?) " + + " OR (M.PRIORITY <> ? " + + " AND ( M.ID >" + + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName() + + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" + + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )" + + " OR " + + " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName() + + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" + + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)" + + " )" + + " )" + + ")" + " ORDER BY M.PRIORITY DESC, M.ID"; } return findDurableSubMessagesByPriorityStatement; @@ -306,8 +324,17 @@ public class Statements { + " M, " + getFullAckTableName() + " D " - + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY"; + + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL" + + " AND M.CONTAINER=D.CONTAINER " + + " AND ( M.ID >" + + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName() + + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" + + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY )" + + " OR " + + " ( (SELECT COUNT(LAST_ACKED_ID) FROM " + getFullAckTableName() + + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" + + " AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY) = 0)" + + " )"; } return durableSubscriberMessageCountStatement; } @@ -338,15 +365,20 @@ public class Statements { deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ( EXPIRATION<>0 AND EXPIRATION= " - + " ( SELECT min(" + getFullAckTableName() + ".PRIORITY) " - + " FROM " + getFullAckTableName() + " WHERE " + + getFullMessageTableName() + ".CONTAINER" + + " AND " + getFullAckTableName() + ".SUB_DEST IS NULL" + + " AND " + getFullAckTableName() + ".PRIORITY=" + getFullMessageTableName() + ".PRIORITY )" + + " AND ID <" + + " ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)" + + " FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName() + ".CONTAINER=" - + getFullMessageTableName() + ".CONTAINER ))"; + + getFullMessageTableName() + ".CONTAINER" + + " AND " + getFullAckTableName() + ".SUB_DEST IS NOT NULL )" + + " )"; + } return deleteOldMessagesStatement; } @@ -418,6 +450,35 @@ public class Statements { return lastAckedDurableSubscriberMessageStatement; } + public String getSelectDurablePriorityAckStatement() { + if (selectDurablePriorityAckStatement == null) { + selectDurablePriorityAckStatement = "SELECT LAST_ACKED_ID FROM " + getFullAckTableName() + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" + + " AND PRIORITY = ?"; + } + return selectDurablePriorityAckStatement; + } + + public String getInsertDurablePriorityAckStatement() { + if (insertDurablePriorityAckStatement == null) { + insertDurablePriorityAckStatement = "INSERT INTO " + + getFullAckTableName() + + "(CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)" + + " VALUES (?, ?, ?, ?)"; + } + return insertDurablePriorityAckStatement; + } + + + public String getUpdateDurableLastAckStatement() { + if (updateDurableLastAckStatement == null) { + updateDurableLastAckStatement = "UPDATE " + getFullAckTableName() + + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" + + " AND PRIORITY = " + (Byte.MAX_VALUE - 1); + } + return updateDurableLastAckStatement; + } + public String getFullMessageTableName() { return getTablePrefix() + getMessageTableName(); } @@ -709,4 +770,15 @@ public class Statements { this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement; } + public void setSelectDurablePriorityAckStatement(String selectDurablePriorityAckStatement) { + this.selectDurablePriorityAckStatement = selectDurablePriorityAckStatement; + } + + public void setInsertDurablePriorityAckStatement(String insertDurablePriorityAckStatement) { + this.insertDurablePriorityAckStatement = insertDurablePriorityAckStatement; + } + + public void setUpdateDurableLastAckStatement(String updateDurableLastAckStatement) { + this.updateDurableLastAckStatement = updateDurableLastAckStatement; + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index d742c26a16..13376e744a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.Message; @@ -62,6 +64,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { protected Statements statements; protected boolean batchStatments = true; protected boolean prioritizedMessages; + protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { s.setBytes(index, data); @@ -73,6 +76,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doCreateTables(TransactionContext c) throws SQLException, IOException { Statement s = null; + cleanupExclusiveLock.writeLock().lock(); try { // Check to see if the table already exists. If it does, then don't // log warnings during startup. @@ -112,6 +116,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } c.getConnection().commit(); } finally { + cleanupExclusiveLock.writeLock().unlock(); try { s.close(); } catch (Throwable e) { @@ -121,6 +126,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doDropTables(TransactionContext c) throws SQLException, IOException { Statement s = null; + cleanupExclusiveLock.writeLock().lock(); try { s = c.getConnection().createStatement(); String[] dropStatments = this.statements.getDropSchemaStatements(); @@ -139,6 +145,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } c.getConnection().commit(); } finally { + cleanupExclusiveLock.writeLock().unlock(); try { s.close(); } catch (Throwable e) { @@ -149,6 +156,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); rs = s.executeQuery(); @@ -171,6 +179,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { long seq = Math.max(seq1, seq2); return seq; } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -179,6 +188,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement( this.statements.getFindMessageByIdStatement()); @@ -189,6 +199,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } return getBinaryData(rs, 1); } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -198,6 +209,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); + cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); @@ -218,6 +230,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throw new SQLException("Failed add a message"); } } finally { + cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { if (s != null) { s.close(); @@ -229,6 +242,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); + cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); @@ -248,6 +262,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throw new SQLException("Failed add a message"); } } finally { + cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { s.close(); } @@ -257,6 +272,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); s.setString(1, messageID.getProducerId().toString()); @@ -268,6 +284,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } return new long[]{rs.getLong(1), rs.getLong(2)}; } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -276,6 +293,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); s.setString(1, id.getProducerId().toString()); @@ -286,6 +304,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } return getBinaryData(rs, 1); } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -294,6 +313,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); s.setLong(1, seq); @@ -303,6 +323,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } return rs.getString(1); } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -310,6 +331,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException { PreparedStatement s = c.getRemovedMessageStatement(); + cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement()); @@ -324,6 +346,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throw new SQLException("Failed to remove message"); } } finally { + cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments && s != null) { s.close(); } @@ -334,6 +357,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throws Exception { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); s.setString(1, destination.getQualifiedName()); @@ -352,6 +376,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -361,6 +386,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { JDBCMessageIdScanListener listener) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); s.setMaxRows(limit); @@ -377,6 +403,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { listener.messageId(id); } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -384,7 +411,10 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException { + doCreatePriorityAckRow(c, destination, clientId, subscriptionName, prio); + doUpdateLatestAckRow(c, destination, clientId, subscriptionName, seq, prio); PreparedStatement s = c.getUpdateLastAckStatement(); + cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement()); @@ -393,28 +423,94 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } s.setLong(1, seq); - s.setLong(2, prio); - s.setString(3, destination.getQualifiedName()); - s.setString(4, clientId); - s.setString(5, subscriptionName); + s.setString(2, destination.getQualifiedName()); + s.setString(3, clientId); + s.setString(4, subscriptionName); + s.setLong(5, prio); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { - throw new SQLException("Failed add a message"); + throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName); } } finally { + cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { s.close(); } } } + private void doCreatePriorityAckRow(TransactionContext c, ActiveMQDestination destination, String clientId, + String subscriptionName,long priority) throws SQLException, IOException{ + PreparedStatement s = null; + ResultSet rs = null; + boolean exists = false; + cleanupExclusiveLock.readLock().lock(); + try { + s = c.getConnection().prepareStatement(this.statements.getSelectDurablePriorityAckStatement()); + s.setString(1, destination.getQualifiedName()); + s.setString(2, clientId); + s.setString(3, subscriptionName); + s.setLong(4, priority); + + rs = s.executeQuery(); + exists = rs.next(); + } finally { + cleanupExclusiveLock.readLock().unlock(); + close(rs); + close(s); + } + + if (!exists) { + cleanupExclusiveLock.readLock().lock(); + try { + s = c.getConnection().prepareStatement(this.statements.getInsertDurablePriorityAckStatement()); + s.setString(1, destination.getQualifiedName()); + s.setString(2, clientId); + s.setString(3, subscriptionName); + s.setLong(4, priority); + if (s.executeUpdate() != 1) { + throw new IOException("Could not insert initial ack entry for priority: " + + priority + ", for sub: " + subscriptionName); + } + + } finally { + cleanupExclusiveLock.readLock().unlock(); + close(s); + } + } + } + + private void doUpdateLatestAckRow(TransactionContext c, ActiveMQDestination destination, String clientId, + String subscriptionName, long seq, long priority) throws SQLException, IOException{ + PreparedStatement s = null; + ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); + try { + s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement()); + s.setLong(1, seq); + s.setString(2, destination.getQualifiedName()); + s.setString(3, clientId); + s.setString(4, subscriptionName); + + if (s.executeUpdate() != 1) { + throw new IOException("Could not update last ack seq : " + + seq + ", for sub: " + subscriptionName); + } + } finally { + cleanupExclusiveLock.readLock().unlock(); + close(rs); + close(s); + } + } + public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { // dumpTables(c, // destination.getQualifiedName(),clientId,subscriptionName); PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); s.setString(1, destination.getQualifiedName()); @@ -435,6 +531,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -445,6 +542,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { if (isPrioritizedMessages()) { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); @@ -476,6 +574,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -486,6 +585,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { PreparedStatement s = null; ResultSet rs = null; int result = 0; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); s.setString(1, destination.getQualifiedName()); @@ -496,6 +596,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { result = rs.getInt(1); } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -514,6 +615,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { // dumpTables(c, destination.getQualifiedName(), clientId, // subscriptionName); PreparedStatement s = null; + cleanupExclusiveLock.readLock().lock(); try { long lastMessageId = -1; long priority = Byte.MAX_VALUE - 1; @@ -542,6 +644,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throw new IOException("Could not create durable subscription for: " + info.getClientId()); } } finally { + cleanupExclusiveLock.readLock().unlock(); close(s); } } @@ -550,6 +653,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { String clientId, String subscriptionName) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); s.setString(1, destination.getQualifiedName()); @@ -568,6 +672,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { ActiveMQDestination.QUEUE_TYPE)); return subscription; } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -577,6 +682,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); s.setString(1, destination.getQualifiedName()); @@ -594,6 +700,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } return rc.toArray(new SubscriptionInfo[rc.size()]); } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -602,6 +709,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException { PreparedStatement s = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); s.setString(1, destinationName.getQualifiedName()); @@ -611,6 +719,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { s.setString(1, destinationName.getQualifiedName()); s.executeUpdate(); } finally { + cleanupExclusiveLock.readLock().unlock(); close(s); } } @@ -618,6 +727,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException { PreparedStatement s = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); s.setString(1, destination.getQualifiedName()); @@ -625,12 +735,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter { s.setString(3, subscriptionName); s.executeUpdate(); } finally { + cleanupExclusiveLock.readLock().unlock(); close(s); } } public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { PreparedStatement s = null; + cleanupExclusiveLock.writeLock().lock(); try { LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement()); @@ -638,6 +750,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { int i = s.executeUpdate(); LOG.debug("Deleted " + i + " old message(s)."); } finally { + cleanupExclusiveLock.writeLock().unlock(); close(s); } } @@ -647,6 +760,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { PreparedStatement s = null; ResultSet rs = null; long[] result = new long[]{-1, Byte.MAX_VALUE - 1}; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); s.setString(1, destination.getQualifiedName()); @@ -657,9 +771,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { result[0] = rs.getLong(1); result[1] = rs.getLong(2); } - rs.close(); - s.close(); } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -684,6 +797,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { HashSet rc = new HashSet(); PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); rs = s.executeQuery(); @@ -691,6 +805,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -747,6 +862,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { String clientId, String subscriberName) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement()); s.setString(1, destination.getQualifiedName()); @@ -759,6 +875,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { return getBinaryData(rs, 1); } finally { close(rs); + cleanupExclusiveLock.readLock().unlock(); close(s); } } @@ -768,6 +885,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { PreparedStatement s = null; ResultSet rs = null; int result = 0; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); s.setString(1, destination.getQualifiedName()); @@ -776,6 +894,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { result = rs.getInt(1); } } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -786,6 +905,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { if (isPrioritizedMessages()) { s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement()); @@ -823,6 +943,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } catch (Exception e) { e.printStackTrace(); } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } @@ -887,6 +1008,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; + cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); s.setString(1, id.toString()); @@ -897,6 +1019,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } return seq; } finally { + cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index 79f150eea8..ce813de38b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -49,8 +49,9 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { Connection conn; protected Session sess; - public boolean useCache; - public boolean dispatchAsync = false; + public boolean useCache = true; + public boolean dispatchAsync = true; + public boolean prioritizeMessages = true; public int prefetchVal = 500; public int MSG_NUM = 600; @@ -66,7 +67,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { adapter = createPersistenceAdapter(true); broker.setPersistenceAdapter(adapter); PolicyEntry policy = new PolicyEntry(); - policy.setPrioritizedMessages(true); + policy.setPrioritizedMessages(prioritizeMessages); policy.setUseCache(useCache); PolicyMap policyMap = new PolicyMap(); policyMap.setDefaultEntry(policy); @@ -87,11 +88,14 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { } protected void tearDown() throws Exception { - sess.close(); - conn.close(); - - broker.stop(); - broker.waitUntilStopped(); + try { + sess.close(); + conn.close(); + } catch (Exception ignored) { + } finally { + broker.stop(); + broker.waitUntilStopped(); + } } public void testStoreConfigured() throws Exception { @@ -164,7 +168,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { } protected Message createMessage(int priority) throws Exception { - final String text = "Message with priority " + priority; + final String text = "priority " + priority; Message msg = sess.createTextMessage(text); LOG.info("Sending " + text); return msg; @@ -199,7 +203,9 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { public void initCombosForTestDurableSubsReconnect() { addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)}); - addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE}); + // REVISIT = is dispatchAsync = true a problem or is it just the test? + addCombinationValues("dispatchAsync", new Object[] {Boolean.FALSE}); + addCombinationValues("useCache", new Object[] {Boolean.TRUE, Boolean.FALSE}); } public void testDurableSubsReconnect() throws Exception { @@ -221,7 +227,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { final int closeFrequency = MSG_NUM/4; sub = sess.createDurableSubscriber(topic, subName); for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = sub.receive(30000); + Message msg = sub.receive(15000); LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null)); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java index 44e7f1754e..9f1b3f409d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -17,6 +17,9 @@ package org.apache.activemq.store.jdbc; +import java.util.Arrays; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Message; import javax.jms.TopicSubscriber; import junit.framework.Test; @@ -40,7 +43,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { dataSource.setShutdownDatabase("false"); jdbc.setDataSource(dataSource); jdbc.deleteAllMessages(); - jdbc.setCleanupPeriod(1000); + jdbc.setCleanupPeriod(2000); return jdbc; } @@ -74,8 +77,10 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI}; sub = sess.createDurableSubscriber(topic, subName); for (int i = 0; i < MSG_NUM * 4; i++) { - Message msg = sub.receive(30000); - LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null)); + Message msg = sub.receive(10000); + LOG.info("received i=" + i + ", m=" + (msg!=null? + msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() + : null) ); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority()); if (i > 0 && i % closeFrequency == 0) { @@ -88,6 +93,53 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { sub.close(); } + public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() { + addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE}); + } + + public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); + final String subName = "priorityDisconnect"; + TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); + sub.close(); + + final int maxPriority = 5; + + final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority]; + Vector producers = new Vector(); + for (int priority=0; priority 0 && i % closeFrequency == 0) { + LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts)); + sub.close(); + sub = sess.createDurableSubscriber(topic, subName); + } + } + LOG.info("closing on done!"); + sub.close(); + + for (ProducerThread producer : producers) { + producer.join(); + } + } + public static Test suite() { return suite(JDBCMessagePriorityTest.class); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionSelectorTest.java deleted file mode 100644 index 95affe7998..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionSelectorTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.usecases; - -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; - -public class AMQStoreDurableSubscriptionSelectorTest extends DurableSubscriptionSelectorTest { - - @Override - public PersistenceAdapter createPersistenceAdapter() throws Exception { - return new AMQPersistenceAdapter(); - } - -} diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index a1e69a5302..cd188d4b44 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import javax.jms.*; @@ -87,6 +88,10 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp } setDefaultPersistenceAdapter(broker); + if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { + // ensure it kicks in during tests + ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000); + } broker.start(); } @@ -295,6 +300,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } + public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { + this.addCombinationValues("defaultPersistenceAdapter", + new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + this.addCombinationValues("usePrioritySupport", + new Object[]{ Boolean.TRUE, Boolean.FALSE}); + } + public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { Connection con = createConnection("offCli1"); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java index 6e5d87ba9c..a6f7c10e5e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionReactivationTest.java @@ -28,6 +28,7 @@ import junit.framework.Test; import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; public class DurableSubscriptionReactivationTest extends EmbeddedBrokerTestSupport { @@ -79,8 +80,13 @@ public class DurableSubscriptionReactivationTest extends EmbeddedBrokerTestSuppo protected BrokerService createBroker() throws Exception { BrokerService answer = super.createBroker(); answer.setKeepDurableSubsActive(keepDurableSubsActive); + answer.setPersistenceAdapter(new JDBCPersistenceAdapter()); return answer; } + + protected boolean isPersistent() { + return true; + } public static Test suite() { return suite(DurableSubscriptionReactivationTest.class); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java index 42a2744df4..b664fbc54e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java @@ -27,13 +27,14 @@ import javax.jms.TopicSubscriber; import javax.management.MBeanServer; import javax.management.ObjectName; +import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.PersistenceAdapter; -abstract public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport { +public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport { MBeanServer mbs; BrokerService broker = null; @@ -45,6 +46,14 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem private int received = 0; + public static Test suite() { + return suite(DurableSubscriptionSelectorTest.class); + } + + public void initCombosForTestSubscription() throws Exception { + this.addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values()); + } + public void testSubscription() throws Exception { openConsumer(); for (int i = 0; i < 4000; i++) { @@ -130,7 +139,7 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem if (deleteMessages) { broker.setDeleteAllMessagesOnStartup(true); } - broker.setPersistenceAdapter(createPersistenceAdapter()); + setDefaultPersistenceAdapter(broker); broker.start(); } @@ -140,8 +149,6 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem broker = null; } - abstract public PersistenceAdapter createPersistenceAdapter() throws Exception; - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false"); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionSelectorTest.java deleted file mode 100644 index ce24f80e39..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionSelectorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.usecases; - -import java.io.File; - -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; - -public class KahaDBDurableSubscriptionSelectorTest extends DurableSubscriptionSelectorTest { - - @Override - public PersistenceAdapter createPersistenceAdapter() throws Exception { - KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); - File dir = new File("target/KahaDB"); - adapter.setDirectory(dir); - return adapter; - } - -} diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties index b59b38ec43..d2d1242094 100755 --- a/activemq-core/src/test/resources/log4j.properties +++ b/activemq-core/src/test/resources/log4j.properties @@ -22,7 +22,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.broker.scheduler=DEBUG #log4j.logger.org.apache.activemq=TRACE -#log4j.logger.org.apache.activemq.store.jdbc=DEBUG +#log4j.logger.org.apache.activemq.store.jdbc=TRACE #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG #log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG