From 947659cd522f03b532fca796cd699ca8d1e33a66 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 19 Aug 2008 12:56:40 +0000 Subject: [PATCH] added some helper mbean methods so that you can ask for the consumers on a DestinationViewMBean git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@687043 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/jmx/DestinationView.java | 19 ++++++- .../broker/jmx/DestinationViewMBean.java | 10 ++++ .../broker/jmx/ManagedRegionBroker.java | 49 +++++++++++-------- .../broker/jmx/TopicSubscriptionView.java | 2 + .../activemq/broker/region/Destination.java | 2 + .../broker/region/DestinationFilter.java | 6 +++ .../apache/activemq/broker/region/Queue.java | 8 ++- .../activemq/broker/region/Subscription.java | 2 + .../apache/activemq/broker/region/Topic.java | 8 +++ .../SubscriptionAddRemoveQueueTest.java | 5 ++ 10 files changed, 89 insertions(+), 22 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 26950144c0..209169d87c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.io.IOException; import javax.jms.Connection; import javax.jms.InvalidSelectorException; @@ -33,11 +34,14 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; +import javax.management.ObjectName; +import javax.management.MalformedObjectNameException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; @@ -341,5 +345,18 @@ public class DestinationView implements DestinationViewMBean { public void setUseCache(boolean value) { destination.setUseCache(value); - } + } + + public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { + List subscriptions = destination.getConsumers(); + ObjectName[] answer = new ObjectName[subscriptions.size()]; + ObjectName objectName = broker.getBrokerService().getBrokerObjectName(); + int index = 0; + for (Subscription subscription : subscriptions) { + String connectionClientId = subscription.getContext().getClientId(); + String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName); + answer[index++] = new ObjectName(objectNameStr); + } + return answer; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 01e63fa967..9a6ecfa86f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -18,11 +18,14 @@ package org.apache.activemq.broker.jmx; import java.util.List; import java.util.Map; +import java.io.IOException; import javax.jms.InvalidSelectorException; import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import javax.management.ObjectName; +import javax.management.MalformedObjectNameException; public interface DestinationViewMBean { @@ -259,4 +262,11 @@ public interface DestinationViewMBean { */ public void setUseCache(boolean value); + /** + * Returns all the current subscription MBeans matching this destination + * + * @return the names of the subscriptions for this destination + */ + ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException; + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index b621580bae..73f3eda30b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -168,27 +168,11 @@ public class ManagedRegionBroker extends RegionBroker { } public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { - Hashtable map = brokerObjectName.getKeyPropertyList(); - String objectNameStr = brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,"; - String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString(); - String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName()); - String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(context.getClientId()); - String persistentMode = "persistentMode="; - String consumerId = ""; + String connectionClientId = context.getClientId(); + ObjectName brokerJmxObjectName = brokerObjectName; + String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName); + SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); - if (sub.getConsumerInfo().isDurable()) { - persistentMode += "Durable, subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName()); - } else { - persistentMode += "Non-Durable"; - if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) { - consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString()); - } - } - objectNameStr += persistentMode + ","; - objectNameStr += destinationType + ","; - objectNameStr += destinationName + ","; - objectNameStr += clientId; - objectNameStr += consumerId; try { ObjectName objectName = new ObjectName(objectNameStr); SubscriptionView view; @@ -210,6 +194,31 @@ public class ManagedRegionBroker extends RegionBroker { } } + public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) { + Hashtable map = brokerJmxObjectName.getKeyPropertyList(); + String brokerDomain = brokerJmxObjectName.getDomain(); + String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,"; + String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString(); + String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName()); + String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId); + String persistentMode = "persistentMode="; + String consumerId = ""; + if (sub.getConsumerInfo().isDurable()) { + persistentMode += "Durable, subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName()); + } else { + persistentMode += "Non-Durable"; + if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) { + consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString()); + } + } + objectNameStr += persistentMode + ","; + objectNameStr += destinationType + ","; + objectNameStr += destinationName + ","; + objectNameStr += clientId; + objectNameStr += consumerId; + return objectNameStr; + } + public void unregisterSubscription(Subscription sub) { ObjectName name = subscriptionMap.remove(sub); if (name != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java index 0687066976..875a4ce89d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java @@ -57,4 +57,6 @@ public class TopicSubscriptionView extends SubscriptionView implements TopicSubs topicSubscription.setMaximumPendingMessages(max); } } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index d120f4bca8..94a9d27070 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.List; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; @@ -168,4 +169,5 @@ public interface Destination extends Service, Task { */ void isFull(ConnectionContext context,Usage usage); + List getConsumers(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 8c0e548ff1..55068898d2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.Iterator; import java.util.Set; +import java.util.List; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -100,6 +102,10 @@ public class DestinationFilter implements Destination { next.stop(); } + public List getConsumers() { + return next.getConsumers(); + } + /** * Sends a message to the given destination which may be a wildcard */ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 4dfa731bde..bbf4cc6056 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -115,7 +115,13 @@ public class Queue extends BaseDestination implements Task { this.taskFactory=taskFactory; this.dispatchSelector=new QueueDispatchSelector(destination); } - + + public List getConsumers() { + synchronized (consumers) { + return new ArrayList(consumers); + } + } + public void initialize() throws Exception { if (this.messages == null) { if (destination.isTemporary() || broker == null || store == null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index e5c921ebe1..f8b7028e8a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -221,4 +221,6 @@ public interface Subscription extends SubscriptionRecovery { * @return the number of messages this subscription can accept before its full */ int countBeforeFull(); + + ConnectionContext getContext(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 47164791d1..fc9ef9ce8c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.LinkedList; import java.util.Set; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -106,6 +108,12 @@ public class Topic extends BaseDestination implements Task{ } } + public List getConsumers() { + synchronized (consumers) { + return new ArrayList(consumers); + } + } + public boolean lock(MessageReference node, LockOwner sub) { return true; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java index 0b35e78287..9c3f1d04e6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -140,6 +140,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { dispatched.add(qmr); } + public ConnectionContext getContext() { + // TODO + return null; + } + public void add(ConnectionContext context, Destination destination) throws Exception { // TODO Auto-generated method stub