Added a SubscriptionInfo[] getAllSubscriptions() to the TopicMessageStore. We will need this if we want to eagerly load the durable subs when the broker starts up.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@365993 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-01-04 21:09:16 +00:00
parent 27329abdcb
commit 9b64c37c4c
11 changed files with 80 additions and 0 deletions

View File

@ -90,4 +90,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public String getMessageReference(MessageId identity) throws IOException {
return delegate.getMessageReference(identity);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return delegate.getAllSubscriptions();
}
}

View File

@ -69,6 +69,15 @@ public interface TopicMessageStore extends MessageStore {
*/
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
/**
* Lists all the durable subscirptions for a given destination.
*
* @param clientId TODO
* @param subscriptionName TODO
* @return
*/
public SubscriptionInfo[] getAllSubscriptions() throws IOException;
/**
* Inserts the subscriber info due to a subscription change
* <p/>

View File

@ -74,5 +74,7 @@ public interface JDBCAdapter {
public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
}

View File

@ -126,4 +126,15 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
}
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
return adapter.doGetAllSubscriptions(c, destination);
} catch (SQLException e) {
throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
} finally {
c.close();
}
}
}

View File

@ -47,5 +47,6 @@ public interface StatementProvider {
public boolean isUseExternalMessageReferences();
public String getFullMessageTableName();
public String getFindAllDurableSubsStatment();
}

View File

@ -47,6 +47,7 @@ public class CachingStatementProvider implements StatementProvider {
private String deleteOldMessagesStatment;
private String findLastSequenceIdInAcks;
private String findAllDestinationsStatment;
private String findAllDurableSubsStatment;
public CachingStatementProvider(StatementProvider statementProvider) {
this.statementProvider = statementProvider;
@ -222,4 +223,11 @@ public class CachingStatementProvider implements StatementProvider {
public String getFullMessageTableName() {
return statementProvider.getFullMessageTableName();
}
public String getFindAllDurableSubsStatment() {
if ( findAllDurableSubsStatment==null ) {
findAllDurableSubsStatment = statementProvider.getFindAllDurableSubsStatment();
}
return findAllDurableSubsStatment;
}
}

View File

@ -21,6 +21,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@ -482,6 +483,32 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
}
public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(statementProvider.getFindAllDurableSubsStatment());
s.setString(1, destination.getQualifiedName());
rs = s.executeQuery();
ArrayList rc = new ArrayList();
while(rs.next()) {
SubscriptionInfo subscription = new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setSelector(rs.getString(1));
subscription.setSubcriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3));
}
return (SubscriptionInfo[]) rc.toArray(new SubscriptionInfo[rc.size()]);
}
finally {
close(rs);
close(s);
}
}
public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException {
PreparedStatement s = null;
try {

View File

@ -109,6 +109,12 @@ public class DefaultStatementProvider implements StatementProvider {
" WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
public String getFindAllDurableSubsStatment() {
return "SELECT SELECTOR, SUB_NAME, CLIENT_ID" +
"FROM "+getTablePrefix()+durableSubAcksTableName+
" WHERE CONTAINER=?";
}
public String getUpdateLastAckOfDurableSub() {
return "UPDATE "+getTablePrefix()+durableSubAcksTableName+
" SET LAST_ACKED_ID=?" +

View File

@ -180,4 +180,8 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
longTermStore.deleteSubscription(clientId, subscriptionName);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
}

View File

@ -190,4 +190,8 @@ public class QuickJournalTopicMessageStore extends QuickJournalMessageStore impl
longTermStore.deleteSubscription(clientId, subscriptionName);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
}

View File

@ -112,4 +112,8 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
subscriberDatabase.clear();
lastMessageId=null;
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
}