https://issues.apache.org/jira/browse/AMQ-6799 - stop the jmx server connector properly

This commit is contained in:
Dejan Bosanac 2017-09-01 13:02:53 +02:00
parent 35bd3ad938
commit 5e656d394c
2 changed files with 57 additions and 5 deletions

View File

@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Attribute; import javax.management.Attribute;
@ -87,7 +89,7 @@ public class ManagementContext implements Service {
private int rmiServerPort; private int rmiServerPort;
private String connectorPath = "/jmxrmi"; private String connectorPath = "/jmxrmi";
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean connectorStarting = new AtomicBoolean(false); private final CountDownLatch connectorStarted = new CountDownLatch(1);
private JMXConnectorServer connectorServer; private JMXConnectorServer connectorServer;
private ObjectName namingServiceObjectName; private ObjectName namingServiceObjectName;
private Registry registry; private Registry registry;
@ -141,7 +143,6 @@ public class ManagementContext implements Service {
JMXConnectorServer server = connectorServer; JMXConnectorServer server = connectorServer;
if (started.get() && server != null) { if (started.get() && server != null) {
LOG.debug("Starting JMXConnectorServer..."); LOG.debug("Starting JMXConnectorServer...");
connectorStarting.set(true);
try { try {
// need to remove MDC as we must not inherit MDC in child threads causing leaks // need to remove MDC as we must not inherit MDC in child threads causing leaks
MDC.remove("activemq.broker"); MDC.remove("activemq.broker");
@ -150,7 +151,7 @@ public class ManagementContext implements Service {
if (brokerName != null) { if (brokerName != null) {
MDC.put("activemq.broker", brokerName); MDC.put("activemq.broker", brokerName);
} }
connectorStarting.set(false); connectorStarted.countDown();
} }
LOG.info("JMX consoles can connect to {}", server.getAddress()); LOG.info("JMX consoles can connect to {}", server.getAddress());
} }
@ -198,7 +199,7 @@ public class ManagementContext implements Service {
connectorServer = null; connectorServer = null;
if (server != null) { if (server != null) {
try { try {
if (!connectorStarting.get()) { if (connectorStarted.await(10, TimeUnit.SECONDS)) {
LOG.debug("Stopping jmx connector"); LOG.debug("Stopping jmx connector");
server.stop(); server.stop();
} }
@ -326,7 +327,7 @@ public class ManagementContext implements Service {
} }
public boolean isConnectorStarted() { public boolean isConnectorStarted() {
return connectorStarting.get() || (connectorServer != null && connectorServer.isActive()); return connectorStarted.getCount() == 0 || (connectorServer != null && connectorServer.isActive());
} }
/** /**

View File

@ -17,22 +17,41 @@
package org.apache.activemq.store.jdbc; package org.apache.activemq.store.jdbc;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.Socket;
import java.rmi.registry.Registry;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection; import javax.jms.Connection;
import javax.management.*;
import javax.management.loading.ClassLoaderRepository;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import com.sun.jndi.rmi.registry.RegistryContext;
import com.sun.jndi.rmi.registry.RegistryContextFactory;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ft.SyncCreateDataSource; import org.apache.activemq.broker.ft.SyncCreateDataSource;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.bugs.embedded.ThreadExplorer; import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler; import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource; import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -104,7 +123,32 @@ public class JDBCIOExceptionHandlerTest {
@Test @Test
public void testStartWithDatabaseDown() throws Exception { public void testStartWithDatabaseDown() throws Exception {
final AtomicBoolean connectorStarted = new AtomicBoolean(false);
final AtomicBoolean connectorStopped = new AtomicBoolean(false);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getMessage().toString().startsWith("JMX consoles can connect to")) {
connectorStarted.set(true);
}
if (event.getMessage().toString().equals("Stopping jmx connector")) {
connectorStopped.set(true);
}
}
};
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
Level previousLevel = rootLogger.getLevel();
rootLogger.setLevel(Level.DEBUG);
rootLogger.addAppender(appender);
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.getManagementContext().setCreateConnector(true);
broker.getManagementContext().setCreateMBeanServer(true);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) jdbc.getDataSource(); EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) jdbc.getDataSource();
@ -137,9 +181,16 @@ public class JDBCIOExceptionHandlerTest {
fail("IOExceptionHanlder still active"); fail("IOExceptionHanlder still active");
} }
} }
if (connectorStarted.get() && !connectorStopped.get()) {
fail("JMX Server Connector should have been stopped!");
}
} finally { } finally {
dataSource = null; dataSource = null;
broker = null; broker = null;
rootLogger.removeAppender(appender);
rootLogger.setLevel(previousLevel);
} }
} }