diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java index b83ec9b197..a1800db4a6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.Attribute; @@ -87,7 +89,7 @@ public class ManagementContext implements Service { private int rmiServerPort; private String connectorPath = "/jmxrmi"; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean connectorStarting = new AtomicBoolean(false); + private final CountDownLatch connectorStarted = new CountDownLatch(1); private JMXConnectorServer connectorServer; private ObjectName namingServiceObjectName; private Registry registry; @@ -141,7 +143,6 @@ public class ManagementContext implements Service { JMXConnectorServer server = connectorServer; if (started.get() && server != null) { LOG.debug("Starting JMXConnectorServer..."); - connectorStarting.set(true); try { // need to remove MDC as we must not inherit MDC in child threads causing leaks MDC.remove("activemq.broker"); @@ -150,7 +151,7 @@ public class ManagementContext implements Service { if (brokerName != null) { MDC.put("activemq.broker", brokerName); } - connectorStarting.set(false); + connectorStarted.countDown(); } LOG.info("JMX consoles can connect to {}", server.getAddress()); } @@ -198,7 +199,7 @@ public class ManagementContext implements Service { connectorServer = null; if (server != null) { try { - if (!connectorStarting.get()) { + if (connectorStarted.await(10, TimeUnit.SECONDS)) { LOG.debug("Stopping jmx connector"); server.stop(); } @@ -326,7 +327,7 @@ public class ManagementContext implements Service { } public boolean isConnectorStarted() { - return connectorStarting.get() || (connectorServer != null && connectorServer.isActive()); + return connectorStarted.getCount() == 0 || (connectorServer != null && connectorServer.isActive()); } /** diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java index a4bd832396..cd472c0d35 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java @@ -17,22 +17,41 @@ package org.apache.activemq.store.jdbc; import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.PrintWriter; +import java.net.Socket; +import java.rmi.registry.Registry; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; +import javax.management.*; +import javax.management.loading.ClassLoaderRepository; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; +import com.sun.jndi.rmi.registry.RegistryContext; +import com.sun.jndi.rmi.registry.RegistryContextFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ft.SyncCreateDataSource; +import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.bugs.embedded.ThreadExplorer; +import org.apache.activemq.util.DefaultTestAppender; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.LeaseLockerIOExceptionHandler; import org.apache.activemq.util.Wait; import org.apache.derby.jdbc.EmbeddedDataSource; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -104,7 +123,32 @@ public class JDBCIOExceptionHandlerTest { @Test public void testStartWithDatabaseDown() throws Exception { + final AtomicBoolean connectorStarted = new AtomicBoolean(false); + final AtomicBoolean connectorStopped = new AtomicBoolean(false); + + DefaultTestAppender appender = new DefaultTestAppender() { + + @Override + public void doAppend(LoggingEvent event) { + if (event.getMessage().toString().startsWith("JMX consoles can connect to")) { + connectorStarted.set(true); + } + + if (event.getMessage().toString().equals("Stopping jmx connector")) { + connectorStopped.set(true); + } + } + }; + + org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); + Level previousLevel = rootLogger.getLevel(); + rootLogger.setLevel(Level.DEBUG); + rootLogger.addAppender(appender); + + BrokerService broker = new BrokerService(); + broker.getManagementContext().setCreateConnector(true); + broker.getManagementContext().setCreateMBeanServer(true); JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) jdbc.getDataSource(); @@ -137,9 +181,16 @@ public class JDBCIOExceptionHandlerTest { fail("IOExceptionHanlder still active"); } } + + if (connectorStarted.get() && !connectorStopped.get()) { + fail("JMX Server Connector should have been stopped!"); + } + } finally { dataSource = null; broker = null; + rootLogger.removeAppender(appender); + rootLogger.setLevel(previousLevel); } }