Added support for view inactive durable consumers

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382344 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-02 10:31:23 +00:00
parent 4ddb76fe4d
commit c82f6f3309
12 changed files with 274 additions and 34 deletions

View File

@ -117,4 +117,8 @@ public class BrokerView implements BrokerViewMBean {
return broker.getTemporaryQueueSubscribers();
}
public ObjectName[] getInactiveDurableTopicSubscribers(){
return broker.getInactiveDurableTopicSubscribers();
}
}

View File

@ -45,6 +45,7 @@ public interface BrokerViewMBean extends Service {
public ObjectName[] getTopicSubscribers();
public ObjectName[] getDurableTopicSubscribers();
public ObjectName[] getInactiveDurableTopicSubscribers();
public ObjectName[] getQueueSubscribers();
public ObjectName[] getTemporaryTopicSubscribers();
public ObjectName[] getTemporaryQueueSubscribers();

View File

@ -23,10 +23,16 @@ import org.apache.activemq.broker.region.Subscription;
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected String subscriptionName;
public DurableSubscriptionView(Subscription sub){
super(sub);
/**
* Constructor
* @param clientId
* @param sub
*/
public DurableSubscriptionView(String clientId,Subscription sub){
super(clientId,sub);
this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
}
/**
* @return name of the durable consumer
*/

View File

@ -0,0 +1,102 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import org.apache.activemq.command.SubscriptionInfo;
/**
* @version $Revision: 1.5 $
*/
public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected SubscriptionInfo info;
public InactiveDurableSubscriptionView(String clientId,SubscriptionInfo sub){
super(clientId,null);
this.info = sub;
}
/**
* @return the id of the Subscription
*/
public long getSubcriptionId(){
return -1;
}
/**
* @return the destination name
*/
public String getDestinationName(){
return info.getDestination().getPhysicalName();
}
/**
* @return true if the destination is a Queue
*/
public boolean isDestinationQueue(){
return false;
}
/**
* @return true of the destination is a Topic
*/
public boolean isDestinationTopic(){
return true;
}
/**
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary(){
return false;
}
/**
* @return name of the durable consumer
*/
public String getSubscriptionName(){
return info.getSubcriptionName();
}
/**
* @return true if the subscriber is active
*/
public boolean isActive(){
return false;
}
/**
* Browse messages for this durable subscriber
*
* @return messages
* @throws OpenDataException
*/
public CompositeData[] browse() throws OpenDataException{
return null;
}
/**
* Browse messages for this durable subscriber
*
* @return messages
* @throws OpenDataException
*/
public TabularData browseAsTable() throws OpenDataException{
return null;
}
}

View File

@ -41,7 +41,7 @@ public class ManagedQueueRegion extends QueueRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
regionBroker.registerSubscription(context,sub);
return sub;
}

View File

