diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 299d5fbe35..0b813c1191 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -90,4 +90,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public String getMessageReference(MessageId identity) throws IOException { return delegate.getMessageReference(identity); } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + return delegate.getAllSubscriptions(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java index fc19a0826e..1f0c7d2dee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java @@ -69,6 +69,15 @@ public interface TopicMessageStore extends MessageStore { */ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException; + /** + * Lists all the durable subscirptions for a given destination. + * + * @param clientId TODO + * @param subscriptionName TODO + * @return + */ + public SubscriptionInfo[] getAllSubscriptions() throws IOException; + /** * Inserts the subscriber info due to a subscription change *
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 683822abd6..f1f85a501c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -74,5 +74,7 @@ public interface JDBCAdapter { public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences); + public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index b18ba196e9..58d8ca3924 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -126,4 +126,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } } + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + TransactionContext c = persistenceAdapter.getTransactionContext(); + try { + return adapter.doGetAllSubscriptions(c, destination); + } catch (SQLException e) { + throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); + } finally { + c.close(); + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java index a18c11dffd..1c3dc25baa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java @@ -47,5 +47,6 @@ public interface StatementProvider { public boolean isUseExternalMessageReferences(); public String getFullMessageTableName(); + public String getFindAllDurableSubsStatment(); } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java index 1592e8e491..0250c9cb42 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java @@ -47,6 +47,7 @@ public class CachingStatementProvider implements StatementProvider { private String deleteOldMessagesStatment; private String findLastSequenceIdInAcks; private String findAllDestinationsStatment; + private String findAllDurableSubsStatment; public CachingStatementProvider(StatementProvider statementProvider) { this.statementProvider = statementProvider; @@ -222,4 +223,11 @@ public class CachingStatementProvider implements StatementProvider { public String getFullMessageTableName() { return statementProvider.getFullMessageTableName(); } + + public String getFindAllDurableSubsStatment() { + if ( findAllDurableSubsStatment==null ) { + findAllDurableSubsStatment = statementProvider.getFindAllDurableSubsStatment(); + } + return findAllDurableSubsStatment; + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 9610d2577b..7ec201e771 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -21,6 +21,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -482,6 +483,32 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } + public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { + PreparedStatement s = null; + ResultSet rs = null; + try { + + s = c.getConnection().prepareStatement(statementProvider.getFindAllDurableSubsStatment()); + s.setString(1, destination.getQualifiedName()); + rs = s.executeQuery(); + + ArrayList rc = new ArrayList(); + while(rs.next()) { + SubscriptionInfo subscription = new SubscriptionInfo(); + subscription.setDestination(destination); + subscription.setSelector(rs.getString(1)); + subscription.setSubcriptionName(rs.getString(2)); + subscription.setClientId(rs.getString(3)); + } + + return (SubscriptionInfo[]) rc.toArray(new SubscriptionInfo[rc.size()]); + } + finally { + close(rs); + close(s); + } + } + public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException { PreparedStatement s = null; try { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java index 73ee9c523f..68b8aa9de5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java @@ -108,6 +108,12 @@ public class DefaultStatementProvider implements StatementProvider { "FROM "+getTablePrefix()+durableSubAcksTableName+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } + + public String getFindAllDurableSubsStatment() { + return "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + + "FROM "+getTablePrefix()+durableSubAcksTableName+ + " WHERE CONTAINER=?"; + } public String getUpdateLastAckOfDurableSub() { return "UPDATE "+getTablePrefix()+durableSubAcksTableName+ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java index 1b3c5f1360..89d92f8d50 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java @@ -179,5 +179,9 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top public void deleteSubscription(String clientId, String subscriptionName) throws IOException { longTermStore.deleteSubscription(clientId, subscriptionName); } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + return longTermStore.getAllSubscriptions(); + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java index 6b8e70c5fd..a6e7b65d00 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java @@ -190,4 +190,8 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl longTermStore.deleteSubscription(clientId, subscriptionName); } + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + return longTermStore.getAllSubscriptions(); + } + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index 195b167a4a..6905a05b03 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -112,4 +112,8 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic subscriberDatabase.clear(); lastMessageId=null; } + + public SubscriptionInfo[] getAllSubscriptions() throws IOException { + return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); + } }