diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java index 01e789dbed..4049fb4ae8 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java @@ -153,6 +153,12 @@ public final class ActiveMQDefaultConfiguration // true means that security is enabled private static boolean DEFAULT_SECURITY_ENABLED = true; + // true means that graceful shutdown is enabled + private static boolean DEFAULT_GRACEFUL_SHUTDOWN_ENABLED = false; + + // how long (in ms) to wait before forcing the server to stop even if clients are still connected (i.e circumventing graceful shutdown) + private static long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = -1; + // how long (in ms) to wait before invalidating the security cache private static long DEFAULT_SECURITY_INVALIDATION_INTERVAL = 10000; @@ -448,6 +454,22 @@ public final class ActiveMQDefaultConfiguration return DEFAULT_SECURITY_ENABLED; } + /** + * true means that graceful shutdown is enabled + */ + public static boolean isDefaultGracefulShutdownEnabled() + { + return DEFAULT_GRACEFUL_SHUTDOWN_ENABLED; + } + + /** + * true means that graceful shutdown is enabled + */ + public static long getDefaultGracefulShutdownTimeout() + { + return DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT; + } + /** * how long (in ms) to wait before invalidating the security cache */ diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java b/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java index 72b29dd544..c814979cdb 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java @@ -131,6 +131,28 @@ public interface Configuration extends Serializable */ Configuration setSecurityEnabled(boolean enabled); + /** + * Returns whether graceful shutdown is enabled for this server.
+ * Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_SECURITY_ENABLED}. + */ + boolean isGracefulShutdownEnabled(); + + /** + * Sets whether security is enabled for this server. + */ + Configuration setGracefulShutdownEnabled(boolean enabled); + + /** + * Returns the graceful shutdown timeout for this server.
+ * Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT}. + */ + long getGracefulShutdownTimeout(); + + /** + * Sets the graceful shutdown timeout + */ + Configuration setGracefulShutdownTimeout(long timeout); + /** * Returns whether this server is manageable using JMX or not.
* Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_JMX_MANAGEMENT_ENABLED}. diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java index cf6354ae2a..3c39a8b0cb 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java @@ -78,6 +78,10 @@ public class ConfigurationImpl implements Configuration private boolean securityEnabled = ActiveMQDefaultConfiguration.isDefaultSecurityEnabled(); + private boolean gracefulShutdownEnabled = ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(); + + private long gracefulShutdownTimeout = ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(); + protected boolean jmxManagementEnabled = ActiveMQDefaultConfiguration.isDefaultJmxManagementEnabled(); protected String jmxDomain = ActiveMQDefaultConfiguration.getDefaultJmxDomain(); @@ -738,6 +742,28 @@ public class ConfigurationImpl implements Configuration return this; } + public boolean isGracefulShutdownEnabled() + { + return gracefulShutdownEnabled; + } + + public ConfigurationImpl setGracefulShutdownEnabled(final boolean enabled) + { + gracefulShutdownEnabled = enabled; + return this; + } + + public long getGracefulShutdownTimeout() + { + return gracefulShutdownTimeout; + } + + public ConfigurationImpl setGracefulShutdownTimeout(final long timeout) + { + gracefulShutdownTimeout = timeout; + return this; + } + public boolean isJMXManagementEnabled() { return jmxManagementEnabled; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java index 996d740b4d..8ebea6ec0f 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java @@ -219,6 +219,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil config.setSecurityEnabled(getBoolean(e, "security-enabled", config.isSecurityEnabled())); + config.setGracefulShutdownEnabled(getBoolean(e, "graceful-shutdown-enabled", config.isGracefulShutdownEnabled())); + + config.setGracefulShutdownTimeout(getLong(e, "graceful-shutdown-timeout", + config.getGracefulShutdownTimeout(), Validators.MINUS_ONE_OR_GE_ZERO)); + config.setJMXManagementEnabled(getBoolean(e, "jmx-management-enabled", config.isJMXManagementEnabled())); config.setJMXDomain(getString(e, "jmx-domain", config.getJMXDomain(), Validators.NOT_NULL_OR_EMPTY)); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java index c12ce3e1fa..0eaf6e2976 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java @@ -23,6 +23,7 @@ import org.apache.activemq.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.core.security.ActiveMQPrincipal; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; +import org.apache.activemq.utils.ReusableLatch; /** * @author Jeff Mesnil @@ -43,6 +44,8 @@ public interface RemotingService Set getConnections(); + ReusableLatch getConnectionCountLatch(); + void addIncomingInterceptor(Interceptor interceptor); void addOutgoingInterceptor(Interceptor interceptor); @@ -66,6 +69,11 @@ public interface RemotingService */ void allowInvmSecurityOverride(ActiveMQPrincipal principal); + /** + * Pauses the acceptors so that no more connections can be made to the server + */ + void pauseAcceptors(); + /** * Freezes and then disconnects all connections except the given one and tells the client where else * it might connect (only applicable if server is in a cluster and uses scaleDown-on-failover=true). diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java index 4df5b7adf9..9013995948 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java @@ -68,6 +68,7 @@ import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.ConfigurationHelper; import org.apache.activemq.utils.ActiveMQThreadFactory; +import org.apache.activemq.utils.ReusableLatch; /** * @author Jeff Mesnil @@ -97,6 +98,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private final Map connections = new ConcurrentHashMap(); + private final ReusableLatch connectionCountLatch = new ReusableLatch(0); + private final ActiveMQServer server; private final ManagementService managementService; @@ -355,11 +358,10 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } - public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) + public synchronized void pauseAcceptors() { if (!started) return; - failureCheckAndFlushThread.close(false); for (Acceptor acceptor : acceptors.values()) { @@ -372,6 +374,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(); } } + } + + public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) + { + if (!started) + return; + failureCheckAndFlushThread.close(false); HashMap connectionEntries = new HashMap(connections); // Now we ensure that no connections will process any more packets after this method is @@ -392,6 +401,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle { conn.disconnect(scaleDownNodeID, false); connections.remove(entry.getKey()); + connectionCountLatch.countDown(); } } } @@ -444,6 +454,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle acceptors.clear(); connections.clear(); + connectionCountLatch.setCount(0); if (managementService != null) { @@ -497,6 +508,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle public RemotingConnection removeConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.remove(remotingConnectionID); + connectionCountLatch.countDown(); if (entry != null) { @@ -522,6 +534,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle return conns; } + public synchronized ReusableLatch getConnectionCountLatch() + { + return connectionCountLatch; + } + // ConnectionLifeCycleListener implementation ----------------------------------- private ProtocolManager getProtocolManager(String protocol) @@ -551,6 +568,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } connections.put(connection.getID(), entry); + connectionCountLatch.countUp(); } public void connectionDestroyed(final Object connectionID) @@ -587,6 +605,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle if (empty) { connections.remove(connectionID); + connectionCountLatch.countDown(); conn.connection.destroy(); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index b1f237db1e..cb91351221 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -625,6 +625,25 @@ public class ActiveMQServerImpl implements ActiveMQServer } stopComponent(clusterManager); + if (remotingService != null) + { + remotingService.pauseAcceptors(); + } + + // allows for graceful shutdown + if (remotingService != null && configuration.isGracefulShutdownEnabled()) + { + long timeout = configuration.getGracefulShutdownTimeout(); + if (timeout == -1) + { + remotingService.getConnectionCountLatch().await(); + } + else + { + remotingService.getConnectionCountLatch().await(timeout); + } + } + freezeConnections(); } diff --git a/activemq-server/src/main/resources/schema/activemq-configuration.xsd b/activemq-server/src/main/resources/schema/activemq-configuration.xsd index 7ef6023150..723cf57b67 100644 --- a/activemq-server/src/main/resources/schema/activemq-configuration.xsd +++ b/activemq-server/src/main/resources/schema/activemq-configuration.xsd @@ -69,6 +69,22 @@ + + + + true means that graceful shutdown is enabled + + + + + + + + how long (in ms) to wait for clients to disconnect before shutting down the server + + + + diff --git a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java index 2eb50c357b..f0643969c3 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java @@ -144,6 +144,10 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest conf.getMessageExpiryThreadPriority()); Assert.assertTrue(conf.getHAPolicyConfiguration() instanceof LiveOnlyPolicyConfiguration); + + Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled()); + + Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout()); } // Protected --------------------------------------------------------------------------------------------- diff --git a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java index 7c1a16698b..182cd7f857 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java @@ -90,6 +90,8 @@ public class FileConfigurationTest extends ConfigurationImplTest Assert.assertEquals(100, conf.getJournalMinFiles()); Assert.assertEquals(123, conf.getJournalCompactMinFiles()); Assert.assertEquals(33, conf.getJournalCompactPercentage()); + Assert.assertEquals(true, conf.isGracefulShutdownEnabled()); + Assert.assertEquals(12345, conf.getGracefulShutdownTimeout()); Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory()); Assert.assertEquals(95, conf.getMemoryWarningThreshold()); diff --git a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml index 1045e21fd6..79cded1696 100644 --- a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml @@ -25,6 +25,8 @@ 12345 54321 false + true + 12345 5423 123 true diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index df0978660a..400495c6b2 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -37,6 +37,7 @@ * [Duplicate Message Detection](duplicate-detection.md) * [Clusters](clusters.md) * [High Availability and Failover](ha.md) +* [Graceful Server Shutdown](graceful-shutdown.md) * [Libaio Native Libraries](libaio.md) * [Thread management](thread-pooling.md) * [Logging](logging.md) diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 3d9d4781af..8ece98e4da 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -1085,6 +1085,22 @@ element is used by the server side JMS service to load JMS Queues, Topics optional core filter expression + + + graceful-shutdown-enabled + + xsd:boolean + true means that graceful shutdown is enabled + true + + + + graceful-shutdown-timeout + + xsd:long + how long (in ms) to wait for all clients to disconnect before forcefully disconnecting the clients and proceeding with the shutdown process (-1 means no timeout) + -1 + grouping-handler diff --git a/docs/user-manual/en/graceful-shutdown.md b/docs/user-manual/en/graceful-shutdown.md new file mode 100644 index 0000000000..a69aea4bf0 --- /dev/null +++ b/docs/user-manual/en/graceful-shutdown.md @@ -0,0 +1,21 @@ +# Graceful Server Shutdown + +In certain circumstances an administrator might not want to disconnect +all clients immediately when stopping the broker. In this situation the +broker can be configured to shutdown *gracefully* using the +`graceful-shutdown-enabled` boolean configuration parameter. + +When the `graceful-shutdown-enabled` configuration parameter is `true` +and the broker is shutdown it will first prevent any additional clients +from connecting and then it will wait for any existing connections to +be terminated by the client before completing the shutdown process. The +default value is `false`. + +Of course, it's possible a client could keep a connection to the broker +indefinitely effectively preventing the broker from shutting down +gracefully. To deal with this of situation the +`graceful-shutdown-timeout` configuration parameter is available. This +tells the broker (in milliseconds) how long to wait for all clients to +disconnect before forcefully disconnecting the clients and proceeding +with the shutdown process. The default value is `-1` which means the +broker will wait indefinitely for clients to disconnect. \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java new file mode 100644 index 0000000000..736fac8b38 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.integration.server; + +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.ActiveMQSessionCreationException; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.client.ClientProducer; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.ActiveMQServers; +import org.apache.activemq.tests.util.ServiceTestBase; +import org.junit.Test; + +/** + * A GracefulShutdownTest + * + * @author Justin Bertram + */ +public class GracefulShutdownTest extends ServiceTestBase +{ + @Test + public void testGracefulShutdown() throws Exception + { + Configuration conf = createDefaultConfig(); + + conf.setGracefulShutdownEnabled(true); + + final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, false); + + server.start(); + + ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY)); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(true, true); + + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }); + + t.setName("shutdown thread"); + t.start(); + + // wait for the thread to actually call stop() on the server + while (server.isStarted()) + { + Thread.sleep(100); + } + + // confirm we can still do work on the original connection even though the server is stopping + session.createQueue("testAddress", "testQueue"); + ClientProducer producer = session.createProducer("testAddress"); + producer.send(session.createMessage(true)); + session.start(); + assertNotNull(session.createConsumer("testQueue").receive(500)); + + try + { + sf.createSession(); + fail("Creating a session here should fail because the acceptors should be paused"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + ActiveMQSessionCreationException activeMQSessionCreationException = (ActiveMQSessionCreationException) e; + assertEquals(activeMQSessionCreationException.getType(), ActiveMQExceptionType.SESSION_CREATION_REJECTED); + } + + // close the connection to allow broker shutdown to complete + locator.close(); + + long start = System.currentTimeMillis(); + + // wait for the shutdown thread to complete, interrupt it if it takes too long + while (t.isAlive()) + { + if (System.currentTimeMillis() - start > 3000) + { + t.interrupt(); + break; + } + Thread.sleep(100); + } + + // make sure the shutdown thread is dead + assertFalse(t.isAlive()); + } + + @Test + public void testGracefulShutdownWithTimeout() throws Exception + { + long timeout = 10000; + + Configuration conf = createDefaultConfig(); + + conf.setGracefulShutdownEnabled(true); + conf.setGracefulShutdownTimeout(timeout); + + final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, false); + + server.start(); + + ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY)); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(); + + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }); + + t.setName("shutdown thread"); + long start = System.currentTimeMillis(); + t.start(); + + // wait for the thread to actually call stop() on the server + while (server.isStarted()) + { + Thread.sleep(100); + } + + try + { + sf.createSession(); + fail("Creating a session here should fail because the acceptors should be paused"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + ActiveMQSessionCreationException activeMQSessionCreationException = (ActiveMQSessionCreationException) e; + assertEquals(activeMQSessionCreationException.getType(), ActiveMQExceptionType.SESSION_CREATION_REJECTED); + } + + Thread.sleep(timeout / 2); + + assertTrue("thread should still be alive here waiting for the timeout to elapse", t.isAlive()); + + while (t.isAlive()) + { + Thread.sleep(100); + } + + assertTrue("thread terminated too soon, the graceful shutdown timeout wasn't enforced properly", System.currentTimeMillis() - start >= timeout); + + locator.close(); + } +}