From 3cd3fd364e90595c59b33c3466bf9559fdc9a051 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 7 Mar 2006 09:25:05 +0000 Subject: [PATCH] added extra check to make sure all MBeans are unregistered on a stop() call to ensure that AMQ-585 is fixed git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383828 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/jmx/ManagedRegionBroker.java | 100 +++++++++++------- .../activemq/broker/region/RegionBroker.java | 14 ++- .../broker/ReconnectWithJMXEnabledTest.java | 84 +++++++++++++++ 3 files changed, 157 insertions(+), 41 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 568c6af466..8f6fbb8974 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -13,25 +13,8 @@ */ package org.apache.activemq.broker.jmx; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Map.Entry; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; @@ -56,11 +39,34 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + public class ManagedRegionBroker extends RegionBroker{ private static final Log log=LogFactory.getLog(ManagedRegionBroker.class); private final MBeanServer mbeanServer; @@ -77,6 +83,7 @@ public class ManagedRegionBroker extends RegionBroker{ private final Map temporaryTopicSubscribers=new ConcurrentHashMap(); private final Map subscriptionKeys = new ConcurrentHashMap(); private final Map subscriptionMap = new ConcurrentHashMap(); + private final Set registeredMBeans = new CopyOnWriteArraySet(); /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; @@ -95,6 +102,23 @@ public class ManagedRegionBroker extends RegionBroker{ } + + protected void doStop(ServiceStopper stopper) { + super.doStop(stopper); + + // lets remove any mbeans not yet removed + for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) { + ObjectName name = (ObjectName) iter.next(); + try { + mbeanServer.unregisterMBean(name); + } + catch (Exception e) { + stopper.onException(this, e); + } + } + registeredMBeans.clear(); + } + protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter){ return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter); @@ -114,15 +138,8 @@ public class ManagedRegionBroker extends RegionBroker{ } public void register(ActiveMQDestination destName,Destination destination){ - // Build the object name for the destination - Hashtable map=brokerObjectName.getKeyPropertyList(); try{ - ObjectName objectName = new ObjectName( - brokerObjectName.getDomain()+":"+ - "BrokerName="+map.get("BrokerName")+","+ - "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+ - "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()) - ); + ObjectName objectName = createObjectName(destName); DestinationView view; if(destination instanceof Queue){ view=new QueueView(this, (Queue) destination); @@ -136,15 +153,8 @@ public class ManagedRegionBroker extends RegionBroker{ } public void unregister(ActiveMQDestination destName){ - // Build the object name for the destination - Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList()); try{ - ObjectName objectName = new ObjectName( - brokerObjectName.getDomain()+":"+ - "BrokerName="+map.get("BrokerName")+","+ - "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+ - "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()) - ); + ObjectName objectName = createObjectName(destName); unregisterDestination(objectName); }catch(Exception e){ log.error("Failed to unregister "+destName,e); @@ -208,6 +218,7 @@ public class ManagedRegionBroker extends RegionBroker{ topics.put(key,view); } } + registeredMBeans.add(key); mbeanServer.registerMBean(view,key); } @@ -216,6 +227,7 @@ public class ManagedRegionBroker extends RegionBroker{ queues.remove(key); temporaryQueues.remove(key); temporaryTopics.remove(key); + registeredMBeans.remove(key); mbeanServer.unregisterMBean(key); } @@ -238,6 +250,7 @@ public class ManagedRegionBroker extends RegionBroker{ ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey); if (inactiveName != null){ inactiveDurableTopicSubscribers.remove(inactiveName); + registeredMBeans.remove(inactiveName); mbeanServer.unregisterMBean(inactiveName); } }catch(Exception e){ @@ -248,6 +261,7 @@ public class ManagedRegionBroker extends RegionBroker{ } } } + registeredMBeans.add(key); mbeanServer.registerMBean(view,key); } @@ -257,6 +271,7 @@ public class ManagedRegionBroker extends RegionBroker{ inactiveDurableTopicSubscribers.remove(key); temporaryQueueSubscribers.remove(key); temporaryTopicSubscribers.remove(key); + registeredMBeans.remove(key); mbeanServer.unregisterMBean(key); DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key); if (view != null){ @@ -313,6 +328,7 @@ public class ManagedRegionBroker extends RegionBroker{ ); SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info); + registeredMBeans.add(objectName); mbeanServer.registerMBean(view,objectName); inactiveDurableTopicSubscribers.put(objectName,view); subscriptionKeys.put(key, objectName); @@ -418,4 +434,16 @@ public class ManagedRegionBroker extends RegionBroker{ public void setContextBroker(Broker contextBroker) { this.contextBroker = contextBroker; } + + protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException { + // Build the object name for the destination + Hashtable map=brokerObjectName.getKeyPropertyList(); + ObjectName objectName = new ObjectName( + brokerObjectName.getDomain()+":"+ + "BrokerName="+map.get("BrokerName")+","+ + "Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+ + "Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName()) + ); + return objectName; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index c7c4b1fe73..bdba385c40 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -133,10 +133,7 @@ public class RegionBroker implements Broker { public void stop() throws Exception { stopped = true; ServiceStopper ss = new ServiceStopper(); - ss.stop(queueRegion); - ss.stop(topicRegion); - ss.stop(tempQueueRegion); - ss.stop(tempTopicRegion); + doStop(ss); ss.throwFirstException(); } @@ -461,6 +458,13 @@ public class RegionBroker implements Broker { return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET; } - + + protected void doStop(ServiceStopper ss) { + ss.stop(queueRegion); + ss.stop(topicRegion); + ss.stop(tempQueueRegion); + ss.stop(tempTopicRegion); + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java new file mode 100644 index 0000000000..6d91a2efb9 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java @@ -0,0 +1,84 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq.broker; + +import org.apache.activemq.EmbeddedBrokerTestSupport; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * + * @version $Revision$ + */ +public class ReconnectWithJMXEnabledTest extends EmbeddedBrokerTestSupport { + + protected Connection connection; + protected boolean transacted; + protected int authMode = Session.AUTO_ACKNOWLEDGE; + + public void testTestUseConnectionCloseBrokerThenRestartInSameJVM() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + connection.close(); + + broker.stop(); + broker = createBroker(); + startBroker(); + + connection = connectionFactory.createConnection(); + useConnection(connection); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:61616"; + super.setUp(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseJmx(true); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + return answer; + } + + protected void useConnection(Connection connection) throws Exception { + connection.setClientID("foo"); + connection.start(); + Session session = connection.createSession(transacted, authMode); + Destination destination = createDestination(); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + Message message = session.createTextMessage("Hello World"); + producer.send(message); + Thread.sleep(1000); + } +}