Refactored ManagementContext to improve encapsulation - so all registrations of MBeans happen in one place

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@799706 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-07-31 19:19:07 +00:00
parent 9caa5c8d4d
commit 1ec71bdff1
19 changed files with 462 additions and 570 deletions

View File

@ -31,11 +31,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@ -90,7 +88,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
* number of transport connectors, network connectors and a bunch of properties
@ -103,10 +100,8 @@ public class BrokerService implements Service {
public static final String DEFAULT_PORT = "61616";
public static final String LOCAL_HOST_NAME;
public static final String DEFAULT_BROKER_NAME = "localhost";
private static final Log LOG = LogFactory.getLog(BrokerService.class);
private static final long serialVersionUID = 7353129142305630237L;
private boolean useJmx = true;
private boolean enableStatistics = true;
private boolean persistent = true;
@ -135,7 +130,6 @@ public class BrokerService implements Service {
private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
@ -179,9 +173,7 @@ public class BrokerService implements Service {
private boolean systemExitOnShutdown;
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
private boolean forceStart = false;
static {
String localHostName = "localhost";
try {
@ -234,9 +226,7 @@ public class BrokerService implements Service {
* @throws Exception
*/
public TransportConnector addConnector(TransportConnector connector) throws Exception {
transportConnectors.add(connector);
return connector;
}
@ -253,7 +243,6 @@ public class BrokerService implements Service {
unregisterConnectorMBean(connector);
}
return rc;
}
/**
@ -284,7 +273,8 @@ public class BrokerService implements Service {
*/
public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
if (!isAdvisorySupport()) {
throw new javax.jms.IllegalStateException("Networks require advisory messages to function - advisories are currently disabled");
throw new javax.jms.IllegalStateException(
"Networks require advisory messages to function - advisories are currently disabled");
}
NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
return addNetworkConnector(connector);
@ -314,7 +304,6 @@ public class BrokerService implements Service {
map.put("network", "true");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
connector.setLocalUri(uri);
// Set a connection filter so that the connector does not establish loop
// back connections.
connector.setConnectionFilter(new ConnectionFilter() {
@ -332,7 +321,6 @@ public class BrokerService implements Service {
return true;
}
});
networkConnectors.add(connector);
if (isUseJmx()) {
registerNetworkConnectorMBean(connector);
@ -386,7 +374,8 @@ public class BrokerService implements Service {
}
/**
* @param masterConnectorURI The masterConnectorURI to set.
* @param masterConnectorURI
* The masterConnectorURI to set.
*/
public void setMasterConnectorURI(String masterConnectorURI) {
this.masterConnectorURI = masterConnectorURI;
@ -435,13 +424,10 @@ public class BrokerService implements Service {
// as its way too easy to not be completely sure if start() has been
// called or not with the gazillion of different configuration
// mechanisms
// throw new IllegalStateException("Allready started.");
return;
}
try {
if (systemExitOnShutdown) {
addShutdownHook(new Runnable() {
public void run() {
@ -449,11 +435,7 @@ public class BrokerService implements Service {
}
});
}
processHelperProperties();
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
@ -461,28 +443,18 @@ public class BrokerService implements Service {
deleteAllMessages();
}
getPersistenceAdapter().start();
startDestinations();
addShutdownHook();
getBroker().start();
if (isUseJmx()) {
getManagementContext().start();
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
adminView = new BrokerView(this, managedBroker);
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
ObjectName objectName = getBrokerObjectName();
mbeanServer.registerMBean(adminView, objectName);
registeredMBeanNames.add(objectName);
getManagementContext().registerMBean(adminView, objectName);
}
}
BrokerRegistry.getInstance().bind(getBrokerName(), this);
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
@ -494,11 +466,9 @@ public class BrokerService implements Service {
if (!isSlave()) {
startAllConnectors();
}
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
brokerId = broker.getBrokerId();
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
@ -540,18 +510,6 @@ public class BrokerService implements Service {
}
stopper.stop(persistenceAdapter);
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
for (Iterator<ObjectName> iter = registeredMBeanNames.iterator(); iter.hasNext();) {
ObjectName name = iter.next();
try {
mbeanServer.unregisterMBean(name);
} catch (Exception e) {
stopper.onException(mbeanServer, e);
}
}
}
registeredMBeanNames.clear();
stopper.stop(getManagementContext());
}
// Clear SelectorParser cache to free memory
@ -585,7 +543,6 @@ public class BrokerService implements Service {
}
}
/**
* A helper method to block the caller thread until the broker has been
* started
@ -602,13 +559,13 @@ public class BrokerService implements Service {
// Properties
// -------------------------------------------------------------------------
/**
* Returns the message broker
*/
public Broker getBroker() throws Exception {
if (broker == null) {
LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" + getBrokerName() + ") is starting");
LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
+ getBrokerName() + ") is starting");
LOG.info("For help or more information please see: http://activemq.apache.org/");
broker = createBroker();
}
@ -650,7 +607,6 @@ public class BrokerService implements Service {
LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
}
this.brokerName = str.trim();
}
public PersistenceAdapterFactory getPersistenceFactory() {
@ -676,7 +632,8 @@ public class BrokerService implements Service {
* Sets the directory in which the data files will be stored by default for
* the JDBC and Journal persistence adaptors.
*
* @param dataDirectory the directory to store data files
* @param dataDirectory
* the directory to store data files
*/
public void setDataDirectory(String dataDirectory) {
setDataDirectoryFile(new File(dataDirectory));
@ -686,7 +643,8 @@ public class BrokerService implements Service {
* Sets the directory in which the data files will be stored by default for
* the JDBC and Journal persistence adaptors.
*
* @param dataDirectoryFile the directory to store data files
* @param dataDirectoryFile
* the directory to store data files
*/
public void setDataDirectoryFile(File dataDirectoryFile) {
this.dataDirectoryFile = dataDirectoryFile;
@ -703,7 +661,8 @@ public class BrokerService implements Service {
}
/**
* @param tmpDataDirectory the tmpDataDirectory to set
* @param tmpDataDirectory
* the tmpDataDirectory to set
*/
public void setTmpDataDirectory(File tmpDataDirectory) {
this.tmpDataDirectory = tmpDataDirectory;
@ -743,9 +702,13 @@ public class BrokerService implements Service {
try {
if (systemUsage == null) {
systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg
systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 Gb
systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
// 64
// Meg
systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10
// Gb
systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
// GB
addService(this.systemUsage);
}
return systemUsage;
@ -782,7 +745,8 @@ public class BrokerService implements Service {
}
/**
* @param consumerSystemUsaage the storeSystemUsage to set
* @param consumerSystemUsaage
* the storeSystemUsage to set
*/
public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
if (this.consumerSystemUsaage != null) {
@ -811,7 +775,8 @@ public class BrokerService implements Service {
}
/**
* @param producerUsageManager the producerUsageManager to set
* @param producerUsageManager
* the producerUsageManager to set
*/
public void setProducerSystemUsage(SystemUsage producerUsageManager) {
if (this.producerSystemUsage != null) {
@ -832,18 +797,19 @@ public class BrokerService implements Service {
/**
* Sets the persistence adaptor implementation to use for this broker
*
* @throws IOException
*/
public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
this.persistenceAdapter = persistenceAdapter;
configureService(this.persistenceAdapter);
this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
}
public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner());
taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000,
isDedicatedTaskRunner());
}
return taskRunnerFactory;
}
@ -854,7 +820,8 @@ public class BrokerService implements Service {
public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
if (taskRunnerFactory == null) {
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000);
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
true, 1000);
}
return persistenceTaskRunnerFactory;
}
@ -953,7 +920,8 @@ public class BrokerService implements Service {
}
/**
* @param jmsConnectors The jmsBridgeConnectors to set.
* @param jmsConnectors
* The jmsBridgeConnectors to set.
*/
public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
this.jmsBridgeConnectors = jmsConnectors;
@ -1033,7 +1001,8 @@ public class BrokerService implements Service {
* Sets the transport connectors which this broker will listen on for new
* clients
*
* @org.apache.xbean.Property nestedType="org.apache.activemq.broker.TransportConnector"
* @org.apache.xbean.Property
* nestedType="org.apache.activemq.broker.TransportConnector"
*/
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
@ -1054,7 +1023,8 @@ public class BrokerService implements Service {
* Sets the network connectors which this broker will use to connect to
* other brokers in a federated network
*
* @org.apache.xbean.Property nestedType="org.apache.activemq.network.NetworkConnector"
* @org.apache.xbean.Property
* nestedType="org.apache.activemq.network.NetworkConnector"
*/
public void setNetworkConnectors(List networkConnectors) throws Exception {
for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
@ -1154,7 +1124,8 @@ public class BrokerService implements Service {
}
/**
* @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
* @param shutdownOnMasterFailure
* The shutdownOnMasterFailure to set.
*/
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
this.shutdownOnMasterFailure = shutdownOnMasterFailure;
@ -1223,11 +1194,9 @@ public class BrokerService implements Service {
*/
public synchronized Store getTempDataStore() {
if (tempDataStore == null) {
if (!isPersistent()) {
return null;
}
boolean result = true;
boolean empty = true;
try {
@ -1257,7 +1226,8 @@ public class BrokerService implements Service {
}
/**
* @param tempDataStore the tempDataStore to set
* @param tempDataStore
* the tempDataStore to set
*/
public void setTempDataStore(Store tempDataStore) {
this.tempDataStore = tempDataStore;
@ -1279,7 +1249,8 @@ public class BrokerService implements Service {
}
/**
* @param useLocalHostBrokerName the useLocalHostBrokerName to set
* @param useLocalHostBrokerName
* the useLocalHostBrokerName to set
*/
public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
this.useLocalHostBrokerName = useLocalHostBrokerName;
@ -1296,14 +1267,16 @@ public class BrokerService implements Service {
}
/**
* @param supportFailOver the supportFailOver to set
* @param supportFailOver
* the supportFailOver to set
*/
public void setSupportFailOver(boolean supportFailOver) {
this.supportFailOver = supportFailOver;
}
/**
* Looks up and lazily creates if necessary the destination for the given JMS name
* Looks up and lazily creates if necessary the destination for the given
* JMS name
*/
public Destination getDestination(ActiveMQDestination destination) throws Exception {
return getBroker().addDestination(getAdminConnectionContext(), destination);
@ -1333,8 +1306,7 @@ public class BrokerService implements Service {
return splitSystemUsageForProducersConsumers;
}
public void setSplitSystemUsageForProducersConsumers(
boolean splitSystemUsageForProducersConsumers) {
public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
}
@ -1345,6 +1317,7 @@ public class BrokerService implements Service {
public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
this.monitorConnectionSplits = monitorConnectionSplits;
}
public int getTaskRunnerPriority() {
return taskRunnerPriority;
}
@ -1373,8 +1346,7 @@ public class BrokerService implements Service {
return timeBeforePurgeTempDestinations;
}
public void setTimeBeforePurgeTempDestinations(
int timeBeforePurgeTempDestinations) {
public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
}
@ -1385,6 +1357,7 @@ public class BrokerService implements Service {
public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
this.useTempMirroredQueues = useTempMirroredQueues;
}
//
// Implementation methods
// -------------------------------------------------------------------------
@ -1408,7 +1381,6 @@ public class BrokerService implements Service {
addNetworkConnector(uri);
}
}
if (jmsBridgeConnectors != null) {
for (int i = 0; i < jmsBridgeConnectors.length; i++) {
addJmsConnector(jmsBridgeConnectors[i]);
@ -1422,7 +1394,8 @@ public class BrokerService implements Service {
}
if (masterConnectorURI != null) {
if (masterServiceExists) {
throw new IllegalStateException("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
throw new IllegalStateException(
"Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
} else {
addService(new MasterConnector(masterConnectorURI));
}
@ -1430,23 +1403,19 @@ public class BrokerService implements Service {
}
protected void stopAllConnectors(ServiceStopper stopper) {
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
unregisterNetworkConnectorMBean(connector);
stopper.stop(connector);
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
stopper.stop(connector);
}
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
stopper.stop(connector);
}
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
stopper.stop(connector);
@ -1454,143 +1423,107 @@ public class BrokerService implements Service {
}
protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
try {
ObjectName objectName = createConnectorObjectName(connector);
connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName);
connector = connector.asManagedConnector(getManagementContext(), objectName);
ConnectorViewMBean view = new ConnectorView(connector);
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
getManagementContext().registerMBean(view, objectName);
return connector;
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
}
}
return connector;
}
protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
try {
ObjectName objectName = createConnectorObjectName(connector);
if (registeredMBeanNames.remove(objectName)) {
mbeanServer.unregisterMBean(objectName);
}
getManagementContext().unregisterMBean(objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
}
throw IOExceptionSupport.create(
"Transport Connector could not be registered in JMX: " + e.getMessage(), e);
}
}
}
protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
// MBeanServer mbeanServer = getManagementContext().getMBeanServer();
// if (mbeanServer != null) {
//
//
// }
return adaptor;
}
protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
}
}
}
private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector,"
+ "ConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
+ JMXSupport.encodeObjectNamePart(connector.getName()));
}
protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
try {
ObjectName objectName = createNetworkConnectorObjectName(connector);
connector.setObjectName(objectName);
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
getManagementContext().registerMBean(view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
}
}
}
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
throws MalformedObjectNameException {
return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
+ "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
}
protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
try {
ObjectName objectName = createNetworkConnectorObjectName(connector);
if (registeredMBeanNames.remove(objectName)) {
mbeanServer.unregisterMBean(objectName);
}
getManagementContext().unregisterMBean(objectName);
} catch (Exception e) {
LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
}
}
}
}
protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
ProxyConnectorView view = new ProxyConnectorView(connector);
try {
ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
+ "Type=ProxyConnector," + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
+ "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
getManagementContext().registerMBean(view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
}
protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
FTConnectorView view = new FTConnectorView(connector);
try {
ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
+ "Type=MasterConnector");
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
getManagementContext().registerMBean(view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
}
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
JmsConnectorView view = new JmsConnectorView(connector);
try {
ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
+ "Type=JmsConnector," + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
mbeanServer.registerMBean(view, objectName);
registeredMBeanNames.add(objectName);
ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
+ "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
getManagementContext().registerMBean(view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
}
}
}
/**
* Factory method to create a new broker
@ -1602,7 +1535,6 @@ public class BrokerService implements Service {
protected Broker createBroker() throws Exception {
regionBroker = createRegionBroker();
Broker broker = addInterceptors(regionBroker);
// Add a filter that will stop access to the broker once stopped
broker = new MutableBrokerFilter(broker) {
Broker old;
@ -1612,7 +1544,6 @@ public class BrokerService implements Service {
// Just ignore additional stop actions.
public void stop() throws Exception {
}
});
old.stop();
}
@ -1623,11 +1554,8 @@ public class BrokerService implements Service {
}
getNext().start();
}
};
return broker;
}
/**
@ -1641,7 +1569,6 @@ public class BrokerService implements Service {
destinationInterceptors = createDefaultDestinationInterceptor();
}
configureServices(destinationInterceptors);
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
if (destinationFactory == null) {
destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
@ -1652,18 +1579,16 @@ public class BrokerService implements Service {
protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
RegionBroker regionBroker;
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
destinationInterceptor);
regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
} else {
regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
destinationInterceptor);
}
destinationFactory.setRegionBroker(regionBroker);
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
return regionBroker;
}
@ -1733,7 +1658,8 @@ public class BrokerService implements Service {
protected ObjectName createBrokerObjectName() throws IOException {
try {
return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
} catch (Throwable e) {
throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
}
@ -1812,7 +1738,6 @@ public class BrokerService implements Service {
protected void startDestinations() throws Exception {
if (destinations != null) {
ConnectionContext adminConnectionContext = getAdminConnectionContext();
for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination destination = destinations[i];
getBroker().addDestination(adminConnectionContext, destination);
@ -1858,7 +1783,6 @@ public class BrokerService implements Service {
slaveStartSignal.countDown();
}
/**
* Start all transport and network connections, proxies and bridges
*
@ -1868,13 +1792,11 @@ public class BrokerService implements Service {
if (!isSlave()) {
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>();
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
connector.setBrokerService(this);
al.add(startTransportConnector(connector));
}
if (al.size() > 0) {
// let's clear the transportConnectors list and replace it with
// the started transportConnector instances
@ -1896,12 +1818,10 @@ public class BrokerService implements Service {
connector.setDurableDestinations(durableDestinations);
connector.start();
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
ProxyConnector connector = iter.next();
connector.start();
}
for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
JmsConnector connector = iter.next();
connector.start();
@ -1919,15 +1839,11 @@ public class BrokerService implements Service {
if (policy != null) {
connector.setMessageAuthorizationPolicy(policy);
}
if (isUseJmx()) {
connector = registerConnectorMBean(connector);
}
connector.getStatistics().setEnabled(enableStatistics);
connector.start();
return connector;
}
@ -1964,7 +1880,6 @@ public class BrokerService implements Service {
Set destinations = destinationFactory.getDestinations();
if (destinations != null) {
Iterator iter = destinations.iterator();
ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
if (adminConnectionContext == null) {
ConnectionContext context = new ConnectionContext();
@ -1972,7 +1887,6 @@ public class BrokerService implements Service {
adminConnectionContext = context;
broker.setAdminConnectionContext(adminConnectionContext);
}
while (iter.hasNext()) {
ActiveMQDestination destination = (ActiveMQDestination) iter.next();
broker.addDestination(adminConnectionContext, destination);
@ -1988,7 +1902,6 @@ public class BrokerService implements Service {
this.regionBroker = regionBroker;
}
public void addShutdownHook(Runnable hook) {
synchronized (shutdownHooks) {
shutdownHooks.add(hook);
@ -2044,5 +1957,4 @@ public class BrokerService implements Service {
public CountDownLatch getSlaveStartSignal() {
return slaveStartSignal;
}
}

View File

@ -16,16 +16,8 @@
*/
package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
@ -40,6 +32,12 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.ObjectName;
/**
* @org.apache.xbean.XBean
@ -94,8 +92,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
* Factory method to create a JMX managed version of this transport
* connector
*/
public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException, URISyntaxException {
ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getServer());
public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
rc.setBrokerInfo(getBrokerInfo());
rc.setConnectUri(getConnectUri());
rc.setDisableAsyncDispatch(isDisableAsyncDispatch());

View File

@ -16,30 +16,6 @@
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@ -71,10 +47,31 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
public class ManagedRegionBroker extends RegionBroker {
private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
private final ManagementContext managementContext;
private final ObjectName brokerObjectName;
private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
@ -92,10 +89,10 @@ public class ManagedRegionBroker extends RegionBroker {
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
this.mbeanServer = mbeanServer;
this.managementContext = context;
this.brokerObjectName = brokerObjectName;
}
@ -111,7 +108,7 @@ public class ManagedRegionBroker extends RegionBroker {
for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
ObjectName name = iter.next();
try {
mbeanServer.unregisterMBean(name);
managementContext.unregisterMBean(name);
} catch (InstanceNotFoundException e) {
LOG.warn("The MBean: " + name + " is no longer registered with JMX");
} catch (Exception e) {
@ -245,7 +242,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
try {
mbeanServer.registerMBean(view, key);
managementContext.registerMBean(view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
@ -260,7 +257,7 @@ public class ManagedRegionBroker extends RegionBroker {
temporaryTopics.remove(key);
if (registeredMBeans.remove(key)) {
try {
mbeanServer.unregisterMBean(key);
managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
@ -288,7 +285,7 @@ public class ManagedRegionBroker extends RegionBroker {
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
registeredMBeans.remove(inactiveName);
mbeanServer.unregisterMBean(inactiveName);
managementContext.unregisterMBean(inactiveName);
}
} catch (Throwable e) {
LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
@ -300,7 +297,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
try {
mbeanServer.registerMBean(view, key);
managementContext.registerMBean(view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
@ -317,7 +314,7 @@ public class ManagedRegionBroker extends RegionBroker {
temporaryTopicSubscribers.remove(key);
if (registeredMBeans.remove(key)) {
try {
mbeanServer.unregisterMBean(key);
managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
@ -370,7 +367,7 @@ public class ManagedRegionBroker extends RegionBroker {
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
try {
mbeanServer.registerMBean(view, objectName);
managementContext.registerMBean(view, objectName);
registeredMBeans.add(objectName);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);

View File

@ -16,12 +16,6 @@
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.Hashtable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
@ -33,6 +27,9 @@ import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.Hashtable;
import javax.management.ObjectName;
/**
* A managed transport connection
@ -42,7 +39,7 @@ import org.apache.commons.logging.LogFactory;
public class ManagedTransportConnection extends TransportConnection {
private static final Log LOG = LogFactory.getLog(ManagedTransportConnection.class);
private final MBeanServer server;
private final ManagementContext managementContext;
private final ObjectName connectorName;
private ConnectionViewMBean mbean;
@ -50,10 +47,10 @@ public class ManagedTransportConnection extends TransportConnection {
private ObjectName byAddressName;
public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
TaskRunnerFactory factory, MBeanServer server, ObjectName connectorName)
TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName)
throws IOException {
super(connector, transport, broker, factory);
this.server = server;
this.managementContext = context;
this.connectorName = connectorName;
this.mbean = new ConnectionView(this);
byAddressName = createByAddressObjectName("address", transport.getRemoteAddress());
@ -99,7 +96,7 @@ public class ManagedTransportConnection extends TransportConnection {
protected void registerMBean(ObjectName name) {
if (name != null) {
try {
server.registerMBean(mbean, name);
managementContext.registerMBean(mbean, name);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + name);
LOG.debug("Failure reason: " + e, e);
@ -110,7 +107,7 @@ public class ManagedTransportConnection extends TransportConnection {
protected void unregisterMBean(ObjectName name) {
if (name != null) {
try {
server.unregisterMBean(name);
managementContext.unregisterMBean(name);
} catch (Throwable e) {
LOG.warn("Failed to unregister mbean: " + name);
LOG.debug("Failure reason: " + e, e);

View File

@ -16,17 +16,14 @@
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.net.URISyntaxException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import java.io.IOException;
import java.net.URISyntaxException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
/**
* A managed transport connector which can create multiple managed connections
@ -38,12 +35,12 @@ public class ManagedTransportConnector extends TransportConnector {
static long nextConnectionId = 1;
private final MBeanServer mbeanServer;
private final ManagementContext managementContext;
private final ObjectName connectorName;
public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, TransportServer server) {
public ManagedTransportConnector(ManagementContext context, ObjectName connectorName, TransportServer server) {
super(server);
this.mbeanServer = mbeanServer;
this.managementContext = context;
this.connectorName = connectorName;
}
@ -52,7 +49,7 @@ public class ManagedTransportConnector extends TransportConnector {
}
protected Connection createConnection(Transport transport) throws IOException {
return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName);
return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName);
}
protected static synchronized long getNextConnectionId() {

View File

@ -21,14 +21,23 @@ import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Attribute;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
@ -37,6 +46,7 @@ import javax.management.remote.JMXServiceURL;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sun.security.action.GetBooleanAction;
/**
* A Flow provides different dispatch policies within the NMR
@ -64,6 +74,7 @@ public class ManagementContext implements Service {
private JMXConnectorServer connectorServer;
private ObjectName namingServiceObjectName;
private Registry registry;
private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
public ManagementContext() {
this(null);
@ -101,8 +112,18 @@ public class ManagementContext implements Service {
}
}
public void stop() throws IOException {
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
MBeanServer mbeanServer = getMBeanServer();
if (mbeanServer != null) {
for (Iterator<ObjectName> iter = registeredMBeanNames.iterator(); iter.hasNext();) {
ObjectName name = iter.next();
mbeanServer.unregisterMBean(name);
}
}
registeredMBeanNames.clear();
JMXConnectorServer server = connectorServer;
connectorServer = null;
if (server != null) {
@ -146,7 +167,7 @@ public class ManagementContext implements Service {
*
* @return the MBeanServer
*/
public MBeanServer getMBeanServer() {
protected MBeanServer getMBeanServer() {
if (this.beanServer == null) {
this.beanServer = findMBeanServer();
}
@ -259,6 +280,23 @@ public class ManagementContext implements Service {
return containerName + "." + name;
}
public Object newProxyInstance( ObjectName objectName,
Class interfaceClass,
boolean notificationBroadcaster){
return MBeanServerInvocationHandler.newProxyInstance(getMBeanServer(), objectName, interfaceClass, notificationBroadcaster);
}
public Object getAttribute(ObjectName name, String attribute) throws Exception{
return getMBeanServer().getAttribute(name, attribute);
}
public ObjectInstance registerMBean(Object bean, ObjectName name) throws Exception{
ObjectInstance result = getMBeanServer().registerMBean(bean, name);
this.registeredMBeanNames.add(name);
return result;
}
/**
* Unregister an MBean
*
@ -266,7 +304,7 @@ public class ManagementContext implements Service {
* @throws JMException
*/
public void unregisterMBean(ObjectName name) throws JMException {
if (beanServer != null && beanServer.isRegistered(name)) {
if (beanServer != null && beanServer.isRegistered(name) && this.registeredMBeanNames.remove(name)) {
beanServer.unregisterMBean(name);
}
}

View File

@ -520,9 +520,6 @@ public class RegionBroker extends EmptyBroker {
public BrokerId getBrokerId() {
if (brokerId == null) {
// TODO: this should persist the broker id so that subsequent
// startup
// uses the same broker id.
brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
}
return brokerId;

View File

@ -16,25 +16,10 @@
*/
package org.apache.activemq.broker.view;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
@ -43,6 +28,15 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationMapNode;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.management.ObjectName;
/**
* @version $Revision: $
@ -55,7 +49,6 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
private boolean clearProducerCacheAfterRender;
private String domain = "org.apache.activemq";
private BrokerViewMBean brokerView;
private MBeanServer mbeanServer;
// until we have some MBeans for producers, lets do it all ourselves
private Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
@ -66,9 +59,6 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
super(next, file);
this.redrawOnRemove = redrawOnRemove;
mbeanServer = new ManagementContext().getMBeanServer();
ObjectName brokerName = next.getBrokerService().getBrokerObjectName();
brokerView = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
@ -123,7 +113,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
writer.println("digraph \"ActiveMQ Connections\" {");
writer.println();
writer.println("label=\"ActiveMQ Broker: " + brokerView.getBrokerId() + "\"];");
writer.println("label=\"ActiveMQ Broker: " + getBrokerView().getBrokerId() + "\"];");
writer.println();
writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];");
writer.println();
@ -132,10 +122,10 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
Map<String, String> queues = new HashMap<String, String>();
Map<String, String> topics = new HashMap<String, String>();
printSubscribers(writer, clients, queues, "queue_", brokerView.getQueueSubscribers());
printSubscribers(writer, clients, queues, "queue_", getBrokerView().getQueueSubscribers());
writer.println();
printSubscribers(writer, clients, topics, "topic_", brokerView.getTopicSubscribers());
printSubscribers(writer, clients, topics, "topic_", getBrokerView().getTopicSubscribers());
writer.println();
printProducers(writer, clients, queues, topics);
@ -210,7 +200,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
protected void printSubscribers(PrintWriter writer, Map<String, String> clients, Map<String, String> destinations, String type, ObjectName[] subscribers) {
for (int i = 0; i < subscribers.length; i++) {
ObjectName name = subscribers[i];
SubscriptionViewMBean subscriber = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true);
SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
String clientId = subscriber.getClientId();
String safeClientId = asID(clientId);
@ -332,4 +322,13 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
}
return path;
}
BrokerViewMBean getBrokerView() throws Exception {
if (this.brokerView == null) {
ObjectName brokerName = getBrokerService().getBrokerObjectName();
this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
BrokerViewMBean.class, true);
}
return this.brokerView;
}
}

View File

@ -16,22 +16,6 @@
*/
package org.apache.activemq.network;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
@ -45,6 +29,17 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
/**
* @version $Revision$
@ -238,34 +233,26 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
if (!getBrokerService().isUseJmx()) {
return;
}
MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
if (mbeanServer != null) {
NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
try {
ObjectName objectName = createNetworkBridgeObjectName(bridge);
mbeanServer.registerMBean(view, objectName);
getBrokerService().getManagementContext().registerMBean(view, objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
}
}
}
protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
if (!getBrokerService().isUseJmx()) {
return;
}
MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
if (mbeanServer != null) {
try {
ObjectName objectName = createNetworkBridgeObjectName(bridge);
mbeanServer.unregisterMBean(objectName);
getBrokerService().getManagementContext().unregisterMBean(objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
}
}
}
@SuppressWarnings("unchecked")

View File

@ -16,15 +16,13 @@
*/
package org.apache.activemq.transport;
import java.io.IOException;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LogWriterFinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import javax.management.ObjectName;
/**
* Singleton class to create TransportLogger objects.
@ -201,7 +199,7 @@ public class TransportLoggerFactory {
try {
this.objectName = new ObjectName(this.managementContext.getJmxDomainName()+":"+ "Type=TransportLoggerControl");
this.managementContext.getMBeanServer().registerMBean(new TransportLoggerControl(this.managementContext),this.objectName);
this.managementContext.registerMBean(new TransportLoggerControl(this.managementContext),this.objectName);
this.transportLoggerControlCreated = true;

View File

@ -119,7 +119,7 @@ public class TransportLoggerView implements TransportLoggerViewMBean {
*/
private void register() {
try {
this.managementContext.getMBeanServer().registerMBean(this, this.name);
this.managementContext.registerMBean(this, this.name);
} catch (Exception e) {
log.error("Could not register MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e);
}
@ -138,7 +138,7 @@ public class TransportLoggerView implements TransportLoggerViewMBean {
TransportLoggerView.transportLoggerViews.remove(this);
try {
this.managementContext.getMBeanServer().unregisterMBean(this.name);
this.managementContext.unregisterMBean(this.name);
} catch (Exception e) {
log.error("Could not unregister MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e);
}

View File

@ -910,7 +910,7 @@ public class JMSConsumerTest extends JmsTestSupport {
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
@ -918,7 +918,7 @@ public class JMSConsumerTest extends JmsTestSupport {
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
}
return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
return (DestinationViewMBean)broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
}
}

View File

@ -74,9 +74,8 @@ public class QueuePurgeTest extends TestCase {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+ ":Type=Queue,Destination=" + queue.getQueueName()
+ ",BrokerName=localhost");
QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler
.newProxyInstance(broker.getManagementContext()
.getMBeanServer(), queueViewMBeanName,
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
.newProxyInstance(queueViewMBeanName,
QueueViewMBean.class, true);
return proxy;
}

View File

@ -236,9 +236,8 @@ public class NegativeQueueTest extends TestCase {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" +
queue.getQueueName() + ",BrokerName=localhost");
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(
broker.getManagementContext().getMBeanServer(),
queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName,
QueueViewMBean.class, true);
return proxy;
}

View File

@ -2,17 +2,6 @@ package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
@ -23,6 +12,13 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
/**
* Test to determine if expired messages are being reaped if there is
@ -122,7 +118,6 @@ public class MessageExpirationReaperTest {
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
@ -130,6 +125,7 @@ public class MessageExpirationReaperTest {
} else {
name = new ObjectName(domain + ":BrokerName=" + brokerName + ",Type=Topic,Destination=" + destinationName);
}
return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
}
}

View File

@ -16,29 +16,6 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerFactory;
@ -48,6 +25,25 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
public class StompTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(StompTest.class);
@ -882,12 +878,11 @@ public class StompTest extends CombinationTestSupport {
public void testDurableUnsub() throws Exception {
// get broker JMX view
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
// connect
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;

View File

@ -16,21 +16,6 @@
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
@ -44,6 +29,16 @@ import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Test;
public class ExpiredMessagesTest extends CombinationTestSupport {
@ -289,15 +284,17 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName());
name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination="
+ destination.getPhysicalName());
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName());
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination="
+ destination.getPhysicalName());
}
return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
}
protected void tearDown() throws Exception {

View File

@ -16,21 +16,6 @@
*/
package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
@ -42,6 +27,16 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Test;
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
@ -248,7 +243,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
@ -256,7 +250,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
}
return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
}
protected void tearDown() throws Exception {

View File

@ -16,17 +16,13 @@
*/
package org.apache.activemq.xbean;
import java.net.URI;
import java.util.Hashtable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.JMXSupport;
import java.net.URI;
import java.util.Hashtable;
import javax.management.ObjectName;
import junit.framework.TestCase;
/**
* @version $Revision: 1.1 $
@ -36,19 +32,14 @@ public class ManagementContextXBeanConfigTest extends TestCase {
protected BrokerService brokerService;
public void testManagmentContextConfiguredCorrectly() throws Exception {
assertEquals(2011, brokerService.getManagementContext().getConnectorPort());
assertEquals("test.domain", brokerService.getManagementContext().getJmxDomainName());
MBeanServer beanServer = brokerService.getManagementContext().getMBeanServer();
// Make sure the broker is registered in the right jmx domain.
Hashtable<String, String> map = new Hashtable<String, String>();
map.put("Type", "Broker");
map.put("BrokerName", JMXSupport.encodeObjectNamePart("localhost"));
ObjectName on = new ObjectName("test.domain", map);
Object value = beanServer.getAttribute(on, "TotalEnqueueCount");
Object value = brokerService.getManagementContext().getAttribute(on, "TotalEnqueueCount");
assertNotNull(value);
}