added some helper mbean methods so that you can ask for the consumers on a DestinationViewMBean

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@687043 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2008-08-19 12:56:40 +00:00
parent 52e15a48a1
commit 947659cd52
10 changed files with 89 additions and 22 deletions

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.InvalidSelectorException;
@ -33,11 +34,14 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -342,4 +346,17 @@ public class DestinationView implements DestinationViewMBean {
public void setUseCache(boolean value) {
destination.setUseCache(value);
}
public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
List<Subscription> subscriptions = destination.getConsumers();
ObjectName[] answer = new ObjectName[subscriptions.size()];
ObjectName objectName = broker.getBrokerService().getBrokerObjectName();
int index = 0;
for (Subscription subscription : subscriptions) {
String connectionClientId = subscription.getContext().getClientId();
String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName);
answer[index++] = new ObjectName(objectNameStr);
}
return answer;
}
}

View File

@ -18,11 +18,14 @@ package org.apache.activemq.broker.jmx;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
public interface DestinationViewMBean {
@ -259,4 +262,11 @@ public interface DestinationViewMBean {
*/
public void setUseCache(boolean value);
/**
* Returns all the current subscription MBeans matching this destination
*
* @return the names of the subscriptions for this destination
*/
ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException;
}

View File

@ -168,27 +168,11 @@ public class ManagedRegionBroker extends RegionBroker {
}
public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
Hashtable map = brokerObjectName.getKeyPropertyList();
String objectNameStr = brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(context.getClientId());
String persistentMode = "persistentMode=";
String consumerId = "";
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
if (sub.getConsumerInfo().isDurable()) {
persistentMode += "Durable, subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
} else {
persistentMode += "Non-Durable";
if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
}
}
objectNameStr += persistentMode + ",";
objectNameStr += destinationType + ",";
objectNameStr += destinationName + ",";
objectNameStr += clientId;
objectNameStr += consumerId;
try {
ObjectName objectName = new ObjectName(objectNameStr);
SubscriptionView view;
@ -210,6 +194,31 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
Hashtable map = brokerJmxObjectName.getKeyPropertyList();
String brokerDomain = brokerJmxObjectName.getDomain();
String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
String persistentMode = "persistentMode=";
String consumerId = "";
if (sub.getConsumerInfo().isDurable()) {
persistentMode += "Durable, subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
} else {
persistentMode += "Non-Durable";
if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
}
}
objectNameStr += persistentMode + ",";
objectNameStr += destinationType + ",";
objectNameStr += destinationName + ",";
objectNameStr += clientId;
objectNameStr += consumerId;
return objectNameStr;
}
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
if (name != null) {

View File

@ -57,4 +57,6 @@ public class TopicSubscriptionView extends SubscriptionView implements TopicSubs
topicSubscription.setMaximumPendingMessages(max);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.List;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@ -168,4 +169,5 @@ public interface Destination extends Service, Task {
*/
void isFull(ConnectionContext context,Usage usage);
List<Subscription> getConsumers();
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -100,6 +102,10 @@ public class DestinationFilter implements Destination {
next.stop();
}
public List<Subscription> getConsumers() {
return next.getConsumers();
}
/**
* Sends a message to the given destination which may be a wildcard
*/

View File

@ -116,6 +116,12 @@ public class Queue extends BaseDestination implements Task {
this.dispatchSelector=new QueueDispatchSelector(destination);
}
public List<Subscription> getConsumers() {
synchronized (consumers) {
return new ArrayList<Subscription>(consumers);
}
}
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {

View File

@ -221,4 +221,6 @@ public interface Subscription extends SubscriptionRecovery {
* @return the number of messages this subscription can accept before its full
*/
int countBeforeFull();
ConnectionContext getContext();
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Set;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@ -106,6 +108,12 @@ public class Topic extends BaseDestination implements Task{
}
}
public List<Subscription> getConsumers() {
synchronized (consumers) {
return new ArrayList<Subscription>(consumers);
}
}
public boolean lock(MessageReference node, LockOwner sub) {
return true;
}

View File

@ -140,6 +140,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
dispatched.add(qmr);
}
public ConnectionContext getContext() {
// TODO
return null;
}
public void add(ConnectionContext context, Destination destination)
throws Exception {
// TODO Auto-generated method stub