added support fro browsing messages for a durable subscriber

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382393 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-02 14:31:05 +00:00
parent 3182029ef4
commit c717221cf8
4 changed files with 96 additions and 9 deletions

View File

@ -22,14 +22,16 @@ import org.apache.activemq.broker.region.Subscription;
*/ */
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected ManagedRegionBroker broker;
protected String subscriptionName; protected String subscriptionName;
/** /**
* Constructor * Constructor
* @param clientId * @param clientId
* @param sub * @param sub
*/ */
public DurableSubscriptionView(String clientId,Subscription sub){ public DurableSubscriptionView(ManagedRegionBroker broker,String clientId,Subscription sub){
super(clientId,sub); super(clientId,sub);
this.broker = broker;
this.subscriptionName = sub.getConsumerInfo().getSubcriptionName(); this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
} }
@ -47,7 +49,7 @@ public class DurableSubscriptionView extends SubscriptionView implements Durabl
* @throws OpenDataException * @throws OpenDataException
*/ */
public CompositeData[] browse() throws OpenDataException{ public CompositeData[] browse() throws OpenDataException{
return null; return broker.browse(this);
} }
/** /**
@ -57,6 +59,10 @@ public class DurableSubscriptionView extends SubscriptionView implements Durabl
* @throws OpenDataException * @throws OpenDataException
*/ */
public TabularData browseAsTable() throws OpenDataException{ public TabularData browseAsTable() throws OpenDataException{
return null; return broker.browseAsTable(this);
}
public String toString(){
return "InactiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName();
} }
} }

View File

@ -21,10 +21,19 @@ import org.apache.activemq.command.SubscriptionInfo;
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
*/ */
public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected ManagedRegionBroker broker;
protected SubscriptionInfo info; protected SubscriptionInfo info;
public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){
/**
* Constructor
* @param broker
* @param clientId
* @param sub
*/
public InactiveDurableSubscriptionView(ManagedRegionBroker broker,String clientId,SubscriptionInfo sub){
super(clientId,null); super(clientId,null);
this.broker = broker;
this.info = sub; this.info = sub;
} }
@ -87,7 +96,7 @@ public class InactiveDurableSubscriptionView extends SubscriptionView implements
* @throws OpenDataException * @throws OpenDataException
*/ */
public CompositeData[] browse() throws OpenDataException{ public CompositeData[] browse() throws OpenDataException{
return null; return broker.browse(this);
} }
/** /**
@ -97,6 +106,10 @@ public class InactiveDurableSubscriptionView extends SubscriptionView implements
* @throws OpenDataException * @throws OpenDataException
*/ */
public TabularData browseAsTable() throws OpenDataException{ public TabularData browseAsTable() throws OpenDataException{
return null; return broker.browseAsTable(this);
}
public String toString(){
return "InactiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName();
} }
} }

View File

@ -14,17 +14,27 @@
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
@ -34,10 +44,13 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
@ -46,6 +59,7 @@ import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker{ public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class); private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer; private final MBeanServer mbeanServer;
@ -139,7 +153,7 @@ public class ManagedRegionBroker extends RegionBroker{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view; SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){ if(sub.getConsumerInfo().isDurable()){
view=new DurableSubscriptionView(context.getClientId(),sub); view=new DurableSubscriptionView(this,context.getClientId(),sub);
}else{ }else{
view=new SubscriptionView(context.getClientId(),sub); view=new SubscriptionView(context.getClientId(),sub);
} }
@ -275,7 +289,7 @@ public class ManagedRegionBroker extends RegionBroker{
map.put("active", "false"); map.put("active", "false");
try{ try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info); SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
mbeanServer.registerMBean(view,objectName); mbeanServer.registerMBean(view,objectName);
inactiveDurableTopicSubscribers.put(objectName,view); inactiveDurableTopicSubscribers.put(objectName,view);
subscriptionKeys.put(key, objectName); subscriptionKeys.put(key, objectName);
@ -284,6 +298,53 @@ public class ManagedRegionBroker extends RegionBroker{
} }
} }
public CompositeData[] browse(SubscriptionView view) throws OpenDataException{
List messages = getSubscriberMessages(view);
CompositeData c[]=new CompositeData[messages.size()];
for(int i=0;i<c.length;i++){
try{
c[i]=OpenTypeSupport.convert((Message) messages.get(i));
}catch(Throwable e){
e.printStackTrace();
}
}
return c;
}
public TabularData browseAsTable(SubscriptionView view) throws OpenDataException{
OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
List messages = getSubscriberMessages(view);
CompositeType ct=factory.getCompositeType();
TabularType tt=new TabularType("MessageList","MessageList",ct,new String[] { "JMSMessageID" });
TabularDataSupport rc=new TabularDataSupport(tt);
for(int i=0;i<messages.size();i++){
rc.put(new CompositeDataSupport(ct,factory.getFields(messages.get(i))));
}
return rc;
}
protected List getSubscriberMessages(SubscriptionView view){
final List result = new ArrayList();
try {
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store = adaptor.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Throwable{
result.add(message);
}
public void recoverMessageReference(String messageReference) throws Throwable{}
public void finished(){}
});
}catch(Throwable e){
log.error("Failed to browse messages for Subscription " + view,e);
}
return result;
}
protected ObjectName[] getTopics(){ protected ObjectName[] getTopics(){
Set set = topics.keySet(); Set set = topics.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[])set.toArray(new ObjectName[set.size()]);

View File

@ -171,5 +171,12 @@ public class SubscriptionView implements SubscriptionViewMBean {
protected ConsumerInfo getConsumerInfo(){ protected ConsumerInfo getConsumerInfo(){
return subscription != null ? subscription.getConsumerInfo() : null; return subscription != null ? subscription.getConsumerInfo() : null;
} }
/**
*@return pretty print
*/
public String toString(){
return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
}
} }