diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index e908c14ad9..bd1fa0880a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -31,11 +31,9 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; - import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisoryBroker; @@ -90,7 +88,6 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - /** * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a * number of transport connectors, network connectors and a bunch of properties @@ -99,14 +96,12 @@ import org.apache.commons.logging.LogFactory; * @version $Revision: 1.1 $ */ public class BrokerService implements Service { - protected CountDownLatch slaveStartSignal = new CountDownLatch(1); + protected CountDownLatch slaveStartSignal = new CountDownLatch(1); public static final String DEFAULT_PORT = "61616"; public static final String LOCAL_HOST_NAME; public static final String DEFAULT_BROKER_NAME = "localhost"; - private static final Log LOG = LogFactory.getLog(BrokerService.class); private static final long serialVersionUID = 7353129142305630237L; - private boolean useJmx = true; private boolean enableStatistics = true; private boolean persistent = true; @@ -135,7 +130,6 @@ public class BrokerService implements Service { private List transportConnectors = new CopyOnWriteArrayList(); private List networkConnectors = new CopyOnWriteArrayList(); private List proxyConnectors = new CopyOnWriteArrayList(); - private List registeredMBeanNames = new CopyOnWriteArrayList(); private List jmsConnectors = new CopyOnWriteArrayList(); private List services = new ArrayList(); private MasterConnector masterConnector; @@ -156,7 +150,7 @@ public class BrokerService implements Service { private boolean keepDurableSubsActive = true; private boolean useVirtualTopics = true; private boolean useMirroredQueues = false; - private boolean useTempMirroredQueues=true; + private boolean useTempMirroredQueues = true; private BrokerId brokerId; private DestinationInterceptor[] destinationInterceptors; private ActiveMQDestination[] destinations; @@ -170,18 +164,16 @@ public class BrokerService implements Service { private int producerSystemUsagePortion = 60; private int consumerSystemUsagePortion = 40; private boolean splitSystemUsageForProducersConsumers; - private boolean monitorConnectionSplits=false; + private boolean monitorConnectionSplits = false; private int taskRunnerPriority = Thread.NORM_PRIORITY; private boolean dedicatedTaskRunner; - private boolean cacheTempDestinations=false;//useful for failover + private boolean cacheTempDestinations = false;// useful for failover private int timeBeforePurgeTempDestinations = 5000; - private List shutdownHooks= new ArrayList(); + private List shutdownHooks = new ArrayList(); private boolean systemExitOnShutdown; private int systemExitOnShutdownExitCode; private SslContext sslContext; - private boolean forceStart = false; - static { String localHostName = "localhost"; try { @@ -234,9 +226,7 @@ public class BrokerService implements Service { * @throws Exception */ public TransportConnector addConnector(TransportConnector connector) throws Exception { - transportConnectors.add(connector); - return connector; } @@ -253,7 +243,6 @@ public class BrokerService implements Service { unregisterConnectorMBean(connector); } return rc; - } /** @@ -284,7 +273,8 @@ public class BrokerService implements Service { */ public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { if (!isAdvisorySupport()) { - throw new javax.jms.IllegalStateException("Networks require advisory messages to function - advisories are currently disabled"); + throw new javax.jms.IllegalStateException( + "Networks require advisory messages to function - advisories are currently disabled"); } NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); return addNetworkConnector(connector); @@ -314,7 +304,6 @@ public class BrokerService implements Service { map.put("network", "true"); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); connector.setLocalUri(uri); - // Set a connection filter so that the connector does not establish loop // back connections. connector.setConnectionFilter(new ConnectionFilter() { @@ -332,7 +321,6 @@ public class BrokerService implements Service { return true; } }); - networkConnectors.add(connector); if (isUseJmx()) { registerNetworkConnectorMBean(connector); @@ -386,7 +374,8 @@ public class BrokerService implements Service { } /** - * @param masterConnectorURI The masterConnectorURI to set. + * @param masterConnectorURI + * The masterConnectorURI to set. */ public void setMasterConnectorURI(String masterConnectorURI) { this.masterConnectorURI = masterConnectorURI; @@ -423,10 +412,10 @@ public class BrokerService implements Service { } public void start(boolean force) throws Exception { - forceStart = force; - start(); + forceStart = force; + start(); } - + // Service interface // ------------------------------------------------------------------------- public void start() throws Exception { @@ -435,25 +424,18 @@ public class BrokerService implements Service { // as its way too easy to not be completely sure if start() has been // called or not with the gazillion of different configuration // mechanisms - // throw new IllegalStateException("Allready started."); return; } - try { - - if( systemExitOnShutdown ) { - addShutdownHook(new Runnable(){ + if (systemExitOnShutdown) { + addShutdownHook(new Runnable() { public void run() { System.exit(systemExitOnShutdownExitCode); } }); } - processHelperProperties(); - - - getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); getPersistenceAdapter().setBrokerName(getBrokerName()); LOG.info("Using Persistence Adapter: " + getPersistenceAdapter()); @@ -461,54 +443,42 @@ public class BrokerService implements Service { deleteAllMessages(); } getPersistenceAdapter().start(); - startDestinations(); - addShutdownHook(); - getBroker().start(); - if (isUseJmx()) { - getManagementContext().start(); - ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker; + getManagementContext().start(); + ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; managedBroker.setContextBroker(broker); adminView = new BrokerView(this, managedBroker); - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - ObjectName objectName = getBrokerObjectName(); - mbeanServer.registerMBean(adminView, objectName); - registeredMBeanNames.add(objectName); - } + ObjectName objectName = getBrokerObjectName(); + getManagementContext().registerMBean(adminView, objectName); } - BrokerRegistry.getInstance().bind(getBrokerName(), this); - - // see if there is a MasterBroker service and if so, configure + // see if there is a MasterBroker service and if so, configure // it and start it. for (Service service : services) { if (service instanceof MasterConnector) { configureService(service); service.start(); } - } + } if (!isSlave()) { startAllConnectors(); } - if (isUseJmx() && masterConnector != null) { registerFTConnectorMBean(masterConnector); } - brokerId = broker.getBrokerId(); LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started"); getBroker().brokerServiceStarted(); startedLatch.countDown(); } catch (Exception e) { LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e); - try{ + try { stop(); - }catch(Exception ex) { - LOG.warn("Failed to stop broker after failure in start ",ex); + } catch (Exception ex) { + LOG.warn("Failed to stop broker after failure in start ", ex); } throw e; } @@ -522,7 +492,7 @@ public class BrokerService implements Service { removeShutdownHook(); ServiceStopper stopper = new ServiceStopper(); if (services != null) { - for (Service service: services) { + for (Service service : services) { stopper.stop(service); } } @@ -531,7 +501,7 @@ public class BrokerService implements Service { // this has to be done after services are stopped, // to avoid timimg issue with discovery (spinning up a new instance) BrokerRegistry.getInstance().unbind(getBrokerName()); - VMTransportFactory.stopped(getBrokerName()); + VMTransportFactory.stopped(getBrokerName()); if (broker != null) { stopper.stop(broker); } @@ -540,18 +510,6 @@ public class BrokerService implements Service { } stopper.stop(persistenceAdapter); if (isUseJmx()) { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - for (Iterator iter = registeredMBeanNames.iterator(); iter.hasNext();) { - ObjectName name = iter.next(); - try { - mbeanServer.unregisterMBean(name); - } catch (Exception e) { - stopper.onException(mbeanServer, e); - } - } - } - registeredMBeanNames.clear(); stopper.stop(getManagementContext()); } // Clear SelectorParser cache to free memory @@ -559,11 +517,11 @@ public class BrokerService implements Service { stopped.set(true); stoppedLatch.countDown(); LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); - synchronized(shutdownHooks) { + synchronized (shutdownHooks) { for (Runnable hook : shutdownHooks) { try { hook.run(); - } catch ( Throwable e ) { + } catch (Throwable e) { stopper.onException(hook, e); } } @@ -585,7 +543,6 @@ public class BrokerService implements Service { } } - /** * A helper method to block the caller thread until the broker has been * started @@ -602,13 +559,13 @@ public class BrokerService implements Service { // Properties // ------------------------------------------------------------------------- - /** * Returns the message broker */ public Broker getBroker() throws Exception { if (broker == null) { - LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" + getBrokerName() + ") is starting"); + LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" + + getBrokerName() + ") is starting"); LOG.info("For help or more information please see: http://activemq.apache.org/"); broker = createBroker(); } @@ -650,7 +607,6 @@ public class BrokerService implements Service { LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str); } this.brokerName = str.trim(); - } public PersistenceAdapterFactory getPersistenceFactory() { @@ -676,7 +632,8 @@ public class BrokerService implements Service { * Sets the directory in which the data files will be stored by default for * the JDBC and Journal persistence adaptors. * - * @param dataDirectory the directory to store data files + * @param dataDirectory + * the directory to store data files */ public void setDataDirectory(String dataDirectory) { setDataDirectoryFile(new File(dataDirectory)); @@ -686,7 +643,8 @@ public class BrokerService implements Service { * Sets the directory in which the data files will be stored by default for * the JDBC and Journal persistence adaptors. * - * @param dataDirectoryFile the directory to store data files + * @param dataDirectoryFile + * the directory to store data files */ public void setDataDirectoryFile(File dataDirectoryFile) { this.dataDirectoryFile = dataDirectoryFile; @@ -703,7 +661,8 @@ public class BrokerService implements Service { } /** - * @param tmpDataDirectory the tmpDataDirectory to set + * @param tmpDataDirectory + * the tmpDataDirectory to set */ public void setTmpDataDirectory(File tmpDataDirectory) { this.tmpDataDirectory = tmpDataDirectory; @@ -743,9 +702,13 @@ public class BrokerService implements Service { try { if (systemUsage == null) { systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); - systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg - systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 Gb - systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB + systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default + // 64 + // Meg + systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 + // Gb + systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 + // GB addService(this.systemUsage); } return systemUsage; @@ -765,24 +728,25 @@ public class BrokerService implements Service { /** * @return the consumerUsageManager - * @throws IOException + * @throws IOException */ public SystemUsage getConsumerSystemUsage() throws IOException { if (this.consumerSystemUsaage == null) { - if(splitSystemUsageForProducersConsumers) { + if (splitSystemUsageForProducersConsumers) { this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); - float portion = consumerSystemUsagePortion/100f; + float portion = consumerSystemUsagePortion / 100f; this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); addService(this.consumerSystemUsaage); - }else { - consumerSystemUsaage=getSystemUsage(); + } else { + consumerSystemUsaage = getSystemUsage(); } } return this.consumerSystemUsaage; } /** - * @param consumerSystemUsaage the storeSystemUsage to set + * @param consumerSystemUsaage + * the storeSystemUsage to set */ public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { if (this.consumerSystemUsaage != null) { @@ -794,24 +758,25 @@ public class BrokerService implements Service { /** * @return the producerUsageManager - * @throws IOException + * @throws IOException */ public SystemUsage getProducerSystemUsage() throws IOException { - if (producerSystemUsage == null ) { + if (producerSystemUsage == null) { if (splitSystemUsageForProducersConsumers) { producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); - float portion = producerSystemUsagePortion/100f; + float portion = producerSystemUsagePortion / 100f; producerSystemUsage.getMemoryUsage().setUsagePortion(portion); addService(producerSystemUsage); - }else { - producerSystemUsage=getSystemUsage(); + } else { + producerSystemUsage = getSystemUsage(); } } return producerSystemUsage; } /** - * @param producerUsageManager the producerUsageManager to set + * @param producerUsageManager + * the producerUsageManager to set */ public void setProducerSystemUsage(SystemUsage producerUsageManager) { if (this.producerSystemUsage != null) { @@ -832,18 +797,19 @@ public class BrokerService implements Service { /** * Sets the persistence adaptor implementation to use for this broker - * @throws IOException + * + * @throws IOException */ public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { this.persistenceAdapter = persistenceAdapter; configureService(this.persistenceAdapter); this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); - } public TaskRunnerFactory getTaskRunnerFactory() { if (taskRunnerFactory == null) { - taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner()); + taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000, + isDedicatedTaskRunner()); } return taskRunnerFactory; } @@ -854,7 +820,8 @@ public class BrokerService implements Service { public TaskRunnerFactory getPersistenceTaskRunnerFactory() { if (taskRunnerFactory == null) { - persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000); + persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, + true, 1000); } return persistenceTaskRunnerFactory; } @@ -912,8 +879,8 @@ public class BrokerService implements Service { } public NetworkConnector getNetworkConnectorByName(String connectorName) { - for(NetworkConnector connector : networkConnectors) { - if(connector.getName().equals(connectorName)) { + for (NetworkConnector connector : networkConnectors) { + if (connector.getName().equals(connectorName)) { return connector; } } @@ -929,8 +896,8 @@ public class BrokerService implements Service { } public TransportConnector getConnectorByName(String connectorName) { - for(TransportConnector connector : transportConnectors) { - if(connector.getName().equals(connectorName)) { + for (TransportConnector connector : transportConnectors) { + if (connector.getName().equals(connectorName)) { return connector; } } @@ -953,7 +920,8 @@ public class BrokerService implements Service { } /** - * @param jmsConnectors The jmsBridgeConnectors to set. + * @param jmsConnectors + * The jmsBridgeConnectors to set. */ public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { this.jmsBridgeConnectors = jmsConnectors; @@ -970,7 +938,7 @@ public class BrokerService implements Service { public void setServices(Service[] services) { this.services.clear(); if (services != null) { - for (int i=0; i < services.length;i++) { + for (int i = 0; i < services.length; i++) { this.services.add(services[i]); } } @@ -983,7 +951,7 @@ public class BrokerService implements Service { public void addService(Service service) { services.add(service); } - + public void removeService(Service service) { services.remove(service); } @@ -1033,7 +1001,8 @@ public class BrokerService implements Service { * Sets the transport connectors which this broker will listen on for new * clients * - * @org.apache.xbean.Property nestedType="org.apache.activemq.broker.TransportConnector" + * @org.apache.xbean.Property + * nestedType="org.apache.activemq.broker.TransportConnector" */ public void setTransportConnectors(List transportConnectors) throws Exception { for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) { @@ -1054,11 +1023,12 @@ public class BrokerService implements Service { * Sets the network connectors which this broker will use to connect to * other brokers in a federated network * - * @org.apache.xbean.Property nestedType="org.apache.activemq.network.NetworkConnector" + * @org.apache.xbean.Property + * nestedType="org.apache.activemq.network.NetworkConnector" */ public void setNetworkConnectors(List networkConnectors) throws Exception { for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) { - NetworkConnector connector = (NetworkConnector)iter.next(); + NetworkConnector connector = (NetworkConnector) iter.next(); addNetworkConnector(connector); } } @@ -1069,7 +1039,7 @@ public class BrokerService implements Service { */ public void setProxyConnectors(List proxyConnectors) throws Exception { for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { - ProxyConnector connector = (ProxyConnector)iter.next(); + ProxyConnector connector = (ProxyConnector) iter.next(); addProxyConnector(connector); } } @@ -1154,7 +1124,8 @@ public class BrokerService implements Service { } /** - * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set. + * @param shutdownOnMasterFailure + * The shutdownOnMasterFailure to set. */ public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { this.shutdownOnMasterFailure = shutdownOnMasterFailure; @@ -1223,11 +1194,9 @@ public class BrokerService implements Service { */ public synchronized Store getTempDataStore() { if (tempDataStore == null) { - if (!isPersistent()) { return null; } - boolean result = true; boolean empty = true; try { @@ -1257,7 +1226,8 @@ public class BrokerService implements Service { } /** - * @param tempDataStore the tempDataStore to set + * @param tempDataStore + * the tempDataStore to set */ public void setTempDataStore(Store tempDataStore) { this.tempDataStore = tempDataStore; @@ -1279,7 +1249,8 @@ public class BrokerService implements Service { } /** - * @param useLocalHostBrokerName the useLocalHostBrokerName to set + * @param useLocalHostBrokerName + * the useLocalHostBrokerName to set */ public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { this.useLocalHostBrokerName = useLocalHostBrokerName; @@ -1296,23 +1267,25 @@ public class BrokerService implements Service { } /** - * @param supportFailOver the supportFailOver to set + * @param supportFailOver + * the supportFailOver to set */ public void setSupportFailOver(boolean supportFailOver) { this.supportFailOver = supportFailOver; } /** - * Looks up and lazily creates if necessary the destination for the given JMS name + * Looks up and lazily creates if necessary the destination for the given + * JMS name */ public Destination getDestination(ActiveMQDestination destination) throws Exception { return getBroker().addDestination(getAdminConnectionContext(), destination); } - + public void removeDestination(ActiveMQDestination destination) throws Exception { - getBroker().removeDestination(getAdminConnectionContext(), destination,0); + getBroker().removeDestination(getAdminConnectionContext(), destination, 0); } - + public int getProducerSystemUsagePortion() { return producerSystemUsagePortion; } @@ -1333,35 +1306,35 @@ public class BrokerService implements Service { return splitSystemUsageForProducersConsumers; } - public void setSplitSystemUsageForProducersConsumers( - boolean splitSystemUsageForProducersConsumers) { + public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; } - + public boolean isMonitorConnectionSplits() { - return monitorConnectionSplits; - } + return monitorConnectionSplits; + } - public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { - this.monitorConnectionSplits = monitorConnectionSplits; - } - public int getTaskRunnerPriority() { - return taskRunnerPriority; - } + public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { + this.monitorConnectionSplits = monitorConnectionSplits; + } - public void setTaskRunnerPriority(int taskRunnerPriority) { - this.taskRunnerPriority = taskRunnerPriority; - } + public int getTaskRunnerPriority() { + return taskRunnerPriority; + } - public boolean isDedicatedTaskRunner() { - return dedicatedTaskRunner; - } + public void setTaskRunnerPriority(int taskRunnerPriority) { + this.taskRunnerPriority = taskRunnerPriority; + } - public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { - this.dedicatedTaskRunner = dedicatedTaskRunner; - } - - public boolean isCacheTempDestinations() { + public boolean isDedicatedTaskRunner() { + return dedicatedTaskRunner; + } + + public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { + this.dedicatedTaskRunner = dedicatedTaskRunner; + } + + public boolean isCacheTempDestinations() { return cacheTempDestinations; } @@ -1373,11 +1346,10 @@ public class BrokerService implements Service { return timeBeforePurgeTempDestinations; } - public void setTimeBeforePurgeTempDestinations( - int timeBeforePurgeTempDestinations) { + public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; } - + public boolean isUseTempMirroredQueues() { return useTempMirroredQueues; } @@ -1385,7 +1357,8 @@ public class BrokerService implements Service { public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { this.useTempMirroredQueues = useTempMirroredQueues; } - // + + // // Implementation methods // ------------------------------------------------------------------------- /** @@ -1408,7 +1381,6 @@ public class BrokerService implements Service { addNetworkConnector(uri); } } - if (jmsBridgeConnectors != null) { for (int i = 0; i < jmsBridgeConnectors.length; i++) { addJmsConnector(jmsBridgeConnectors[i]); @@ -1422,7 +1394,8 @@ public class BrokerService implements Service { } if (masterConnectorURI != null) { if (masterServiceExists) { - throw new IllegalStateException("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property"); + throw new IllegalStateException( + "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property"); } else { addService(new MasterConnector(masterConnectorURI)); } @@ -1430,23 +1403,19 @@ public class BrokerService implements Service { } protected void stopAllConnectors(ServiceStopper stopper) { - for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { NetworkConnector connector = iter.next(); unregisterNetworkConnectorMBean(connector); stopper.stop(connector); } - for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { ProxyConnector connector = iter.next(); stopper.stop(connector); } - for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { JmsConnector connector = iter.next(); stopper.stop(connector); } - for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { TransportConnector connector = iter.next(); stopper.stop(connector); @@ -1454,141 +1423,105 @@ public class BrokerService implements Service { } protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - - try { - ObjectName objectName = createConnectorObjectName(connector); - connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName); - ConnectorViewMBean view = new ConnectorView(connector); - mbeanServer.registerMBean(view, objectName); - registeredMBeanNames.add(objectName); - return connector; - } catch (Throwable e) { - throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); - } + try { + ObjectName objectName = createConnectorObjectName(connector); + connector = connector.asManagedConnector(getManagementContext(), objectName); + ConnectorViewMBean view = new ConnectorView(connector); + getManagementContext().registerMBean(view, objectName); + return connector; + } catch (Throwable e) { + throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); } - return connector; } protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { if (isUseJmx()) { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - try { - ObjectName objectName = createConnectorObjectName(connector); - - if (registeredMBeanNames.remove(objectName)) { - mbeanServer.unregisterMBean(objectName); - } - } catch (Throwable e) { - throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); - } + try { + ObjectName objectName = createConnectorObjectName(connector); + getManagementContext().unregisterMBean(objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create( + "Transport Connector could not be registered in JMX: " + e.getMessage(), e); } } } - + protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { -// MBeanServer mbeanServer = getManagementContext().getMBeanServer(); -// if (mbeanServer != null) { -// -// -// } return adaptor; } protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { if (isUseJmx()) { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - - } } } private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { - return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," - + "ConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); + return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName=" + + JMXSupport.encodeObjectNamePart(connector.getName())); } protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - NetworkConnectorViewMBean view = new NetworkConnectorView(connector); - try { - ObjectName objectName = createNetworkConnectorObjectName(connector); - connector.setObjectName(objectName); - mbeanServer.registerMBean(view, objectName); - registeredMBeanNames.add(objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); - } + NetworkConnectorViewMBean view = new NetworkConnectorView(connector); + try { + ObjectName objectName = createNetworkConnectorObjectName(connector); + connector.setObjectName(objectName); + getManagementContext().registerMBean(view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); } } - protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { - return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," - + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); + protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) + throws MalformedObjectNameException { + return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," + + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); } protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { if (isUseJmx()) { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - try { - ObjectName objectName = createNetworkConnectorObjectName(connector); - if (registeredMBeanNames.remove(objectName)) { - mbeanServer.unregisterMBean(objectName); - } - } catch (Exception e) { - LOG.error("Network Connector could not be unregistered from JMX: " + e, e); - } + try { + ObjectName objectName = createNetworkConnectorObjectName(connector); + getManagementContext().unregisterMBean(objectName); + } catch (Exception e) { + LOG.error("Network Connector could not be unregistered from JMX: " + e, e); } } } protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - ProxyConnectorView view = new ProxyConnectorView(connector); - try { - ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," - + "Type=ProxyConnector," + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); - mbeanServer.registerMBean(view, objectName); - registeredMBeanNames.add(objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); - } + ProxyConnectorView view = new ProxyConnectorView(connector); + try { + ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector," + + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); + getManagementContext().registerMBean(view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); } } protected void registerFTConnectorMBean(MasterConnector connector) throws IOException { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - FTConnectorView view = new FTConnectorView(connector); - try { - ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," - + "Type=MasterConnector"); - mbeanServer.registerMBean(view, objectName); - registeredMBeanNames.add(objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); - } + FTConnectorView view = new FTConnectorView(connector); + try { + ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector"); + getManagementContext().registerMBean(view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); } } protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - JmsConnectorView view = new JmsConnectorView(connector); - try { - ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," - + "Type=JmsConnector," + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); - mbeanServer.registerMBean(view, objectName); - registeredMBeanNames.add(objectName); - } catch (Throwable e) { - throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); - } + JmsConnectorView view = new JmsConnectorView(connector); + try { + ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector," + + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); + getManagementContext().registerMBean(view, objectName); + } catch (Throwable e) { + throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); } } @@ -1602,32 +1535,27 @@ public class BrokerService implements Service { protected Broker createBroker() throws Exception { regionBroker = createRegionBroker(); Broker broker = addInterceptors(regionBroker); - // Add a filter that will stop access to the broker once stopped broker = new MutableBrokerFilter(broker) { - Broker old; - - public void stop() throws Exception { + Broker old; + + public void stop() throws Exception { old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { // Just ignore additional stop actions. public void stop() throws Exception { } - }); old.stop(); } - - public void start() throws Exception { - if (forceStart && old != null) { - this.next.set(old); - } - getNext().start(); - } - + + public void start() throws Exception { + if (forceStart && old != null) { + this.next.set(old); + } + getNext().start(); + } }; - return broker; - } /** @@ -1641,31 +1569,28 @@ public class BrokerService implements Service { destinationInterceptors = createDefaultDestinationInterceptor(); } configureServices(destinationInterceptors); - DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); if (destinationFactory == null) { destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); } return createRegionBroker(destinationInterceptor); } - + protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { - RegionBroker regionBroker; + RegionBroker regionBroker; if (isUseJmx()) { - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, - destinationInterceptor); + regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), + getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor); } else { - regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor); + regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, + destinationInterceptor); } destinationFactory.setRegionBroker(regionBroker); - regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); regionBroker.setBrokerName(getBrokerName()); regionBroker.getDestinationStatistics().setEnabled(enableStatistics); - - return regionBroker; - } + return regionBroker; + } /** * Create the default destination interceptor @@ -1676,12 +1601,12 @@ public class BrokerService implements Service { VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); VirtualTopic virtualTopic = new VirtualTopic(); virtualTopic.setName("VirtualTopic.>"); - VirtualDestination[] virtualDestinations = {virtualTopic}; + VirtualDestination[] virtualDestinations = { virtualTopic }; interceptor.setVirtualDestinations(virtualDestinations); answer.add(interceptor); } if (isUseMirroredQueues()) { - MirroredQueue interceptor = new MirroredQueue(); + MirroredQueue interceptor = new MirroredQueue(); answer.add(interceptor); } DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; @@ -1703,8 +1628,8 @@ public class BrokerService implements Service { if (isPopulateJMSXUserID()) { broker = new UserIDBroker(broker); } - if (isMonitorConnectionSplits()){ - broker = new ConnectionSplitBroker(broker); + if (isMonitorConnectionSplits()) { + broker = new ConnectionSplitBroker(broker); } if (plugins != null) { for (int i = 0; i < plugins.length; i++) { @@ -1733,14 +1658,15 @@ public class BrokerService implements Service { protected ObjectName createBrokerObjectName() throws IOException { try { - return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker"); + return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker"); } catch (Throwable e) { throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e); } } protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { - TransportServer transport = TransportFactory.bind(this, brokerURI); + TransportServer transport = TransportFactory.bind(this, brokerURI); return new TransportConnector(transport); } @@ -1812,7 +1738,6 @@ public class BrokerService implements Service { protected void startDestinations() throws Exception { if (destinations != null) { ConnectionContext adminConnectionContext = getAdminConnectionContext(); - for (int i = 0; i < destinations.length; i++) { ActiveMQDestination destination = destinations[i]; getBroker().addDestination(adminConnectionContext, destination); @@ -1846,19 +1771,18 @@ public class BrokerService implements Service { return context; } - protected void waitForSlave(){ + protected void waitForSlave() { try { - slaveStartSignal.await(); - }catch(InterruptedException e){ - LOG.error("Exception waiting for slave:"+e); + slaveStartSignal.await(); + } catch (InterruptedException e) { + LOG.error("Exception waiting for slave:" + e); } } - - protected void slaveConnectionEstablished(){ - slaveStartSignal.countDown(); + + protected void slaveConnectionEstablished() { + slaveStartSignal.countDown(); } - - + /** * Start all transport and network connections, proxies and bridges * @@ -1866,15 +1790,13 @@ public class BrokerService implements Service { */ protected void startAllConnectors() throws Exception { if (!isSlave()) { - Set durableDestinations = getBroker().getDurableDestinations(); + Set durableDestinations = getBroker().getDurableDestinations(); List al = new ArrayList(); - for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) { TransportConnector connector = iter.next(); connector.setBrokerService(this); al.add(startTransportConnector(connector)); } - if (al.size() > 0) { // let's clear the transportConnectors list and replace it with // the started transportConnector instances @@ -1886,8 +1808,8 @@ public class BrokerService implements Service { map.put("network", "true"); map.put("async", "false"); uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); - if(isWaitForSlave()){ - waitForSlave(); + if (isWaitForSlave()) { + waitForSlave(); } for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { NetworkConnector connector = iter.next(); @@ -1896,17 +1818,15 @@ public class BrokerService implements Service { connector.setDurableDestinations(durableDestinations); connector.start(); } - for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { ProxyConnector connector = iter.next(); connector.start(); } - for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) { JmsConnector connector = iter.next(); connector.start(); } - for (Service service:services) { + for (Service service : services) { configureService(service); service.start(); } @@ -1919,15 +1839,11 @@ public class BrokerService implements Service { if (policy != null) { connector.setMessageAuthorizationPolicy(policy); } - if (isUseJmx()) { connector = registerConnectorMBean(connector); } - connector.getStatistics().setEnabled(enableStatistics); - connector.start(); - return connector; } @@ -1964,7 +1880,6 @@ public class BrokerService implements Service { Set destinations = destinationFactory.getDestinations(); if (destinations != null) { Iterator iter = destinations.iterator(); - ConnectionContext adminConnectionContext = broker.getAdminConnectionContext(); if (adminConnectionContext == null) { ConnectionContext context = new ConnectionContext(); @@ -1972,9 +1887,8 @@ public class BrokerService implements Service { adminConnectionContext = context; broker.setAdminConnectionContext(adminConnectionContext); } - while (iter.hasNext()) { - ActiveMQDestination destination = (ActiveMQDestination)iter.next(); + ActiveMQDestination destination = (ActiveMQDestination) iter.next(); broker.addDestination(adminConnectionContext, destination); } } @@ -1988,15 +1902,14 @@ public class BrokerService implements Service { this.regionBroker = regionBroker; } - public void addShutdownHook(Runnable hook) { - synchronized(shutdownHooks) { + synchronized (shutdownHooks) { shutdownHooks.add(hook); } } - + public void removeShutdownHook(Runnable hook) { - synchronized(shutdownHooks) { + synchronized (shutdownHooks) { shutdownHooks.remove(hook); } } @@ -2025,24 +1938,23 @@ public class BrokerService implements Service { this.sslContext = sslContext; } - public boolean isShutdownOnSlaveFailure() { - return shutdownOnSlaveFailure; - } + public boolean isShutdownOnSlaveFailure() { + return shutdownOnSlaveFailure; + } - public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { - this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; - } + public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { + this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; + } - public boolean isWaitForSlave() { - return waitForSlave; - } + public boolean isWaitForSlave() { + return waitForSlave; + } - public void setWaitForSlave(boolean waitForSlave) { - this.waitForSlave = waitForSlave; - } - - public CountDownLatch getSlaveStartSignal() { - return slaveStartSignal; - } + public void setWaitForSlave(boolean waitForSlave) { + this.waitForSlave = waitForSlave; + } + public CountDownLatch getSlaveStartSignal() { + return slaveStartSignal; + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index 7123e2c8cc..813bbd9094 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -16,16 +16,8 @@ */ package org.apache.activemq.broker; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArrayList; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - import org.apache.activemq.broker.jmx.ManagedTransportConnector; +import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.security.MessageAuthorizationPolicy; @@ -40,6 +32,12 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.management.ObjectName; /** * @org.apache.xbean.XBean @@ -94,8 +92,8 @@ public class TransportConnector implements Connector, BrokerServiceAware { * Factory method to create a JMX managed version of this transport * connector */ - public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException, URISyntaxException { - ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getServer()); + public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { + ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); rc.setBrokerInfo(getBrokerInfo()); rc.setConnectUri(getConnectUri()); rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index fbdc3711d5..cfe9490d38 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -16,30 +16,6 @@ */ package org.apache.activemq.broker.jmx; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; - -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -71,10 +47,31 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import javax.management.InstanceNotFoundException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; public class ManagedRegionBroker extends RegionBroker { private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class); - private final MBeanServer mbeanServer; + private final ManagementContext managementContext; private final ObjectName brokerObjectName; private final Map topics = new ConcurrentHashMap(); private final Map queues = new ConcurrentHashMap(); @@ -92,10 +89,10 @@ public class ManagedRegionBroker extends RegionBroker { /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; - public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, + public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor); - this.mbeanServer = mbeanServer; + this.managementContext = context; this.brokerObjectName = brokerObjectName; } @@ -111,7 +108,7 @@ public class ManagedRegionBroker extends RegionBroker { for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) { ObjectName name = iter.next(); try { - mbeanServer.unregisterMBean(name); + managementContext.unregisterMBean(name); } catch (InstanceNotFoundException e) { LOG.warn("The MBean: " + name + " is no longer registered with JMX"); } catch (Exception e) { @@ -245,7 +242,7 @@ public class ManagedRegionBroker extends RegionBroker { } } try { - mbeanServer.registerMBean(view, key); + managementContext.registerMBean(view, key); registeredMBeans.add(key); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); @@ -260,7 +257,7 @@ public class ManagedRegionBroker extends RegionBroker { temporaryTopics.remove(key); if (registeredMBeans.remove(key)) { try { - mbeanServer.unregisterMBean(key); + managementContext.unregisterMBean(key); } catch (Throwable e) { LOG.warn("Failed to unregister MBean: " + key); LOG.debug("Failure reason: " + e, e); @@ -288,7 +285,7 @@ public class ManagedRegionBroker extends RegionBroker { if (inactiveName != null) { inactiveDurableTopicSubscribers.remove(inactiveName); registeredMBeans.remove(inactiveName); - mbeanServer.unregisterMBean(inactiveName); + managementContext.unregisterMBean(inactiveName); } } catch (Throwable e) { LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e); @@ -300,7 +297,7 @@ public class ManagedRegionBroker extends RegionBroker { } try { - mbeanServer.registerMBean(view, key); + managementContext.registerMBean(view, key); registeredMBeans.add(key); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); @@ -317,7 +314,7 @@ public class ManagedRegionBroker extends RegionBroker { temporaryTopicSubscribers.remove(key); if (registeredMBeans.remove(key)) { try { - mbeanServer.unregisterMBean(key); + managementContext.unregisterMBean(key); } catch (Throwable e) { LOG.warn("Failed to unregister MBean: " + key); LOG.debug("Failure reason: " + e, e); @@ -370,7 +367,7 @@ public class ManagedRegionBroker extends RegionBroker { SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info); try { - mbeanServer.registerMBean(view, objectName); + managementContext.registerMBean(view, objectName); registeredMBeans.add(objectName); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + key); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java index 7416064af4..d02701e948 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.broker.jmx; -import java.io.IOException; -import java.util.Hashtable; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.TransportConnector; @@ -33,6 +27,9 @@ import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.JMXSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.util.Hashtable; +import javax.management.ObjectName; /** * A managed transport connection @@ -42,7 +39,7 @@ import org.apache.commons.logging.LogFactory; public class ManagedTransportConnection extends TransportConnection { private static final Log LOG = LogFactory.getLog(ManagedTransportConnection.class); - private final MBeanServer server; + private final ManagementContext managementContext; private final ObjectName connectorName; private ConnectionViewMBean mbean; @@ -50,10 +47,10 @@ public class ManagedTransportConnection extends TransportConnection { private ObjectName byAddressName; public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker, - TaskRunnerFactory factory, MBeanServer server, ObjectName connectorName) + TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName) throws IOException { super(connector, transport, broker, factory); - this.server = server; + this.managementContext = context; this.connectorName = connectorName; this.mbean = new ConnectionView(this); byAddressName = createByAddressObjectName("address", transport.getRemoteAddress()); @@ -99,7 +96,7 @@ public class ManagedTransportConnection extends TransportConnection { protected void registerMBean(ObjectName name) { if (name != null) { try { - server.registerMBean(mbean, name); + managementContext.registerMBean(mbean, name); } catch (Throwable e) { LOG.warn("Failed to register MBean: " + name); LOG.debug("Failure reason: " + e, e); @@ -110,7 +107,7 @@ public class ManagedTransportConnection extends TransportConnection { protected void unregisterMBean(ObjectName name) { if (name != null) { try { - server.unregisterMBean(name); + managementContext.unregisterMBean(name); } catch (Throwable e) { LOG.warn("Failed to unregister mbean: " + name); LOG.debug("Failure reason: " + e, e); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java index 5073c19ef0..f2aa163aa2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java @@ -16,17 +16,14 @@ */ package org.apache.activemq.broker.jmx; -import java.io.IOException; -import java.net.URISyntaxException; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportServer; +import java.io.IOException; +import java.net.URISyntaxException; +import javax.management.MBeanServer; +import javax.management.ObjectName; /** * A managed transport connector which can create multiple managed connections @@ -38,12 +35,12 @@ public class ManagedTransportConnector extends TransportConnector { static long nextConnectionId = 1; - private final MBeanServer mbeanServer; + private final ManagementContext managementContext; private final ObjectName connectorName; - public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, TransportServer server) { + public ManagedTransportConnector(ManagementContext context, ObjectName connectorName, TransportServer server) { super(server); - this.mbeanServer = mbeanServer; + this.managementContext = context; this.connectorName = connectorName; } @@ -52,7 +49,7 @@ public class ManagedTransportConnector extends TransportConnector { } protected Connection createConnection(Transport transport) throws IOException { - return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), mbeanServer, connectorName); + return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName); } protected static synchronized long getNextConnectionId() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java index 9401954611..e89c1d3c97 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java @@ -21,14 +21,23 @@ import java.lang.reflect.Method; import java.net.MalformedURLException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; +import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.Attribute; +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; import javax.management.JMException; +import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; +import javax.management.MBeanServerInvocationHandler; import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectInstance; import javax.management.ObjectName; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; @@ -37,6 +46,7 @@ import javax.management.remote.JMXServiceURL; import org.apache.activemq.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import sun.security.action.GetBooleanAction; /** * A Flow provides different dispatch policies within the NMR @@ -64,6 +74,7 @@ public class ManagementContext implements Service { private JMXConnectorServer connectorServer; private ObjectName namingServiceObjectName; private Registry registry; + private List registeredMBeanNames = new CopyOnWriteArrayList(); public ManagementContext() { this(null); @@ -101,8 +112,18 @@ public class ManagementContext implements Service { } } - public void stop() throws IOException { + public void stop() throws Exception { if (started.compareAndSet(true, false)) { + MBeanServer mbeanServer = getMBeanServer(); + if (mbeanServer != null) { + for (Iterator iter = registeredMBeanNames.iterator(); iter.hasNext();) { + ObjectName name = iter.next(); + + mbeanServer.unregisterMBean(name); + + } + } + registeredMBeanNames.clear(); JMXConnectorServer server = connectorServer; connectorServer = null; if (server != null) { @@ -146,7 +167,7 @@ public class ManagementContext implements Service { * * @return the MBeanServer */ - public MBeanServer getMBeanServer() { + protected MBeanServer getMBeanServer() { if (this.beanServer == null) { this.beanServer = findMBeanServer(); } @@ -258,7 +279,24 @@ public class ManagementContext implements Service { } return containerName + "." + name; } - + + public Object newProxyInstance( ObjectName objectName, + Class interfaceClass, + boolean notificationBroadcaster){ + return MBeanServerInvocationHandler.newProxyInstance(getMBeanServer(), objectName, interfaceClass, notificationBroadcaster); + + } + + public Object getAttribute(ObjectName name, String attribute) throws Exception{ + return getMBeanServer().getAttribute(name, attribute); + } + + public ObjectInstance registerMBean(Object bean, ObjectName name) throws Exception{ + ObjectInstance result = getMBeanServer().registerMBean(bean, name); + this.registeredMBeanNames.add(name); + return result; + } + /** * Unregister an MBean * @@ -266,7 +304,7 @@ public class ManagementContext implements Service { * @throws JMException */ public void unregisterMBean(ObjectName name) throws JMException { - if (beanServer != null && beanServer.isRegistered(name)) { + if (beanServer != null && beanServer.isRegistered(name) && this.registeredMBeanNames.remove(name)) { beanServer.unregisterMBean(name); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 21fb1b112b..1eeabffef1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -520,9 +520,6 @@ public class RegionBroker extends EmptyBroker { public BrokerId getBrokerId() { if (brokerId == null) { - // TODO: this should persist the broker id so that subsequent - // startup - // uses the same broker id. brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); } return brokerId; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java index add44b866a..7971de44cb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/view/ConnectionDotFileInterceptor.java @@ -16,25 +16,10 @@ */ package org.apache.activemq.broker.view; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.BrokerViewMBean; -import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; @@ -43,6 +28,15 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.filter.DestinationMapNode; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import javax.management.ObjectName; /** * @version $Revision: $ @@ -55,7 +49,6 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { private boolean clearProducerCacheAfterRender; private String domain = "org.apache.activemq"; private BrokerViewMBean brokerView; - private MBeanServer mbeanServer; // until we have some MBeans for producers, lets do it all ourselves private Map producers = new HashMap(); @@ -65,10 +58,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException { super(next, file); this.redrawOnRemove = redrawOnRemove; - - mbeanServer = new ManagementContext().getMBeanServer(); - ObjectName brokerName = next.getBrokerService().getBrokerObjectName(); - brokerView = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + } public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { @@ -123,7 +113,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { writer.println("digraph \"ActiveMQ Connections\" {"); writer.println(); - writer.println("label=\"ActiveMQ Broker: " + brokerView.getBrokerId() + "\"];"); + writer.println("label=\"ActiveMQ Broker: " + getBrokerView().getBrokerId() + "\"];"); writer.println(); writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];"); writer.println(); @@ -132,10 +122,10 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { Map queues = new HashMap(); Map topics = new HashMap(); - printSubscribers(writer, clients, queues, "queue_", brokerView.getQueueSubscribers()); + printSubscribers(writer, clients, queues, "queue_", getBrokerView().getQueueSubscribers()); writer.println(); - printSubscribers(writer, clients, topics, "topic_", brokerView.getTopicSubscribers()); + printSubscribers(writer, clients, topics, "topic_", getBrokerView().getTopicSubscribers()); writer.println(); printProducers(writer, clients, queues, topics); @@ -210,7 +200,7 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { protected void printSubscribers(PrintWriter writer, Map clients, Map destinations, String type, ObjectName[] subscribers) { for (int i = 0; i < subscribers.length; i++) { ObjectName name = subscribers[i]; - SubscriptionViewMBean subscriber = (SubscriptionViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true); + SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true); String clientId = subscriber.getClientId(); String safeClientId = asID(clientId); @@ -332,4 +322,13 @@ public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport { } return path; } + + BrokerViewMBean getBrokerView() throws Exception { + if (this.brokerView == null) { + ObjectName brokerName = getBrokerService().getBrokerObjectName(); + this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName, + BrokerViewMBean.class, true); + } + return this.brokerView; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index 13d6ef52a0..f201ba3c45 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -16,22 +16,6 @@ */ package org.apache.activemq.network; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; - -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.NetworkBridgeView; @@ -45,6 +29,17 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; /** * @version $Revision$ @@ -238,16 +233,12 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem if (!getBrokerService().isUseJmx()) { return; } - - MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); - try { - ObjectName objectName = createNetworkBridgeObjectName(bridge); - mbeanServer.registerMBean(view, objectName); - } catch (Throwable e) { - LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e); - } + NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); + try { + ObjectName objectName = createNetworkBridgeObjectName(bridge); + getBrokerService().getManagementContext().registerMBean(view, objectName); + } catch (Throwable e) { + LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e); } } @@ -255,15 +246,11 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem if (!getBrokerService().isUseJmx()) { return; } - - MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - try { - ObjectName objectName = createNetworkBridgeObjectName(bridge); - mbeanServer.unregisterMBean(objectName); - } catch (Throwable e) { - LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e); - } + try { + ObjectName objectName = createNetworkBridgeObjectName(bridge); + getBrokerService().getManagementContext().unregisterMBean(objectName); + } catch (Throwable e) { + LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java index ac4ae5e987..2ed9965453 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java @@ -16,15 +16,13 @@ */ package org.apache.activemq.transport; -import java.io.IOException; - -import javax.management.ObjectName; - import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.LogWriterFinder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import javax.management.ObjectName; /** * Singleton class to create TransportLogger objects. @@ -201,7 +199,7 @@ public class TransportLoggerFactory { try { this.objectName = new ObjectName(this.managementContext.getJmxDomainName()+":"+ "Type=TransportLoggerControl"); - this.managementContext.getMBeanServer().registerMBean(new TransportLoggerControl(this.managementContext),this.objectName); + this.managementContext.registerMBean(new TransportLoggerControl(this.managementContext),this.objectName); this.transportLoggerControlCreated = true; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java index 1ac6b7bb10..8c98f18324 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportLoggerView.java @@ -119,7 +119,7 @@ public class TransportLoggerView implements TransportLoggerViewMBean { */ private void register() { try { - this.managementContext.getMBeanServer().registerMBean(this, this.name); + this.managementContext.registerMBean(this, this.name); } catch (Exception e) { log.error("Could not register MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e); } @@ -138,7 +138,7 @@ public class TransportLoggerView implements TransportLoggerViewMBean { TransportLoggerView.transportLoggerViews.remove(this); try { - this.managementContext.getMBeanServer().unregisterMBean(this.name); + this.managementContext.unregisterMBean(this.name); } catch (Exception e) { log.error("Could not unregister MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e); } diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 21f4ce29de..e7c61a2998 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -910,7 +910,7 @@ public class JMSConsumerTest extends JmsTestSupport { } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { - MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); + String domain = "org.apache.activemq"; ObjectName name; if (destination.isQueue()) { @@ -918,7 +918,7 @@ public class JMSConsumerTest extends JmsTestSupport { } else { name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test"); } - return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true); + return (DestinationViewMBean)broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index 5734bd54b0..1919cb29a6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -74,9 +74,8 @@ public class QueuePurgeTest extends TestCase { ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" + queue.getQueueName() + ",BrokerName=localhost"); - QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler - .newProxyInstance(broker.getManagementContext() - .getMBeanServer(), queueViewMBeanName, + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); return proxy; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java index 18424bb672..03a3b089fd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -236,9 +236,8 @@ public class NegativeQueueTest extends TestCase { ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":Type=Queue,Destination=" + queue.getQueueName() + ",BrokerName=localhost"); - QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance( - broker.getManagementContext().getMBeanServer(), - queueViewMBeanName, QueueViewMBean.class, true); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, + QueueViewMBean.class, true); return proxy; } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java index 22879b6564..6503e5ceb8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java @@ -2,17 +2,6 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; - -import javax.jms.ConnectionFactory; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.ObjectName; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -23,6 +12,13 @@ import org.apache.activemq.command.ActiveMQDestination; import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.ObjectName; /** * Test to determine if expired messages are being reaped if there is @@ -122,7 +118,6 @@ public class MessageExpirationReaperTest { } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { - MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); String domain = "org.apache.activemq"; ObjectName name; if (destination.isQueue()) { @@ -130,6 +125,7 @@ public class MessageExpirationReaperTest { } else { name = new ObjectName(domain + ":BrokerName=" + brokerName + ",Type=Topic,Destination=" + destinationName); } - return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true); + return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, + true); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 60d8977578..a95b205f88 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -16,29 +16,6 @@ */ package org.apache.activemq.transport.stomp; -import java.io.IOException; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.ObjectName; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; @@ -48,6 +25,25 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.ObjectName; public class StompTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(StompTest.class); @@ -882,12 +878,11 @@ public class StompTest extends CombinationTestSupport { public void testDurableUnsub() throws Exception { // get broker JMX view - MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); - + String domain = "org.apache.activemq"; ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); - BrokerViewMBean view = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); // connect String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index b5a24afb36..90144f9186 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -16,21 +16,6 @@ */ package org.apache.activemq.usecases; -import java.io.File; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.ObjectName; - -import junit.framework.Test; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; @@ -44,6 +29,16 @@ import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.File; +import java.util.concurrent.atomic.AtomicLong; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import junit.framework.Test; public class ExpiredMessagesTest extends CombinationTestSupport { @@ -289,16 +284,18 @@ public class ExpiredMessagesTest extends CombinationTestSupport { } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { - MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); - String domain = "org.apache.activemq"; - ObjectName name; - if (destination.isQueue()) { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName()); - } else { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName()); - } - return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true); - } + String domain = "org.apache.activemq"; + ObjectName name; + if (destination.isQueue()) { + name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + + destination.getPhysicalName()); + } else { + name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + + destination.getPhysicalName()); + } + return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, + true); + } protected void tearDown() throws Exception { connection.stop(); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index f9ab03c76f..13b5f88028 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -16,21 +16,6 @@ */ package org.apache.activemq.usecases; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.MBeanServer; -import javax.management.MBeanServerInvocationHandler; -import javax.management.ObjectName; - -import junit.framework.Test; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; @@ -42,6 +27,16 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import junit.framework.Test; public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { @@ -248,16 +243,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { - MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); - String domain = "org.apache.activemq"; - ObjectName name; - if (destination.isQueue()) { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test"); - } else { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test"); - } - return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true); - } + String domain = "org.apache.activemq"; + ObjectName name; + if (destination.isQueue()) { + name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test"); + } else { + name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test"); + } + return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, + true); + } protected void tearDown() throws Exception { connection.stop(); diff --git a/activemq-core/src/test/java/org/apache/activemq/xbean/ManagementContextXBeanConfigTest.java b/activemq-core/src/test/java/org/apache/activemq/xbean/ManagementContextXBeanConfigTest.java index 7eee520233..d25ee77a5b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/xbean/ManagementContextXBeanConfigTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/xbean/ManagementContextXBeanConfigTest.java @@ -16,17 +16,13 @@ */ package org.apache.activemq.xbean; -import java.net.URI; -import java.util.Hashtable; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import junit.framework.TestCase; - import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.JMXSupport; +import java.net.URI; +import java.util.Hashtable; +import javax.management.ObjectName; +import junit.framework.TestCase; /** * @version $Revision: 1.1 $ @@ -36,19 +32,14 @@ public class ManagementContextXBeanConfigTest extends TestCase { protected BrokerService brokerService; public void testManagmentContextConfiguredCorrectly() throws Exception { - assertEquals(2011, brokerService.getManagementContext().getConnectorPort()); assertEquals("test.domain", brokerService.getManagementContext().getJmxDomainName()); - - MBeanServer beanServer = brokerService.getManagementContext().getMBeanServer(); - // Make sure the broker is registered in the right jmx domain. Hashtable map = new Hashtable(); map.put("Type", "Broker"); map.put("BrokerName", JMXSupport.encodeObjectNamePart("localhost")); ObjectName on = new ObjectName("test.domain", map); - - Object value = beanServer.getAttribute(on, "TotalEnqueueCount"); + Object value = brokerService.getManagementContext().getAttribute(on, "TotalEnqueueCount"); assertNotNull(value); }