diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 0bfadbea3c..41935873f5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -23,10 +23,13 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; public class BrokerView implements BrokerViewMBean { - private final ManagedRegionBroker broker; + final ManagedRegionBroker broker; private final BrokerService brokerService; public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { @@ -140,6 +143,33 @@ public class BrokerView implements BrokerViewMBean { broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000); } + public void createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception { + ConnectionContext context = new ConnectionContext(); + context.setBroker(broker); + context.setClientId(clientId); + ConsumerInfo info = new ConsumerInfo(); + ConsumerId consumerId = new ConsumerId(); + consumerId.setConnectionId(clientId); + consumerId.setSessionId(0); + consumerId.setValue(0); + info.setConsumerId(consumerId); + info.setDestination(new ActiveMQTopic(topicName)); + info.setSubcriptionName(subscriberName); + info.setSelector(selector); + broker.addConsumer(context, info); + broker.removeConsumer(context, info); + } + + public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception { + RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); + info.setClientId(clientId); + info.setSubcriptionName(subscriberName); + ConnectionContext context = new ConnectionContext(); + context.setBroker(broker); + context.setClientId(clientId); + broker.removeSubscription(context, info); + } + static public ConnectionContext getConnectionContext(Broker broker) { ConnectionContext context = new ConnectionContext(); context.setBroker(broker); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index bd4b79448e..761220b3c2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -18,6 +18,11 @@ package org.apache.activemq.broker.jmx; import javax.management.ObjectName; import org.apache.activemq.Service; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; public interface BrokerViewMBean extends Service { @@ -96,5 +101,23 @@ public interface BrokerViewMBean extends Service { * @throws Exception */ public void removeQueue(String name) throws Exception; + + /** + * Creates a new durable topic subscriber + * + * @param clientId the JMS client ID + * @param subscriberName the durable subscriber name + * @param topicName the name of the topic to subscribe to + * @param selector a selector or null + */ + public void createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception; + + /** + * Destroys a durable subscriber + * + * @param clientId the JMS client ID + * @param subscriberName the durable subscriber name + */ + public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception; } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java index 4dfed1a6fc..ef748b3a08 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java @@ -19,37 +19,10 @@ import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; -public class TopicView extends DestinationView implements TopicViewMBean{ - - public TopicView(ManagedRegionBroker broker, Topic destination){ +public class TopicView extends DestinationView implements TopicViewMBean { + + public TopicView(ManagedRegionBroker broker, Topic destination) { super(broker, destination); } - public void createDurableSubscriber(String clientId,String subscriberName) throws Exception{ - ConnectionContext context = new ConnectionContext(); - context.setBroker(broker); - context.setClientId(clientId); - ConsumerInfo info = new ConsumerInfo(); - ConsumerId consumerId = new ConsumerId(); - consumerId.setConnectionId(clientId); - consumerId.setSessionId(0); - consumerId.setValue(0); - info.setConsumerId(consumerId); - info.setDestination(destination.getActiveMQDestination()); - info.setSubcriptionName(subscriberName); - broker.addConsumer(context, info); - broker.removeConsumer(context, info); - } - - public void destroyDurableSubscriber(String clientId,String subscriberName) throws Exception{ - RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); - info.setClientId(clientId); - info.setSubcriptionName(subscriberName); - ConnectionContext context = new ConnectionContext(); - context.setBroker(broker); - context.setClientId(clientId); - broker.removeSubscription(context, info); - - - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java index c314d2f64e..4652919eed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java @@ -17,23 +17,4 @@ package org.apache.activemq.broker.jmx; public interface TopicViewMBean extends DestinationViewMBean { - - /** - * Creates a durable subscription that is subscribed to this topic. - * - * @param clientId - * @param subscriberName - * @throws Exception - */ - public void createDurableSubscriber(String clientId, String subscriberName) throws Exception; - - /** - * Destroys a durable subscription that had previously subscribed to this topic. - * - * @param clientId - * @param subscriberName - * @throws Exception - */ - public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception; - } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 24366acc72..351c81b8a1 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -138,14 +138,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length); - ObjectName newTopicName = assertRegisteredObjectName(domain + ":Type=Topic,Destination=" + getDestinationString() + ",BrokerName=localhost"); - TopicViewMBean topic = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, newTopicName, TopicViewMBean.class, true); - topic.createDurableSubscriber(clientID, "subscriber1"); - topic.createDurableSubscriber(clientID, "subscriber2"); + String topicName = getDestinationString(); + String selector = null; + broker.createDurableSubscriber(clientID, "subscriber1", topicName , selector); + broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector); assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length); // now lets try destroy it - topic.destroyDurableSubscriber(clientID, "subscriber1"); + broker.destroyDurableSubscriber(clientID, "subscriber1"); assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length); }