This closes #234

This commit is contained in:
Clebert Suconic 2015-11-04 21:30:08 -05:00
commit d585d465b4
3 changed files with 93 additions and 24 deletions

View File

@ -346,8 +346,15 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Pausing acceptor " + acceptor);
}
try {
acceptor.pause();
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor();
}
}
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Sending disconnect on live connections");
@ -368,8 +375,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
}
for (Acceptor acceptor : acceptors.values()) {
try {
acceptor.stop();
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor();
}
}
acceptors.clear();
@ -391,7 +403,6 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
}
started = false;
}
@Override

View File

@ -1457,4 +1457,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224067, value = "Error populating security roles from LDAP", format = Message.Format.MESSAGE_FORMAT)
void errorPopulatingSecurityRolesFromLDAP(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224068, value = "Unable to stop component: {0}", format = Message.Format.MESSAGE_FORMAT)
void errorStoppingComponent(@Cause Throwable t, String componentClassName);
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 224070, value = "Received Interrupt Exception whilst waiting for component to shutdown: {0}", format = Message.Format.MESSAGE_FORMAT)
void interruptWhilstStoppingComponent(String componentClassName);
}

View File

@ -572,7 +572,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*
* @param criticalIOError whether we have encountered an IO error with the journal etc
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) throws Exception {
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) {
synchronized (this) {
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
return;
@ -587,7 +587,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// aren't removed in case of failover
if (groupingHandler != null) {
managementService.removeNotificationListener(groupingHandler);
groupingHandler.stop();
stopComponent(groupingHandler);
}
stopComponent(clusterManager);
@ -598,6 +598,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// allows for graceful shutdown
if (remotingService != null && configuration.isGracefulShutdownEnabled()) {
long timeout = configuration.getGracefulShutdownTimeout();
try {
if (timeout == -1) {
remotingService.getConnectionCountLatch().await();
}
@ -605,6 +606,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
remotingService.getConnectionCountLatch().await(timeout);
}
}
catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(remotingService.getClass().getName());
}
}
freezeConnections();
}
@ -629,24 +634,45 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callDeActiveCallbacks();
stopComponent(backupManager);
try {
activation.preStorageClose();
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, activation.getClass().getName());
}
stopComponent(pagingManager);
if (storageManager != null)
try {
storageManager.stop(criticalIOError);
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, storageManager.getClass().getName());
}
// We stop remotingService before otherwise we may lock the system in case of a critical IO
// error shutdown
if (remotingService != null)
try {
remotingService.stop(criticalIOError);
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, remotingService.getClass().getName());
}
// Stop the management service after the remoting service to ensure all acceptors are deregistered with JMX
if (managementService != null)
try {
managementService.unregisterServer();
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, managementService.getClass().getName());
}
stopComponent(managementService);
stopComponent(resourceManager);
stopComponent(postOffice);
if (scheduledPool != null && !scheduledPoolSupplied) {
@ -667,7 +693,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
catch (InterruptedException e) {
// Ignore
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(threadPool.getClass().getName());
}
}
@ -676,8 +702,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (!scheduledPoolSupplied)
scheduledPool = null;
if (securityStore != null)
if (securityStore != null) {
try {
securityStore.stop();
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, managementService.getClass().getName());
}
}
pagingManager = null;
securityStore = null;
@ -699,11 +732,22 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// to display in the log message
SimpleString tempNodeID = getNodeID();
if (activation != null) {
try {
activation.close(failoverOnServerShutdown, restarting);
}
if (backupActivationThread != null) {
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, activation.getClass().getName());
}
}
if (backupActivationThread != null) {
try {
backupActivationThread.join(30000);
}
catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(backupActivationThread.getClass().getName());
}
if (backupActivationThread.isAlive()) {
ActiveMQServerLogger.LOGGER.backupActivationDidntFinish(this);
backupActivationThread.interrupt();
@ -790,10 +834,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
static void stopComponent(ActiveMQComponent component) throws Exception {
if (component != null)
static void stopComponent(ActiveMQComponent component) {
try {
if (component != null) {
component.stop();
}
}
catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, component.getClass().getName());
}
}
// ActiveMQServer implementation
// -----------------------------------------------------------