@ -14,13 +14,19 @@
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
@ -28,11 +34,15 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -47,8 +57,11 @@ public class ManagedRegionBroker extends RegionBroker{
private final Map queueSubscribers=new ConcurrentHashMap();
private final Map topicSubscribers=new ConcurrentHashMap();
private final Map durableTopicSubscribers=new ConcurrentHashMap();
private final Map inactiveDurableTopicSubscribers=new ConcurrentHashMap();
private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
private final Map subscriptionKeys = new ConcurrentHashMap();
private final Map subscriptionMap = new ConcurrentHashMap();
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
@ -57,6 +70,13 @@ public class ManagedRegionBroker extends RegionBroker{
this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName;
}
public void start() throws Exception {
super.start();
//build all existing durable subscriptions
buildExistingSubscriptions();
}
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter,PolicyMap policyMap){
@ -108,33 +128,37 @@ public class ManagedRegionBroker extends RegionBroker{
}
}
public void registerSubscription(Subscription sub){
public void registerSubscription(ConnectionContext context,Subscription sub){
// NEED CONTEXT TO GET CLIENT ID AND USE Subscription KEY!!!
SubscriptionKey key = new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
String name = key.toString() + ":" + sub.getConsumerInfo().toString();
map.put("name",JMXSupport.encodeObjectNamePart(name));
map.put("active", "true");
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){
view=new DurableSubscriptionView(sub);
view=new DurableSubscriptionView(context.getClientId(),sub);
}else{
view=new SubscriptionView(sub);
view=new SubscriptionView(context.getClientId(),sub);
}
registerSubscription(objectName,sub.getConsumerInfo(),view);
subscriptionMap.put(sub,objectName);
registerSubscription(objectName,sub.getConsumerInfo(),key,view);
}catch(Exception e){
log.error("Failed to register subscription "+sub,e);
}
}
public void unregisterSubscription(Subscription sub){
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
unregisterSubscription(objectName);
}catch(Exception e){
log.error("Failed to unregister subscription "+sub,e);
ObjectName name=(ObjectName) subscriptionMap.get(sub);
if(name!=null){
try{
unregisterSubscription(name);
}catch(Exception e){
log.error("Failed to unregister subscription "+sub,e);
}
}
}
@ -163,7 +187,7 @@ public class ManagedRegionBroker extends RegionBroker{
mbeanServer.unregisterMBean(key);
}
protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView view) throws Exception{
protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionKey subscriptionKey,SubscriptionView view) throws Exception{
ActiveMQDestination dest=info.getDestination();
if(dest.isQueue()){
if(dest.isTemporary()){
@ -177,6 +201,16 @@ public class ManagedRegionBroker extends RegionBroker{
}else{
if(info.isDurable()){
durableTopicSubscribers.put(key,view);
//unregister any inactive durable subs
try {
ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey);
if (inactiveName != null){
inactiveDurableTopicSubscribers.remove(inactiveName);
mbeanServer.unregisterMBean(inactiveName);
}
}catch(Exception e){
log.error("Unable to unregister inactive durable subscriber: " + subscriptionKey,e);
}
}else{
topicSubscribers.put(key,view);
}
@ -188,10 +222,67 @@ public class ManagedRegionBroker extends RegionBroker{
protected void unregisterSubscription(ObjectName key) throws Exception{
queueSubscribers.remove(key);
topicSubscribers.remove(key);
durableTopicSubscribers.remove(key);
inactiveDurableTopicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
mbeanServer.unregisterMBean(key);
DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
if (view != null){
//need to put this back in the inactive list
SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(),view.getSubscriptionName());
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId());
info.setSubcriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info);
}
}
protected void buildExistingSubscriptions() throws Exception{
Map subscriptions = new HashMap();
Set destinations = adaptor.getDestinations();
if (destinations != null){
for (Iterator iter = destinations.iterator(); iter.hasNext();){
ActiveMQDestination dest = (ActiveMQDestination) iter.next();
if (dest.isTopic()){
TopicMessageStore store = adaptor.createTopicMessageStore((ActiveMQTopic) dest);
SubscriptionInfo[] infos = store.getAllSubscriptions();
if (infos != null){
for (int i = 0; i < infos.length; i++) {
SubscriptionInfo info = infos[i];
log.debug("Restoring durable subscription: "+infos);
SubscriptionKey key = new SubscriptionKey(info);
subscriptions.put(key,info);
}
}
}
}
}
for (Iterator i = subscriptions.entrySet().iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next();
SubscriptionKey key = (SubscriptionKey) entry.getKey();
SubscriptionInfo info = (SubscriptionInfo) entry.getValue();
addInactiveSubscription(key, info);
}
}
protected void addInactiveSubscription(SubscriptionKey key,SubscriptionInfo info){
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
map.put("name",JMXSupport.encodeObjectNamePart(key.toString()));
map.put("active", "false");
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view = new InactiveDurableSubscriptionView(key.getClientId(),info);
mbeanServer.registerMBean(view,objectName);
inactiveDurableTopicSubscribers.put(objectName,view);
subscriptionKeys.put(key, objectName);
}catch(Exception e){
log.error("Failed to register subscription "+info,e);
}
}
protected ObjectName[] getTopics(){
@ -231,4 +322,9 @@ public class ManagedRegionBroker extends RegionBroker{
Set set = temporaryQueueSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getInactiveDurableTopicSubscribers(){
Set set = inactiveDurableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
}

View File

@ -39,7 +39,7 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
regionBroker.registerSubscription(context,sub);
return sub;
}

View File

@ -39,7 +39,7 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
regionBroker.registerSubscription(context,sub);
return sub;
}

