From 6c0df67be60797aa7c12e3dbe5dc62cc9b9564ea Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 9 Mar 2006 11:22:26 +0000 Subject: [PATCH] tidied up subscription objectNames git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384492 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/jmx/ManagedRegionBroker.java | 290 +++++++++--------- 1 file changed, 137 insertions(+), 153 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 28ec1b724c..698a22b466 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.Map.Entry; - import javax.management.InstanceNotFoundException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; @@ -34,7 +33,6 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -62,10 +60,8 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; - public class ManagedRegionBroker extends RegionBroker{ private static final Log log=LogFactory.getLog(ManagedRegionBroker.class); private final MBeanServer mbeanServer; @@ -80,42 +76,37 @@ public class ManagedRegionBroker extends RegionBroker{ private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap(); private final Map temporaryQueueSubscribers=new ConcurrentHashMap(); private final Map temporaryTopicSubscribers=new ConcurrentHashMap(); - private final Map subscriptionKeys = new ConcurrentHashMap(); - private final Map subscriptionMap = new ConcurrentHashMap(); - private final Set registeredMBeans = new CopyOnWriteArraySet(); - + private final Map subscriptionKeys=new ConcurrentHashMap(); + private final Map subscriptionMap=new ConcurrentHashMap(); + private final Set registeredMBeans=new CopyOnWriteArraySet(); /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName, - TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter) throws IOException{ + TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter) + throws IOException{ super(brokerService,taskRunnerFactory,memoryManager,adapter); this.mbeanServer=mbeanServer; this.brokerObjectName=brokerObjectName; } - - public void start() throws Exception { + + public void start() throws Exception{ super.start(); - //build all existing durable subscriptions + // build all existing durable subscriptions buildExistingSubscriptions(); - } - - protected void doStop(ServiceStopper stopper) { + protected void doStop(ServiceStopper stopper){ super.doStop(stopper); - // lets remove any mbeans not yet removed - for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) { - ObjectName name = (ObjectName) iter.next(); - try { + for(Iterator iter=registeredMBeans.iterator();iter.hasNext();){ + ObjectName name=(ObjectName) iter.next(); + try{ mbeanServer.unregisterMBean(name); - } - catch (InstanceNotFoundException e) { - log.warn("The MBean: " + name + " is no longer registered with JMX"); - } - catch (Exception e) { - stopper.onException(this, e); + }catch(InstanceNotFoundException e){ + log.warn("The MBean: "+name+" is no longer registered with JMX"); + }catch(Exception e){ + stopper.onException(this,e); } } registeredMBeans.clear(); @@ -141,12 +132,12 @@ public class ManagedRegionBroker extends RegionBroker{ public void register(ActiveMQDestination destName,Destination destination){ try{ - ObjectName objectName = createObjectName(destName); + ObjectName objectName=createObjectName(destName); DestinationView view; if(destination instanceof Queue){ - view=new QueueView(this, (Queue) destination); + view=new QueueView(this,(Queue) destination); }else{ - view=new TopicView(this, (Topic) destination); + view=new TopicView(this,(Topic) destination); } registerDestination(objectName,destName,view); }catch(Exception e){ @@ -156,7 +147,7 @@ public class ManagedRegionBroker extends RegionBroker{ public void unregister(ActiveMQDestination destName){ try{ - ObjectName objectName = createObjectName(destName); + ObjectName objectName=createObjectName(destName); unregisterDestination(objectName); }catch(Exception e){ log.error("Failed to unregister "+destName,e); @@ -164,32 +155,30 @@ public class ManagedRegionBroker extends RegionBroker{ } public void registerSubscription(ConnectionContext context,Subscription sub){ - SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); Hashtable map=brokerObjectName.getKeyPropertyList(); - String name = key.toString(); + String name=""; + SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); + if(sub.getConsumerInfo().isDurable()){ + name=key.toString(); + } + if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){ + name+="."+sub.getConsumerInfo().getConsumerId(); + } try{ - - ObjectName objectName = new ObjectName( - brokerObjectName.getDomain()+":"+ - "BrokerName="+map.get("BrokerName")+","+ - "Type=Subscription,"+ - "active=true,"+ - "name="+JMXSupport.encodeObjectNamePart(name)+"" - ); - + ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName") + +","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+""); SubscriptionView view; if(sub.getConsumerInfo().isDurable()){ view=new DurableSubscriptionView(this,context.getClientId(),sub); }else{ - if (sub instanceof TopicSubscription) { - view = new TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub); - } - else { + if(sub instanceof TopicSubscription){ + view=new TopicSubscriptionView(context.getClientId(),(TopicSubscription) sub); + }else{ view=new SubscriptionView(context.getClientId(),sub); } } - subscriptionMap.put(sub,objectName); registerSubscription(objectName,sub.getConsumerInfo(),key,view); + subscriptionMap.put(sub,objectName); }catch(Exception e){ log.error("Failed to register subscription "+sub,e); } @@ -233,7 +222,8 @@ public class ManagedRegionBroker extends RegionBroker{ mbeanServer.unregisterMBean(key); } - protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{ + protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey, + SubscriptionView view) throws Exception{ ActiveMQDestination dest=info.getDestination(); if(dest.isQueue()){ if(dest.isTemporary()){ @@ -247,16 +237,16 @@ public class ManagedRegionBroker extends RegionBroker{ }else{ if(info.isDurable()){ durableTopicSubscribers.put(key,view); - //unregister any inactive durable subs - try { - ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey); - if (inactiveName != null){ + // unregister any inactive durable subs + try{ + ObjectName inactiveName=(ObjectName) subscriptionKeys.get(subscriptionKey); + if(inactiveName!=null){ inactiveDurableTopicSubscribers.remove(inactiveName); registeredMBeans.remove(inactiveName); mbeanServer.unregisterMBean(inactiveName); } }catch(Exception e){ - log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e); + log.error("Unable to unregister inactive durable subscriber: "+subscriptionKey,e); } }else{ topicSubscribers.put(key,view); @@ -275,72 +265,64 @@ public class ManagedRegionBroker extends RegionBroker{ temporaryTopicSubscribers.remove(key); registeredMBeans.remove(key); mbeanServer.unregisterMBean(key); - DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key); - if (view != null){ - //need to put this back in the inactive list - SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName()); - SubscriptionInfo info = new SubscriptionInfo(); + DurableSubscriptionView view=(DurableSubscriptionView) durableTopicSubscribers.remove(key); + if(view!=null){ + // need to put this back in the inactive list + SubscriptionKey subscriptionKey=new SubscriptionKey(view.getClientId(),view.getSubscriptionName()); + SubscriptionInfo info=new SubscriptionInfo(); info.setClientId(subscriptionKey.getClientId()); info.setSubcriptionName(subscriptionKey.getSubscriptionName()); info.setDestination(new ActiveMQTopic(view.getDestinationName())); - addInactiveSubscription(subscriptionKey, info); + addInactiveSubscription(subscriptionKey,info); } - - } - + protected void buildExistingSubscriptions() throws Exception{ - Map subscriptions = new HashMap(); - Set destinations = adaptor.getDestinations(); - if (destinations != null){ - for (Iterator iter = destinations.iterator(); iter.hasNext();){ - ActiveMQDestination dest = (ActiveMQDestination) iter.next(); - if (dest.isTopic()){ - TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest); - SubscriptionInfo[] infos = store.getAllSubscriptions(); - if (infos != null){ - for (int i = 0; i < infos.length; i++) { - - SubscriptionInfo info = infos[i]; + Map subscriptions=new HashMap(); + Set destinations=adaptor.getDestinations(); + if(destinations!=null){ + for(Iterator iter=destinations.iterator();iter.hasNext();){ + ActiveMQDestination dest=(ActiveMQDestination) iter.next(); + if(dest.isTopic()){ + TopicMessageStore store=adaptor.createTopicMessageStore((ActiveMQTopic) dest); + SubscriptionInfo[] infos=store.getAllSubscriptions(); + if(infos!=null){ + for(int i=0;i