tidied up subscription objectNames

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-09 11:22:26 +00:00
parent 9718909422
commit 6c0df67be6
1 changed files with 137 additions and 153 deletions

View File

@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.management.InstanceNotFoundException; import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
@ -34,7 +33,6 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType; import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -62,10 +60,8 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker{ public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class); private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer; private final MBeanServer mbeanServer;
@ -80,42 +76,37 @@ public class ManagedRegionBroker extends RegionBroker{
private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap(); private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap();
private final Map temporaryQueueSubscribers=new ConcurrentHashMap(); private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
private final Map temporaryTopicSubscribers=new ConcurrentHashMap(); private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
private final Map subscriptionKeys = new ConcurrentHashMap(); private final Map subscriptionKeys=new ConcurrentHashMap();
private final Map subscriptionMap = new ConcurrentHashMap(); private final Map subscriptionMap=new ConcurrentHashMap();
private final Set registeredMBeans = new CopyOnWriteArraySet(); private final Set registeredMBeans=new CopyOnWriteArraySet();
/* This is the first broker in the broker interceptor chain. */ /* This is the first broker in the broker interceptor chain. */
private Broker contextBroker; private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName, public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter) throws IOException{ TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter)
throws IOException{
super(brokerService,taskRunnerFactory,memoryManager,adapter); super(brokerService,taskRunnerFactory,memoryManager,adapter);
this.mbeanServer=mbeanServer; this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName; this.brokerObjectName=brokerObjectName;
} }
public void start() throws Exception { public void start() throws Exception{
super.start(); super.start();
//build all existing durable subscriptions // build all existing durable subscriptions
buildExistingSubscriptions(); buildExistingSubscriptions();
} }
protected void doStop(ServiceStopper stopper){
protected void doStop(ServiceStopper stopper) {
super.doStop(stopper); super.doStop(stopper);
// lets remove any mbeans not yet removed // lets remove any mbeans not yet removed
for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) { for(Iterator iter=registeredMBeans.iterator();iter.hasNext();){
ObjectName name = (ObjectName) iter.next(); ObjectName name=(ObjectName) iter.next();
try { try{
mbeanServer.unregisterMBean(name); mbeanServer.unregisterMBean(name);
} }catch(InstanceNotFoundException e){
catch (InstanceNotFoundException e) { log.warn("The MBean: "+name+" is no longer registered with JMX");
log.warn("The MBean: " + name + " is no longer registered with JMX"); }catch(Exception e){
} stopper.onException(this,e);
catch (Exception e) {
stopper.onException(this, e);
} }
} }
registeredMBeans.clear(); registeredMBeans.clear();
@ -141,12 +132,12 @@ public class ManagedRegionBroker extends RegionBroker{
public void register(ActiveMQDestination destName,Destination destination){ public void register(ActiveMQDestination destName,Destination destination){
try{ try{
ObjectName objectName = createObjectName(destName); ObjectName objectName=createObjectName(destName);
DestinationView view; DestinationView view;
if(destination instanceof Queue){ if(destination instanceof Queue){
view=new QueueView(this, (Queue) destination); view=new QueueView(this,(Queue) destination);
}else{ }else{
view=new TopicView(this, (Topic) destination); view=new TopicView(this,(Topic) destination);
} }
registerDestination(objectName,destName,view); registerDestination(objectName,destName,view);
}catch(Exception e){ }catch(Exception e){
@ -156,7 +147,7 @@ public class ManagedRegionBroker extends RegionBroker{
public void unregister(ActiveMQDestination destName){ public void unregister(ActiveMQDestination destName){
try{ try{
ObjectName objectName = createObjectName(destName); ObjectName objectName=createObjectName(destName);
unregisterDestination(objectName); unregisterDestination(objectName);
}catch(Exception e){ }catch(Exception e){
log.error("Failed to unregister "+destName,e); log.error("Failed to unregister "+destName,e);
@ -164,32 +155,30 @@ public class ManagedRegionBroker extends RegionBroker{
} }
public void registerSubscription(ConnectionContext context,Subscription sub){ public void registerSubscription(ConnectionContext context,Subscription sub){
SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
Hashtable map=brokerObjectName.getKeyPropertyList(); Hashtable map=brokerObjectName.getKeyPropertyList();
String name = key.toString(); String name="";
SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
if(sub.getConsumerInfo().isDurable()){
name=key.toString();
}
if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){
name+="."+sub.getConsumerInfo().getConsumerId();
}
try{ try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
ObjectName objectName = new ObjectName( +","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+"");
brokerObjectName.getDomain()+":"+
"BrokerName="+map.get("BrokerName")+","+
"Type=Subscription,"+
"active=true,"+
"name="+JMXSupport.encodeObjectNamePart(name)+""
);
SubscriptionView view; SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){ if(sub.getConsumerInfo().isDurable()){
view=new DurableSubscriptionView(this,context.getClientId(),sub); view=new DurableSubscriptionView(this,context.getClientId(),sub);
}else{ }else{
if (sub instanceof TopicSubscription) { if(sub instanceof TopicSubscription){
view = new TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub); view=new TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub);
} }else{
else {
view=new SubscriptionView(context.getClientId(),sub); view=new SubscriptionView(context.getClientId(),sub);
} }
} }
subscriptionMap.put(sub,objectName);
registerSubscription(objectName,sub.getConsumerInfo(),key,view); registerSubscription(objectName,sub.getConsumerInfo(),key,view);
subscriptionMap.put(sub,objectName);
}catch(Exception e){ }catch(Exception e){
log.error("Failed to register subscription "+sub,e); log.error("Failed to register subscription "+sub,e);
} }
@ -233,7 +222,8 @@ public class ManagedRegionBroker extends RegionBroker{
mbeanServer.unregisterMBean(key); mbeanServer.unregisterMBean(key);
} }
protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{ protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,
SubscriptionView view) throws Exception{
ActiveMQDestination dest=info.getDestination(); ActiveMQDestination dest=info.getDestination();
if(dest.isQueue()){ if(dest.isQueue()){
if(dest.isTemporary()){ if(dest.isTemporary()){
@ -247,16 +237,16 @@ public class ManagedRegionBroker extends RegionBroker{
}else{ }else{
if(info.isDurable()){ if(info.isDurable()){
durableTopicSubscribers.put(key,view); durableTopicSubscribers.put(key,view);
//unregister any inactive durable subs // unregister any inactive durable subs
try { try{
ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey); ObjectName inactiveName=(ObjectName) subscriptionKeys.get(subscriptionKey);
if (inactiveName != null){ if(inactiveName!=null){
inactiveDurableTopicSubscribers.remove(inactiveName); inactiveDurableTopicSubscribers.remove(inactiveName);
registeredMBeans.remove(inactiveName); registeredMBeans.remove(inactiveName);
mbeanServer.unregisterMBean(inactiveName); mbeanServer.unregisterMBean(inactiveName);
} }
}catch(Exception e){ }catch(Exception e){
log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e); log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e);
} }
}else{ }else{
topicSubscribers.put(key,view); topicSubscribers.put(key,view);
@ -275,72 +265,64 @@ public class ManagedRegionBroker extends RegionBroker{
temporaryTopicSubscribers.remove(key); temporaryTopicSubscribers.remove(key);
registeredMBeans.remove(key); registeredMBeans.remove(key);
mbeanServer.unregisterMBean(key); mbeanServer.unregisterMBean(key);
DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key); DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key);
if (view != null){ if(view!=null){
//need to put this back in the inactive list // need to put this back in the inactive list
SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName()); SubscriptionKey subscriptionKey=new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
SubscriptionInfo info = new SubscriptionInfo(); SubscriptionInfo info=new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId()); info.setClientId(subscriptionKey.getClientId());
info.setSubcriptionName(subscriptionKey.getSubscriptionName()); info.setSubcriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName())); info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info); addInactiveSubscription(subscriptionKey,info);
} }
} }
protected void buildExistingSubscriptions() throws Exception{ protected void buildExistingSubscriptions() throws Exception{
Map subscriptions = new HashMap(); Map subscriptions=new HashMap();
Set destinations = adaptor.getDestinations(); Set destinations=adaptor.getDestinations();
if (destinations != null){ if(destinations!=null){
for (Iterator iter = destinations.iterator(); iter.hasNext();){ for(Iterator iter=destinations.iterator();iter.hasNext();){
ActiveMQDestination dest = (ActiveMQDestination) iter.next(); ActiveMQDestination dest=(ActiveMQDestination) iter.next();
if (dest.isTopic()){ if(dest.isTopic()){
TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest); TopicMessageStore store=adaptor.createTopicMessageStore((ActiveMQTopic) dest);
SubscriptionInfo[] infos = store.getAllSubscriptions(); SubscriptionInfo[] infos=store.getAllSubscriptions();
if (infos != null){ if(infos!=null){
for (int i = 0; i < infos.length; i++) { for(int i=0;i<infos.length;i++){
SubscriptionInfo info=infos[i];
SubscriptionInfo info = infos[i];
log.debug("Restoring durable subscription: "+infos); log.debug("Restoring durable subscription: "+infos);
SubscriptionKey key = new SubscriptionKey(info); SubscriptionKey key=new SubscriptionKey(info);
subscriptions.put(key,info); subscriptions.put(key,info);
} }
} }
} }
} }
} }
for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){ for(Iterator i=subscriptions.entrySet().iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next(); Map.Entry entry=(Entry) i.next();
SubscriptionKey key = (SubscriptionKey) entry.getKey(); SubscriptionKey key=(SubscriptionKey) entry.getKey();
SubscriptionInfo info = (SubscriptionInfo) entry.getValue(); SubscriptionInfo info=(SubscriptionInfo) entry.getValue();
addInactiveSubscription(key, info); addInactiveSubscription(key,info);
} }
} }
protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){ protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
Hashtable map=brokerObjectName.getKeyPropertyList(); Hashtable map=brokerObjectName.getKeyPropertyList();
try{ try{
ObjectName objectName = new ObjectName( ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
brokerObjectName.getDomain()+":"+ +","+"Type=Subscription,"+"active=false,"+"name="
"BrokerName="+map.get("BrokerName")+","+ +JMXSupport.encodeObjectNamePart(key.toString())+"");
"Type=Subscription,"+ SubscriptionView view=new InactiveDurableSubscriptionView(this,key.getClientId(),info);
"active=false,"+
"name="+JMXSupport.encodeObjectNamePart(key.toString())+""
);
SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
registeredMBeans.add(objectName); registeredMBeans.add(objectName);
mbeanServer.registerMBean(view,objectName); mbeanServer.registerMBean(view,objectName);
inactiveDurableTopicSubscribers.put(objectName,view); inactiveDurableTopicSubscribers.put(objectName,view);
subscriptionKeys.put(key, objectName); subscriptionKeys.put(key,objectName);
}catch(Exception e){ }catch(Exception e){
log.error("Failed to register subscription "+info,e); log.error("Failed to register subscription "+info,e);
} }
} }
public CompositeData[] browse(SubscriptionView view) throws OpenDataException{ public CompositeData[] browse(SubscriptionView view) throws OpenDataException{
List messages = getSubscriberMessages(view); List messages=getSubscriberMessages(view);
CompositeData c[]=new CompositeData[messages.size()]; CompositeData c[]=new CompositeData[messages.size()];
for(int i=0;i<c.length;i++){ for(int i=0;i<c.length;i++){
try{ try{
@ -354,7 +336,7 @@ public class ManagedRegionBroker extends RegionBroker{
public TabularData browseAsTable(SubscriptionView view) throws OpenDataException{ public TabularData browseAsTable(SubscriptionView view) throws OpenDataException{
OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class); OpenTypeFactory factory=OpenTypeSupport.getFactory(ActiveMQMessage.class);
List messages = getSubscriberMessages(view); List messages=getSubscriberMessages(view);
CompositeType ct=factory.getCompositeType(); CompositeType ct=factory.getCompositeType();
TabularType tt=new TabularType("MessageList","MessageList",ct,new String[] { "JMSMessageID" }); TabularType tt=new TabularType("MessageList","MessageList",ct,new String[] { "JMSMessageID" });
TabularDataSupport rc=new TabularDataSupport(tt); TabularDataSupport rc=new TabularDataSupport(tt);
@ -365,12 +347,10 @@ public class ManagedRegionBroker extends RegionBroker{
} }
protected List getSubscriberMessages(SubscriptionView view){ protected List getSubscriberMessages(SubscriptionView view){
final List result = new ArrayList(); final List result=new ArrayList();
try { try{
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store = adaptor.createTopicMessageStore(topic); TopicMessageStore store=adaptor.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener(){ store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{ public void recoverMessage(Message message) throws Exception{
result.add(message); result.add(message);
@ -381,71 +361,75 @@ public class ManagedRegionBroker extends RegionBroker{
public void finished(){} public void finished(){}
}); });
}catch(Throwable e){ }catch(Throwable e){
log.error("Failed to browse messages for Subscription " + view,e); log.error("Failed to browse messages for Subscription "+view,e);
} }
return result; return result;
} }
protected ObjectName[] getTopics(){ protected ObjectName[] getTopics(){
Set set = topics.keySet(); Set set=topics.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getQueues(){ protected ObjectName[] getQueues(){
Set set = queues.keySet(); Set set=queues.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getTemporaryTopics(){ protected ObjectName[] getTemporaryTopics(){
Set set = temporaryTopics.keySet(); Set set=temporaryTopics.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getTemporaryQueues(){ protected ObjectName[] getTemporaryQueues(){
Set set = temporaryQueues.keySet(); Set set=temporaryQueues.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getTopicSubscribers(){ protected ObjectName[] getTopicSubscribers(){
Set set = topicSubscribers.keySet(); Set set=topicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getDurableTopicSubscribers(){ protected ObjectName[] getDurableTopicSubscribers(){
Set set = durableTopicSubscribers.keySet(); Set set=durableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getQueueSubscribers(){ protected ObjectName[] getQueueSubscribers(){
Set set = queueSubscribers.keySet(); Set set=queueSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getTemporaryTopicSubscribers(){ protected ObjectName[] getTemporaryTopicSubscribers(){
Set set = temporaryTopicSubscribers.keySet(); Set set=temporaryTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getTemporaryQueueSubscribers(){ protected ObjectName[] getTemporaryQueueSubscribers(){
Set set = temporaryQueueSubscribers.keySet(); Set set=temporaryQueueSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getInactiveDurableTopicSubscribers(){ protected ObjectName[] getInactiveDurableTopicSubscribers(){
Set set = inactiveDurableTopicSubscribers.keySet(); Set set=inactiveDurableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]); return (ObjectName[]) set.toArray(new ObjectName[set.size()]);
} }
public Broker getContextBroker() { public Broker getContextBroker(){
return contextBroker; return contextBroker;
} }
public void setContextBroker(Broker contextBroker) { public void setContextBroker(Broker contextBroker){
this.contextBroker = contextBroker; this.contextBroker=contextBroker;
} }
protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException{
// Build the object name for the destination // Build the object name for the destination
Hashtable map=brokerObjectName.getKeyPropertyList(); Hashtable map=brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName( ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")+","
brokerObjectName.getDomain()+":"+ +"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","
"BrokerName="+map.get("BrokerName")+","+ +"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
);
return objectName; return objectName;
} }
} }