mirror of https://github.com/apache/activemq.git
added extra check to make sure all MBeans are unregistered on a stop() call to ensure that AMQ-585 is fixed
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383828 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3e30fec10b
commit
3cd3fd364e
|
@ -13,25 +13,8 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Hashtable;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.OpenDataException;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -56,11 +39,34 @@ import org.apache.activemq.store.PersistenceAdapter;
|
|||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.util.JMXSupport;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.util.SubscriptionKey;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import javax.management.InstanceNotFoundException;
|
||||
import javax.management.MBeanRegistrationException;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.OpenDataException;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Hashtable;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
public class ManagedRegionBroker extends RegionBroker{
|
||||
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
|
||||
private final MBeanServer mbeanServer;
|
||||
|
@ -77,6 +83,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
|
||||
private final Map subscriptionKeys = new ConcurrentHashMap();
|
||||
private final Map subscriptionMap = new ConcurrentHashMap();
|
||||
private final Set registeredMBeans = new CopyOnWriteArraySet();
|
||||
|
||||
/* This is the first broker in the broker interceptor chain. */
|
||||
private Broker contextBroker;
|
||||
|
@ -95,6 +102,23 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
|
||||
}
|
||||
|
||||
|
||||
protected void doStop(ServiceStopper stopper) {
|
||||
super.doStop(stopper);
|
||||
|
||||
// lets remove any mbeans not yet removed
|
||||
for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) {
|
||||
ObjectName name = (ObjectName) iter.next();
|
||||
try {
|
||||
mbeanServer.unregisterMBean(name);
|
||||
}
|
||||
catch (Exception e) {
|
||||
stopper.onException(this, e);
|
||||
}
|
||||
}
|
||||
registeredMBeans.clear();
|
||||
}
|
||||
|
||||
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter adapter){
|
||||
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
|
||||
|
@ -114,15 +138,8 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
}
|
||||
|
||||
public void register(ActiveMQDestination destName,Destination destination){
|
||||
// Build the object name for the destination
|
||||
Hashtable map=brokerObjectName.getKeyPropertyList();
|
||||
try{
|
||||
ObjectName objectName = new ObjectName(
|
||||
brokerObjectName.getDomain()+":"+
|
||||
"BrokerName="+map.get("BrokerName")+","+
|
||||
"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
|
||||
"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
|
||||
);
|
||||
ObjectName objectName = createObjectName(destName);
|
||||
DestinationView view;
|
||||
if(destination instanceof Queue){
|
||||
view=new QueueView(this, (Queue) destination);
|
||||
|
@ -136,15 +153,8 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
}
|
||||
|
||||
public void unregister(ActiveMQDestination destName){
|
||||
// Build the object name for the destination
|
||||
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
|
||||
try{
|
||||
ObjectName objectName = new ObjectName(
|
||||
brokerObjectName.getDomain()+":"+
|
||||
"BrokerName="+map.get("BrokerName")+","+
|
||||
"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
|
||||
"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
|
||||
);
|
||||
ObjectName objectName = createObjectName(destName);
|
||||
unregisterDestination(objectName);
|
||||
}catch(Exception e){
|
||||
log.error("Failed to unregister "+destName,e);
|
||||
|
@ -208,6 +218,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
topics.put(key,view);
|
||||
}
|
||||
}
|
||||
registeredMBeans.add(key);
|
||||
mbeanServer.registerMBean(view,key);
|
||||
}
|
||||
|
||||
|
@ -216,6 +227,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
queues.remove(key);
|
||||
temporaryQueues.remove(key);
|
||||
temporaryTopics.remove(key);
|
||||
registeredMBeans.remove(key);
|
||||
mbeanServer.unregisterMBean(key);
|
||||
}
|
||||
|
||||
|
@ -238,6 +250,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
ObjectName inactiveName = (ObjectName) subscriptionKeys.get(subscriptionKey);
|
||||
if (inactiveName != null){
|
||||
inactiveDurableTopicSubscribers.remove(inactiveName);
|
||||
registeredMBeans.remove(inactiveName);
|
||||
mbeanServer.unregisterMBean(inactiveName);
|
||||
}
|
||||
}catch(Exception e){
|
||||
|
@ -248,6 +261,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
}
|
||||
}
|
||||
}
|
||||
registeredMBeans.add(key);
|
||||
mbeanServer.registerMBean(view,key);
|
||||
}
|
||||
|
||||
|
@ -257,6 +271,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
inactiveDurableTopicSubscribers.remove(key);
|
||||
temporaryQueueSubscribers.remove(key);
|
||||
temporaryTopicSubscribers.remove(key);
|
||||
registeredMBeans.remove(key);
|
||||
mbeanServer.unregisterMBean(key);
|
||||
DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
|
||||
if (view != null){
|
||||
|
@ -313,6 +328,7 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
);
|
||||
|
||||
SubscriptionView view = new InactiveDurableSubscriptionView(this,key.getClientId(),info);
|
||||
registeredMBeans.add(objectName);
|
||||
mbeanServer.registerMBean(view,objectName);
|
||||
inactiveDurableTopicSubscribers.put(objectName,view);
|
||||
subscriptionKeys.put(key, objectName);
|
||||
|
@ -418,4 +434,16 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
public void setContextBroker(Broker contextBroker) {
|
||||
this.contextBroker = contextBroker;
|
||||
}
|
||||
|
||||
protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
|
||||
// Build the object name for the destination
|
||||
Hashtable map=brokerObjectName.getKeyPropertyList();
|
||||
ObjectName objectName = new ObjectName(
|
||||
brokerObjectName.getDomain()+":"+
|
||||
"BrokerName="+map.get("BrokerName")+","+
|
||||
"Type="+JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString())+","+
|
||||
"Destination="+JMXSupport.encodeObjectNamePart(destName.getPhysicalName())
|
||||
);
|
||||
return objectName;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,10 +133,7 @@ public class RegionBroker implements Broker {
|
|||
public void stop() throws Exception {
|
||||
stopped = true;
|
||||
ServiceStopper ss = new ServiceStopper();
|
||||
ss.stop(queueRegion);
|
||||
ss.stop(topicRegion);
|
||||
ss.stop(tempQueueRegion);
|
||||
ss.stop(tempTopicRegion);
|
||||
doStop(ss);
|
||||
ss.throwFirstException();
|
||||
}
|
||||
|
||||
|
@ -462,5 +459,12 @@ public class RegionBroker implements Broker {
|
|||
}
|
||||
|
||||
|
||||
protected void doStop(ServiceStopper ss) {
|
||||
ss.stop(queueRegion);
|
||||
ss.stop(topicRegion);
|
||||
ss.stop(tempQueueRegion);
|
||||
ss.stop(tempTopicRegion);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
**/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
/**
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class ReconnectWithJMXEnabledTest extends EmbeddedBrokerTestSupport {
|
||||
|
||||
protected Connection connection;
|
||||
protected boolean transacted;
|
||||
protected int authMode = Session.AUTO_ACKNOWLEDGE;
|
||||
|
||||
public void testTestUseConnectionCloseBrokerThenRestartInSameJVM() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
connection.close();
|
||||
|
||||
broker.stop();
|
||||
broker = createBroker();
|
||||
startBroker();
|
||||
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
bindAddress = "tcp://localhost:61616";
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setUseJmx(true);
|
||||
answer.setPersistent(isPersistent());
|
||||
answer.addConnector(bindAddress);
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected void useConnection(Connection connection) throws Exception {
|
||||
connection.setClientID("foo");
|
||||
connection.start();
|
||||
Session session = connection.createSession(transacted, authMode);
|
||||
Destination destination = createDestination();
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
Message message = session.createTextMessage("Hello World");
|
||||
producer.send(message);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue