mirror of https://github.com/apache/activemq.git
moved the create/destroy durable subscriptions to the broker MBean to make it easier to use and not require a reference to the topic to destroy them
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385561 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ce0aaa0dc
commit
ad5ad88996
|
@ -23,10 +23,13 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
|
||||
public class BrokerView implements BrokerViewMBean {
|
||||
|
||||
private final ManagedRegionBroker broker;
|
||||
final ManagedRegionBroker broker;
|
||||
private final BrokerService brokerService;
|
||||
|
||||
public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
|
||||
|
@ -140,6 +143,33 @@ public class BrokerView implements BrokerViewMBean {
|
|||
broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000);
|
||||
}
|
||||
|
||||
public void createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception {
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setBroker(broker);
|
||||
context.setClientId(clientId);
|
||||
ConsumerInfo info = new ConsumerInfo();
|
||||
ConsumerId consumerId = new ConsumerId();
|
||||
consumerId.setConnectionId(clientId);
|
||||
consumerId.setSessionId(0);
|
||||
consumerId.setValue(0);
|
||||
info.setConsumerId(consumerId);
|
||||
info.setDestination(new ActiveMQTopic(topicName));
|
||||
info.setSubcriptionName(subscriberName);
|
||||
info.setSelector(selector);
|
||||
broker.addConsumer(context, info);
|
||||
broker.removeConsumer(context, info);
|
||||
}
|
||||
|
||||
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
|
||||
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
|
||||
info.setClientId(clientId);
|
||||
info.setSubcriptionName(subscriberName);
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setBroker(broker);
|
||||
context.setClientId(clientId);
|
||||
broker.removeSubscription(context, info);
|
||||
}
|
||||
|
||||
static public ConnectionContext getConnectionContext(Broker broker) {
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setBroker(broker);
|
||||
|
|
|
@ -18,6 +18,11 @@ package org.apache.activemq.broker.jmx;
|
|||
|
||||
import javax.management.ObjectName;
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
|
||||
public interface BrokerViewMBean extends Service {
|
||||
|
||||
|
@ -97,4 +102,22 @@ public interface BrokerViewMBean extends Service {
|
|||
*/
|
||||
public void removeQueue(String name) throws Exception;
|
||||
|
||||
/**
|
||||
* Creates a new durable topic subscriber
|
||||
*
|
||||
* @param clientId the JMS client ID
|
||||
* @param subscriberName the durable subscriber name
|
||||
* @param topicName the name of the topic to subscribe to
|
||||
* @param selector a selector or null
|
||||
*/
|
||||
public void createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception;
|
||||
|
||||
/**
|
||||
* Destroys a durable subscriber
|
||||
*
|
||||
* @param clientId the JMS client ID
|
||||
* @param subscriberName the durable subscriber name
|
||||
*/
|
||||
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception;
|
||||
|
||||
}
|
|
@ -19,37 +19,10 @@ import org.apache.activemq.command.ConsumerId;
|
|||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
|
||||
public class TopicView extends DestinationView implements TopicViewMBean{
|
||||
public class TopicView extends DestinationView implements TopicViewMBean {
|
||||
|
||||
public TopicView(ManagedRegionBroker broker, Topic destination){
|
||||
public TopicView(ManagedRegionBroker broker, Topic destination) {
|
||||
super(broker, destination);
|
||||
}
|
||||
|
||||
public void createDurableSubscriber(String clientId,String subscriberName) throws Exception{
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setBroker(broker);
|
||||
context.setClientId(clientId);
|
||||
ConsumerInfo info = new ConsumerInfo();
|
||||
ConsumerId consumerId = new ConsumerId();
|
||||
consumerId.setConnectionId(clientId);
|
||||
consumerId.setSessionId(0);
|
||||
consumerId.setValue(0);
|
||||
info.setConsumerId(consumerId);
|
||||
info.setDestination(destination.getActiveMQDestination());
|
||||
info.setSubcriptionName(subscriberName);
|
||||
broker.addConsumer(context, info);
|
||||
broker.removeConsumer(context, info);
|
||||
}
|
||||
|
||||
public void destroyDurableSubscriber(String clientId,String subscriberName) throws Exception{
|
||||
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
|
||||
info.setClientId(clientId);
|
||||
info.setSubcriptionName(subscriberName);
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setBroker(broker);
|
||||
context.setClientId(clientId);
|
||||
broker.removeSubscription(context, info);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,23 +17,4 @@
|
|||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
public interface TopicViewMBean extends DestinationViewMBean {
|
||||
|
||||
/**
|
||||
* Creates a durable subscription that is subscribed to this topic.
|
||||
*
|
||||
* @param clientId
|
||||
* @param subscriberName
|
||||
* @throws Exception
|
||||
*/
|
||||
public void createDurableSubscriber(String clientId, String subscriberName) throws Exception;
|
||||
|
||||
/**
|
||||
* Destroys a durable subscription that had previously subscribed to this topic.
|
||||
*
|
||||
* @param clientId
|
||||
* @param subscriberName
|
||||
* @throws Exception
|
||||
*/
|
||||
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception;
|
||||
|
||||
}
|
|
@ -138,14 +138,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
|||
|
||||
assertEquals("Durable subscriber count", 0, broker.getDurableTopicSubscribers().length);
|
||||
|
||||
ObjectName newTopicName = assertRegisteredObjectName(domain + ":Type=Topic,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
TopicViewMBean topic = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, newTopicName, TopicViewMBean.class, true);
|
||||
topic.createDurableSubscriber(clientID, "subscriber1");
|
||||
topic.createDurableSubscriber(clientID, "subscriber2");
|
||||
String topicName = getDestinationString();
|
||||
String selector = null;
|
||||
broker.createDurableSubscriber(clientID, "subscriber1", topicName , selector);
|
||||
broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
|
||||
assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
|
||||
|
||||
// now lets try destroy it
|
||||
topic.destroyDurableSubscriber(clientID, "subscriber1");
|
||||
broker.destroyDurableSubscriber(clientID, "subscriber1");
|
||||
assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue