refactor to allow the MBean createDurableSubscription() method to return the ObjectName of the newly created subscription

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385575 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-13 15:41:55 +00:00
parent ad5ad88996
commit 0def1d4d40
22 changed files with 103 additions and 46 deletions

View File

@ -22,6 +22,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@ -70,8 +71,8 @@ public class AdvisoryBroker extends BrokerFilter {
connections.put(info.getConnectionId(), info);
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
next.addConsumer(context, info);
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = next.addConsumer(context, info);
// Don't advise advisory topics.
if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) {
@ -120,6 +121,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
}
return answer;
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {

View File

@ -16,6 +16,7 @@ package org.apache.activemq.broker;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
@ -54,12 +55,13 @@ public class BrokerBroadcaster extends BrokerFilter{
}
}
public void addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
next.addConsumer(context,info);
public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
Subscription answer = next.addConsumer(context,info);
Broker brokers[]=getListeners();
for(int i=0;i<brokers.length;i++){
brokers[i].addConsumer(context,info);
}
return answer;
}
public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception{

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker;
import java.util.Set;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -63,8 +64,8 @@ public class BrokerFilter implements Broker {
next.addConnection(context, info);
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
next.addConsumer(context, info);
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
return next.addConsumer(context, info);
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker;
import java.util.Set;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -123,8 +124,8 @@ public class EmptyBroker implements Broker{
}
public void addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
return null;
}
public void removeConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker;
import java.util.Set;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -124,7 +125,7 @@ public class ErrorBroker implements Broker {
throw new IllegalStateException(this.message);
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
throw new IllegalStateException(this.message);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker;
import java.util.Set;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -75,8 +76,8 @@ public class MutableBrokerFilter implements Broker {
getNext().addConnection(context, info);
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
getNext().addConsumer(context, info);
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
return getNext().addConsumer(context, info);
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {

View File

@ -21,6 +21,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@ -132,9 +133,10 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
super.addConsumer(context, info);
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = super.addConsumer(context, info);
sendAsyncToSlave(info);
return answer;
}

View File

@ -21,6 +21,7 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
@ -143,7 +144,7 @@ public class BrokerView implements BrokerViewMBean {
broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000);
}
public void createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception {
public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception {
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
@ -156,8 +157,12 @@ public class BrokerView implements BrokerViewMBean {
info.setDestination(new ActiveMQTopic(topicName));
info.setSubcriptionName(subscriberName);
info.setSelector(selector);
broker.addConsumer(context, info);
Subscription subscription = broker.addConsumer(context, info);
broker.removeConsumer(context, info);
if (subscription != null) {
return subscription.getObjectName();
}
return null;
}
public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {

View File

@ -109,8 +109,10 @@ public interface BrokerViewMBean extends Service {
* @param subscriberName the durable subscriber name
* @param topicName the name of the topic to subscribe to
* @param selector a selector or null
*
* @return the object name of the MBean registered in JMX
*/
public void createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception;
public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception;
/**
* Destroys a durable subscriber

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.jmx;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@ -41,7 +42,8 @@ public class ManagedQueueRegion extends QueueRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(context,sub);
ObjectName name = regionBroker.registerSubscription(context,sub);
sub.setObjectName(name);
return sub;
}

View File

@ -62,6 +62,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
@ -154,7 +155,7 @@ public class ManagedRegionBroker extends RegionBroker{
}
}
public void registerSubscription(ConnectionContext context,Subscription sub){
public ObjectName registerSubscription(ConnectionContext context,Subscription sub){
Hashtable map=brokerObjectName.getKeyPropertyList();
String name="";
SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
@ -179,8 +180,10 @@ public class ManagedRegionBroker extends RegionBroker{
}
registerSubscription(objectName,sub.getConsumerInfo(),key,view);
subscriptionMap.put(sub,objectName);
return objectName;
}catch(Exception e){
log.error("Failed to register subscription "+sub,e);
return null;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.jmx;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@ -39,7 +40,8 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(context,sub);
ObjectName name = regionBroker.registerSubscription(context,sub);
sub.setObjectName(name);
return sub;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.jmx;
import javax.jms.JMSException;
import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@ -39,7 +40,8 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(context,sub);
ObjectName name = regionBroker.registerSubscription(context,sub);
sub.setObjectName(name);
return sub;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.jmx;
import javax.jms.JMSException;
import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@ -41,7 +42,8 @@ public class ManagedTopicRegion extends TopicRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(context,sub);
ObjectName name = regionBroker.registerSubscription(context,sub);
sub.setObjectName(name);
return sub;
}

View File

@ -113,7 +113,7 @@ abstract public class AbstractRegion implements Region {
}
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription sub = createSubscription(context, info);
@ -148,6 +148,7 @@ abstract public class AbstractRegion implements Region {
((QueueBrowserSubscription)sub).browseDone();
}
return sub;
}
/**

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.management.ObjectName;
import java.io.IOException;
@ -46,6 +47,7 @@ abstract public class AbstractSubscription implements Subscription {
protected ConsumerInfo info;
final protected DestinationFilter destinationFilter;
private BooleanExpression selectorExpression;
private ObjectName objectName;
final protected CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
@ -140,4 +142,12 @@ abstract public class AbstractSubscription implements Subscription {
info.setSelector(selector);
this.selectorExpression = newSelector;
}
public ObjectName getObjectName() {
return objectName;
}
public void setObjectName(ObjectName objectName) {
this.objectName = objectName;
}
}

View File

@ -60,9 +60,10 @@ public interface Region extends Service {
/**
* Adds a consumer.
* @param context the environment the operation is being executed under.
* @return TODO
* @throws Exception TODO
*/
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception;
/**
* Removes a consumer.

View File

@ -200,7 +200,7 @@ public class RegionBroker implements Broker {
answer = tempTopicRegion.addDestination(context, destination);
break;
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
destinations.add(destination);
@ -225,7 +225,7 @@ public class RegionBroker implements Broker {
tempTopicRegion.removeDestination(context, destination, timeout);
break;
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
destinations.remove(destination);
@ -251,23 +251,23 @@ public class RegionBroker implements Broker {
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addConsumer(context, info);
break;
return queueRegion.addConsumer(context, info);
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.addConsumer(context, info);
break;
return topicRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.addConsumer(context, info);
break;
return tempQueueRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.addConsumer(context, info);
break;
return tempTopicRegion.addConsumer(context, info);
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
}
@ -287,7 +287,7 @@ public class RegionBroker implements Broker {
tempTopicRegion.removeConsumer(context, info);
break;
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
}
@ -316,7 +316,7 @@ public class RegionBroker implements Broker {
tempTopicRegion.send(context, message);
break;
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
}
@ -336,7 +336,7 @@ public class RegionBroker implements Broker {
tempTopicRegion.acknowledge(context, ack);
break;
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
}
@ -402,8 +402,8 @@ public class RegionBroker implements Broker {
return destinationStatistics;
}
protected void throwUnknownDestinationType(ActiveMQDestination destination) throws JMSException {
throw new JMSException("Unknown destination type: " + destination.getDestinationType());
protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
return new JMSException("Unknown destination type: " + destination.getDestinationType());
}
public synchronized void addBroker(Connection connection,BrokerInfo info){
@ -442,7 +442,7 @@ public class RegionBroker implements Broker {
tempTopicRegion.processDispatchNotification(messageDispatchNotification);
break;
default:
throwUnknownDestinationType(destination);
throw createUnknownDestinationTypeException(destination);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.filter.MessageEvaluationContext;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
/**
* @version $Revision: 1.5 $
@ -137,4 +138,14 @@ public interface Subscription {
* This operation is not supported for persistent topics.
*/
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
/**
* @return the JMX object name that this subscription was registered as if applicable
*/
public ObjectName getObjectName();
/**
* Set when the subscription is registered in JMX
*/
public void setObjectName(ObjectName objectName);
}

View File

@ -57,7 +57,7 @@ public class TopicRegion extends AbstractRegion {
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.isDurable()) {
ActiveMQDestination destination = info.getDestination();
@ -103,9 +103,10 @@ public class TopicRegion extends AbstractRegion {
}
sub.activate(context, info);
return sub;
}
else {
super.addConsumer(context, info);
return super.addConsumer(context, info);
}
}

View File

@ -20,6 +20,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempDestination;
@ -85,7 +86,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
super.removeDestination(context, destination, timeout);
}
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
final SecurityContext subject = (SecurityContext) context.getSecurityContext();
if( subject == null )
@ -117,7 +118,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
}
super.addConsumer(context, info);
return super.addConsumer(context, info);
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {

View File

@ -140,10 +140,14 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
String topicName = getDestinationString();
String selector = null;
broker.createDurableSubscriber(clientID, "subscriber1", topicName , selector);
ObjectName name1 = broker.createDurableSubscriber(clientID, "subscriber1", topicName , selector);
broker.createDurableSubscriber(clientID, "subscriber2", topicName, selector);
assertEquals("Durable subscriber count", 2, broker.getDurableTopicSubscribers().length);
assertNotNull("Should have created an mbean name for the durable subscriber!", name1);
System.out.println("Created durable subscriber with name: " + name1);
// now lets try destroy it
broker.destroyDurableSubscriber(clientID, "subscriber1");
assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);