From c82f6f330965944c6948d0bc997ddf0681020dde Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 2 Mar 2006 10:31:23 +0000 Subject: [PATCH] Added support for view inactive durable consumers git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382344 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/jmx/BrokerView.java | 4 + .../activemq/broker/jmx/BrokerViewMBean.java | 1 + .../broker/jmx/DurableSubscriptionView.java | 10 +- .../jmx/InactiveDurableSubscriptionView.java | 102 ++++++++++++++ .../broker/jmx/ManagedQueueRegion.java | 2 +- .../broker/jmx/ManagedRegionBroker.java | 126 +++++++++++++++--- .../broker/jmx/ManagedTempQueueRegion.java | 2 +- .../broker/jmx/ManagedTempTopicRegion.java | 2 +- .../broker/jmx/ManagedTopicRegion.java | 2 +- .../activemq/broker/jmx/SubscriptionView.java | 45 +++++-- .../broker/jmx/SubscriptionViewMBean.java | 10 ++ .../activemq/broker/region/RegionBroker.java | 2 +- 12 files changed, 274 insertions(+), 34 deletions(-) create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 9340333da6..14d0201bd1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -117,4 +117,8 @@ public class BrokerView implements BrokerViewMBean { return broker.getTemporaryQueueSubscribers(); } + public ObjectName[] getInactiveDurableTopicSubscribers(){ + return broker.getInactiveDurableTopicSubscribers(); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index f571882f4b..790269926f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -45,6 +45,7 @@ public interface BrokerViewMBean extends Service { public ObjectName[] getTopicSubscribers(); public ObjectName[] getDurableTopicSubscribers(); + public ObjectName[] getInactiveDurableTopicSubscribers(); public ObjectName[] getQueueSubscribers(); public ObjectName[] getTemporaryTopicSubscribers(); public ObjectName[] getTemporaryQueueSubscribers(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java index 4da10a7446..7079debb88 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java @@ -23,10 +23,16 @@ import org.apache.activemq.broker.region.Subscription; public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { protected String subscriptionName; - public DurableSubscriptionView(Subscription sub){ - super(sub); + /** + * Constructor + * @param clientId + * @param sub + */ + public DurableSubscriptionView(String clientId,Subscription sub){ + super(clientId,sub); this.subscriptionName = sub.getConsumerInfo().getSubcriptionName(); } + /** * @return name of the durable consumer */ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java new file mode 100755 index 0000000000..b2245947fd --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java @@ -0,0 +1,102 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import org.apache.activemq.command.SubscriptionInfo; +/** + * @version $Revision: 1.5 $ + */ +public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean { + + protected SubscriptionInfo info; + public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){ + super(clientId,null); + this.info = sub; + } + + + + + /** + * @return the id of the Subscription + */ + public long getSubcriptionId(){ + return -1; + } + + /** + * @return the destination name + */ + public String getDestinationName(){ + return info.getDestination().getPhysicalName(); + + } + + /** + * @return true if the destination is a Queue + */ + public boolean isDestinationQueue(){ + return false; + } + + /** + * @return true of the destination is a Topic + */ + public boolean isDestinationTopic(){ + return true; + } + + /** + * @return true if the destination is temporary + */ + public boolean isDestinationTemporary(){ + return false; + } + /** + * @return name of the durable consumer + */ + public String getSubscriptionName(){ + return info.getSubcriptionName(); + } + + /** + * @return true if the subscriber is active + */ + public boolean isActive(){ + return false; + } + + /** + * Browse messages for this durable subscriber + * + * @return messages + * @throws OpenDataException + */ + public CompositeData[] browse() throws OpenDataException{ + return null; + } + + /** + * Browse messages for this durable subscriber + * + * @return messages + * @throws OpenDataException + */ + public TabularData browseAsTable() throws OpenDataException{ + return null; + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java index b7f382d388..0e7ddfc9f8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java @@ -41,7 +41,7 @@ public class ManagedQueueRegion extends QueueRegion { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index ab6f9bcb08..3fc627b180 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -14,13 +14,19 @@ package org.apache.activemq.broker.jmx; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.Hashtable; +import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.RegionBroker; @@ -28,11 +34,15 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; @@ -47,8 +57,11 @@ public class ManagedRegionBroker extends RegionBroker{ private final Map queueSubscribers=new ConcurrentHashMap(); private final Map topicSubscribers=new ConcurrentHashMap(); private final Map durableTopicSubscribers=new ConcurrentHashMap(); + private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap(); private final Map temporaryQueueSubscribers=new ConcurrentHashMap(); private final Map temporaryTopicSubscribers=new ConcurrentHashMap(); + private final Map subscriptionKeys = new ConcurrentHashMap(); + private final Map subscriptionMap = new ConcurrentHashMap(); public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter, @@ -57,6 +70,13 @@ public class ManagedRegionBroker extends RegionBroker{ this.mbeanServer=mbeanServer; this.brokerObjectName=brokerObjectName; } + + public void start() throws Exception { + super.start(); + //build all existing durable subscriptions + buildExistingSubscriptions(); + + } protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter,PolicyMap policyMap){ @@ -108,33 +128,37 @@ public class ManagedRegionBroker extends RegionBroker{ } } - public void registerSubscription(Subscription sub){ + public void registerSubscription(ConnectionContext context,Subscription sub){ + // NEED CONTEXT TO GET CLIENT ID AND USE Subscription KEY!!! + SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); - map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString())); + String name = key.toString() + ":" + sub.getConsumerInfo().toString(); + map.put("name",JMXSupport.encodeObjectNamePart(name)); + map.put("active", "true"); try{ ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); SubscriptionView view; if(sub.getConsumerInfo().isDurable()){ - view=new DurableSubscriptionView(sub); + view=new DurableSubscriptionView(context.getClientId(),sub); }else{ - view=new SubscriptionView(sub); + view=new SubscriptionView(context.getClientId(),sub); } - registerSubscription(objectName,sub.getConsumerInfo(),view); + subscriptionMap.put(sub,objectName); + registerSubscription(objectName,sub.getConsumerInfo(),key,view); }catch(Exception e){ log.error("Failed to register subscription "+sub,e); } } public void unregisterSubscription(Subscription sub){ - Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); - map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); - map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString())); - try{ - ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); - unregisterSubscription(objectName); - }catch(Exception e){ - log.error("Failed to unregister subscription "+sub,e); + ObjectName name=(ObjectName) subscriptionMap.get(sub); + if(name!=null){ + try{ + unregisterSubscription(name); + }catch(Exception e){ + log.error("Failed to unregister subscription "+sub,e); + } } } @@ -163,7 +187,7 @@ public class ManagedRegionBroker extends RegionBroker{ mbeanServer.unregisterMBean(key); } - protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView view) throws Exception{ + protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{ ActiveMQDestination dest=info.getDestination(); if(dest.isQueue()){ if(dest.isTemporary()){ @@ -177,6 +201,16 @@ public class ManagedRegionBroker extends RegionBroker{ }else{ if(info.isDurable()){ durableTopicSubscribers.put(key,view); + //unregister any inactive durable subs + try { + ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey); + if (inactiveName != null){ + inactiveDurableTopicSubscribers.remove(inactiveName); + mbeanServer.unregisterMBean(inactiveName); + } + }catch(Exception e){ + log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e); + } }else{ topicSubscribers.put(key,view); } @@ -188,10 +222,67 @@ public class ManagedRegionBroker extends RegionBroker{ protected void unregisterSubscription(ObjectName key) throws Exception{ queueSubscribers.remove(key); topicSubscribers.remove(key); - durableTopicSubscribers.remove(key); + inactiveDurableTopicSubscribers.remove(key); temporaryQueueSubscribers.remove(key); temporaryTopicSubscribers.remove(key); mbeanServer.unregisterMBean(key); + DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key); + if (view != null){ + //need to put this back in the inactive list + SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName()); + SubscriptionInfo info = new SubscriptionInfo(); + info.setClientId(subscriptionKey.getClientId()); + info.setSubcriptionName(subscriptionKey.getSubscriptionName()); + info.setDestination(new ActiveMQTopic(view.getDestinationName())); + addInactiveSubscription(subscriptionKey, info); + } + + + } + + protected void buildExistingSubscriptions() throws Exception{ + Map subscriptions = new HashMap(); + Set destinations = adaptor.getDestinations(); + if (destinations != null){ + for (Iterator iter = destinations.iterator(); iter.hasNext();){ + ActiveMQDestination dest = (ActiveMQDestination) iter.next(); + if (dest.isTopic()){ + TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest); + SubscriptionInfo[] infos = store.getAllSubscriptions(); + if (infos != null){ + for (int i = 0; i < infos.length; i++) { + + SubscriptionInfo info = infos[i]; + log.debug("Restoring durable subscription: "+infos); + SubscriptionKey key = new SubscriptionKey(info); + subscriptions.put(key,info); + } + } + } + } + } + for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){ + Map.Entry entry = (Entry) i.next(); + SubscriptionKey key = (SubscriptionKey) entry.getKey(); + SubscriptionInfo info = (SubscriptionInfo) entry.getValue(); + addInactiveSubscription(key, info); + } + } + + protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){ + Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); + map.put("Type",JMXSupport.encodeObjectNamePart("Subscription")); + map.put("name",JMXSupport.encodeObjectNamePart(key.toString())); + map.put("active", "false"); + try{ + ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map); + SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info); + mbeanServer.registerMBean(view,objectName); + inactiveDurableTopicSubscribers.put(objectName,view); + subscriptionKeys.put(key, objectName); + }catch(Exception e){ + log.error("Failed to register subscription "+info,e); + } } protected ObjectName[] getTopics(){ @@ -231,4 +322,9 @@ public class ManagedRegionBroker extends RegionBroker{ Set set = temporaryQueueSubscribers.keySet(); return (ObjectName[])set.toArray(new ObjectName[set.size()]); } + + protected ObjectName[] getInactiveDurableTopicSubscribers(){ + Set set = inactiveDurableTopicSubscribers.keySet(); + return (ObjectName[])set.toArray(new ObjectName[set.size()]); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java index 2d0596d1d8..05d48e2d92 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java @@ -39,7 +39,7 @@ public class ManagedTempQueueRegion extends TempQueueRegion { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java index 4f7d712ce7..74aa4443f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java @@ -39,7 +39,7 @@ public class ManagedTempTopicRegion extends TempTopicRegion { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java index 500b93a3d2..73f6f1f065 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java @@ -41,7 +41,7 @@ public class ManagedTopicRegion extends TopicRegion { protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { Subscription sub = super.createSubscription(context, info); - regionBroker.registerSubscription(sub); + regionBroker.registerSubscription(context,sub); return sub; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java index 353db2065f..6fcc4af8fc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java @@ -29,22 +29,30 @@ public class SubscriptionView implements SubscriptionViewMBean { protected final Subscription subscription; - + protected final String clientId; /** * Constructior * @param subs */ - public SubscriptionView(Subscription subs){ + public SubscriptionView(String clientId,Subscription subs){ + this.clientId = clientId; this.subscription = subs; } + /** + * @return the clientId + */ + public String getClientId(){ + return clientId; + } + /** * @return the id of the Connection the Subscription is on */ public String getConnectionId(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ return info.getConsumerId().getConnectionId(); } @@ -55,7 +63,7 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return the id of the Session the subscription is on */ public long getSessionId(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ return info.getConsumerId().getSessionId(); } @@ -66,7 +74,7 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return the id of the Subscription */ public long getSubcriptionId(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ return info.getConsumerId().getValue(); } @@ -77,7 +85,7 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return the destination name */ public String getDestinationName(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.getPhysicalName(); @@ -90,7 +98,7 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return true if the destination is a Queue */ public boolean isDestinationQueue(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.isQueue(); @@ -102,7 +110,7 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return true of the destination is a Topic */ public boolean isDestinationTopic(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.isTopic(); @@ -114,41 +122,54 @@ public class SubscriptionView implements SubscriptionViewMBean { * @return true if the destination is temporary */ public boolean isDestinationTemporary(){ - ConsumerInfo info = subscription.getConsumerInfo(); + ConsumerInfo info = getConsumerInfo(); if (info != null){ ActiveMQDestination dest = info.getDestination(); return dest.isTemporary(); } return false; } + + /** + * @return true if the subscriber is active + */ + public boolean isActive(){ + return true; + } /** * The subscription should release as may references as it can to help the garbage collector * reclaim memory. */ public void gc(){ + if (subscription != null){ subscription.gc(); + } } /** * @return number of messages pending delivery */ public int getPending(){ - return subscription.pending(); + return subscription != null ? subscription.pending() : 0; } /** * @return number of messages dispatched */ public int getDispatched(){ - return subscription.dispatched(); + return subscription != null ? subscription.dispatched() : 0; } /** * @return number of messages delivered */ public int getDelivered(){ - return subscription.delivered(); + return subscription != null ? subscription.delivered() : 0; + } + + protected ConsumerInfo getConsumerInfo(){ + return subscription != null ? subscription.getConsumerInfo() : null; } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java index eacdd534a5..38e7c629ee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java @@ -17,6 +17,11 @@ package org.apache.activemq.broker.jmx; * @version $Revision: 1.5 $ */ public interface SubscriptionViewMBean{ + + /** + * @return the clientId + */ + public String getClientId(); /** * @return the id of the Connection the Subscription is on */ @@ -51,6 +56,11 @@ public interface SubscriptionViewMBean{ * @return true if the destination is temporary */ public boolean isDestinationTemporary(); + + /** + * @return true if the subscriber is active + */ + public boolean isActive(); /** * The subscription should release as may references as it can to help the garbage collector reclaim memory. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 64fa2ac0e5..ee06ba26dc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -81,7 +81,7 @@ public class RegionBroker implements Broker { private BrokerId brokerId; private String brokerName; private Map clientIdSet = new HashMap(); // we will synchronize access - private PersistenceAdapter adaptor; + protected PersistenceAdapter adaptor; public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException { this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);