View File

@ -41,7 +41,7 @@ public class ManagedTopicRegion extends TopicRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
regionBroker.registerSubscription(context,sub);
return sub;
}

View File

@ -29,22 +29,30 @@ public class SubscriptionView implements SubscriptionViewMBean {
protected final Subscription subscription;
protected final String clientId;
/**
* Constructior
* @param subs
*/
public SubscriptionView(Subscription subs){
public SubscriptionView(String clientId,Subscription subs){
this.clientId = clientId;
this.subscription = subs;
}
/**
* @return the clientId
*/
public String getClientId(){
return clientId;
}
/**
* @return the id of the Connection the Subscription is on
*/
public String getConnectionId(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
return info.getConsumerId().getConnectionId();
}
@ -55,7 +63,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return the id of the Session the subscription is on
*/
public long getSessionId(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
return info.getConsumerId().getSessionId();
}
@ -66,7 +74,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return the id of the Subscription
*/
public long getSubcriptionId(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
return info.getConsumerId().getValue();
}
@ -77,7 +85,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return the destination name
*/
public String getDestinationName(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.getPhysicalName();
@ -90,7 +98,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return true if the destination is a Queue
*/
public boolean isDestinationQueue(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isQueue();
@ -102,7 +110,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return true of the destination is a Topic
*/
public boolean isDestinationTopic(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isTopic();
@ -114,41 +122,54 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary(){
ConsumerInfo info = subscription.getConsumerInfo();
ConsumerInfo info = getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isTemporary();
}
return false;
}
/**
* @return true if the subscriber is active
*/
public boolean isActive(){
return true;
}
/**
* The subscription should release as may references as it can to help the garbage collector
* reclaim memory.
*/
public void gc(){
if (subscription != null){
subscription.gc();
}
}
/**
* @return number of messages pending delivery
*/
public int getPending(){
return subscription.pending();
return subscription != null ? subscription.pending() : 0;
}
/**
* @return number of messages dispatched
*/
public int getDispatched(){
return subscription.dispatched();
return subscription != null ? subscription.dispatched() : 0;
}
/**
* @return number of messages delivered
*/
public int getDelivered(){
return subscription.delivered();
return subscription != null ? subscription.delivered() : 0;
}
protected ConsumerInfo getConsumerInfo(){
return subscription != null ? subscription.getConsumerInfo() : null;
}
}

View File

@ -17,6 +17,11 @@ package org.apache.activemq.broker.jmx;
* @version $Revision: 1.5 $
*/
public interface SubscriptionViewMBean{
/**
* @return the clientId
*/
public String getClientId();
/**
* @return the id of the Connection the Subscription is on
*/
@ -51,6 +56,11 @@ public interface SubscriptionViewMBean{
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary();
/**
* @return true if the subscriber is active
*/
public boolean isActive();
/**
* The subscription should release as may references as it can to help the garbage collector reclaim memory.

View File

@ -81,7 +81,7 @@ public class RegionBroker implements Broker {
private BrokerId brokerId;
private String brokerName;
private Map clientIdSet = new HashMap(); // we will synchronize access
private PersistenceAdapter adaptor;
protected PersistenceAdapter adaptor;
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);