ACTIVEMQ6-52 Graceful shutdown

Implements a feature whereby the broker will not shutdown while there are
clients connected. A timeout can be specified so that even if there are
clients connected the broker will still shutdown after a certain time.
This commit is contained in:
jbertram 2015-01-19 09:34:09 -06:00
parent 881f39ce4d
commit 754d481d53
15 changed files with 375 additions and 2 deletions

View File

@ -153,6 +153,12 @@ public final class ActiveMQDefaultConfiguration
// true means that security is enabled
private static boolean DEFAULT_SECURITY_ENABLED = true;
// true means that graceful shutdown is enabled
private static boolean DEFAULT_GRACEFUL_SHUTDOWN_ENABLED = false;
// how long (in ms) to wait before forcing the server to stop even if clients are still connected (i.e circumventing graceful shutdown)
private static long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = -1;
// how long (in ms) to wait before invalidating the security cache
private static long DEFAULT_SECURITY_INVALIDATION_INTERVAL = 10000;
@ -448,6 +454,22 @@ public final class ActiveMQDefaultConfiguration
return DEFAULT_SECURITY_ENABLED;
}
/**
* true means that graceful shutdown is enabled
*/
public static boolean isDefaultGracefulShutdownEnabled()
{
return DEFAULT_GRACEFUL_SHUTDOWN_ENABLED;
}
/**
* true means that graceful shutdown is enabled
*/
public static long getDefaultGracefulShutdownTimeout()
{
return DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT;
}
/**
* how long (in ms) to wait before invalidating the security cache
*/

View File

@ -131,6 +131,28 @@ public interface Configuration extends Serializable
*/
Configuration setSecurityEnabled(boolean enabled);
/**
* Returns whether graceful shutdown is enabled for this server. <br>
* Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_SECURITY_ENABLED}.
*/
boolean isGracefulShutdownEnabled();
/**
* Sets whether security is enabled for this server.
*/
Configuration setGracefulShutdownEnabled(boolean enabled);
/**
* Returns the graceful shutdown timeout for this server. <br>
* Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT}.
*/
long getGracefulShutdownTimeout();
/**
* Sets the graceful shutdown timeout
*/
Configuration setGracefulShutdownTimeout(long timeout);
/**
* Returns whether this server is manageable using JMX or not. <br>
* Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_JMX_MANAGEMENT_ENABLED}.

View File

@ -78,6 +78,10 @@ public class ConfigurationImpl implements Configuration
private boolean securityEnabled = ActiveMQDefaultConfiguration.isDefaultSecurityEnabled();
private boolean gracefulShutdownEnabled = ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled();
private long gracefulShutdownTimeout = ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout();
protected boolean jmxManagementEnabled = ActiveMQDefaultConfiguration.isDefaultJmxManagementEnabled();
protected String jmxDomain = ActiveMQDefaultConfiguration.getDefaultJmxDomain();
@ -738,6 +742,28 @@ public class ConfigurationImpl implements Configuration
return this;
}
public boolean isGracefulShutdownEnabled()
{
return gracefulShutdownEnabled;
}
public ConfigurationImpl setGracefulShutdownEnabled(final boolean enabled)
{
gracefulShutdownEnabled = enabled;
return this;
}
public long getGracefulShutdownTimeout()
{
return gracefulShutdownTimeout;
}
public ConfigurationImpl setGracefulShutdownTimeout(final long timeout)
{
gracefulShutdownTimeout = timeout;
return this;
}
public boolean isJMXManagementEnabled()
{
return jmxManagementEnabled;

View File

@ -219,6 +219,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
config.setSecurityEnabled(getBoolean(e, "security-enabled", config.isSecurityEnabled()));
config.setGracefulShutdownEnabled(getBoolean(e, "graceful-shutdown-enabled", config.isGracefulShutdownEnabled()));
config.setGracefulShutdownTimeout(getLong(e, "graceful-shutdown-timeout",
config.getGracefulShutdownTimeout(), Validators.MINUS_ONE_OR_GE_ZERO));
config.setJMXManagementEnabled(getBoolean(e, "jmx-management-enabled", config.isJMXManagementEnabled()));
config.setJMXDomain(getString(e, "jmx-domain", config.getJMXDomain(), Validators.NOT_NULL_OR_EMPTY));

View File

@ -23,6 +23,7 @@ import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.core.security.ActiveMQPrincipal;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.utils.ReusableLatch;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@ -43,6 +44,8 @@ public interface RemotingService
Set<RemotingConnection> getConnections();
ReusableLatch getConnectionCountLatch();
void addIncomingInterceptor(Interceptor interceptor);
void addOutgoingInterceptor(Interceptor interceptor);
@ -66,6 +69,11 @@ public interface RemotingService
*/
void allowInvmSecurityOverride(ActiveMQPrincipal principal);
/**
* Pauses the acceptors so that no more connections can be made to the server
*/
void pauseAcceptors();
/**
* Freezes and then disconnects all connections except the given one and tells the client where else
* it might connect (only applicable if server is in a cluster and uses scaleDown-on-failover=true).

View File

@ -68,6 +68,7 @@ import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.ConfigurationHelper;
import org.apache.activemq.utils.ActiveMQThreadFactory;
import org.apache.activemq.utils.ReusableLatch;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@ -97,6 +98,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>();
private final ReusableLatch connectionCountLatch = new ReusableLatch(0);
private final ActiveMQServer server;
private final ManagementService managementService;
@ -355,11 +358,10 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
}
}
public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen)
public synchronized void pauseAcceptors()
{
if (!started)
return;
failureCheckAndFlushThread.close(false);
for (Acceptor acceptor : acceptors.values())
{
@ -372,6 +374,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor();
}
}
}
public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen)
{
if (!started)
return;
failureCheckAndFlushThread.close(false);
HashMap<Object, ConnectionEntry> connectionEntries = new HashMap<Object, ConnectionEntry>(connections);
// Now we ensure that no connections will process any more packets after this method is
@ -392,6 +401,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
{
conn.disconnect(scaleDownNodeID, false);
connections.remove(entry.getKey());
connectionCountLatch.countDown();
}
}
}
@ -444,6 +454,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
acceptors.clear();
connections.clear();
connectionCountLatch.setCount(0);
if (managementService != null)
{
@ -497,6 +508,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
public RemotingConnection removeConnection(final Object remotingConnectionID)
{
ConnectionEntry entry = connections.remove(remotingConnectionID);
connectionCountLatch.countDown();
if (entry != null)
{
@ -522,6 +534,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
return conns;
}
public synchronized ReusableLatch getConnectionCountLatch()
{
return connectionCountLatch;
}
// ConnectionLifeCycleListener implementation -----------------------------------
private ProtocolManager getProtocolManager(String protocol)
@ -551,6 +568,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
}
connections.put(connection.getID(), entry);
connectionCountLatch.countUp();
}
public void connectionDestroyed(final Object connectionID)
@ -587,6 +605,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
if (empty)
{
connections.remove(connectionID);
connectionCountLatch.countDown();
conn.connection.destroy();
}

View File

@ -625,6 +625,25 @@ public class ActiveMQServerImpl implements ActiveMQServer
}
stopComponent(clusterManager);
if (remotingService != null)
{
remotingService.pauseAcceptors();
}
// allows for graceful shutdown
if (remotingService != null && configuration.isGracefulShutdownEnabled())
{
long timeout = configuration.getGracefulShutdownTimeout();
if (timeout == -1)
{
remotingService.getConnectionCountLatch().await();
}
else
{
remotingService.getConnectionCountLatch().await(timeout);
}
}
freezeConnections();
}

View File

@ -69,6 +69,22 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="graceful-shutdown-enabled" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
true means that graceful shutdown is enabled
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="graceful-shutdown-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how long (in ms) to wait for clients to disconnect before shutting down the server
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="security-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -144,6 +144,10 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest
conf.getMessageExpiryThreadPriority());
Assert.assertTrue(conf.getHAPolicyConfiguration() instanceof LiveOnlyPolicyConfiguration);
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout());
}
// Protected ---------------------------------------------------------------------------------------------

View File

@ -90,6 +90,8 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals(100, conf.getJournalMinFiles());
Assert.assertEquals(123, conf.getJournalCompactMinFiles());
Assert.assertEquals(33, conf.getJournalCompactPercentage());
Assert.assertEquals(true, conf.isGracefulShutdownEnabled());
Assert.assertEquals(12345, conf.getGracefulShutdownTimeout());
Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
Assert.assertEquals(95, conf.getMemoryWarningThreshold());

View File

@ -25,6 +25,8 @@
<scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size>
<thread-pool-max-size>54321</thread-pool-max-size>
<security-enabled>false</security-enabled>
<graceful-shutdown-enabled>true</graceful-shutdown-enabled>
<graceful-shutdown-timeout>12345</graceful-shutdown-timeout>
<security-invalidation-interval>5423</security-invalidation-interval>
<journal-lock-acquisition-timeout>123</journal-lock-acquisition-timeout>
<wild-card-routing-enabled>true</wild-card-routing-enabled>

View File

@ -37,6 +37,7 @@
* [Duplicate Message Detection](duplicate-detection.md)
* [Clusters](clusters.md)
* [High Availability and Failover](ha.md)
* [Graceful Server Shutdown](graceful-shutdown.md)
* [Libaio Native Libraries](libaio.md)
* [Thread management](thread-pooling.md)
* [Logging](logging.md)

View File

@ -1085,6 +1085,22 @@ element is used by the server side JMS service to load JMS Queues, Topics
<td>optional core filter expression</td>
<td></td>
</tr>
<tr>
<td>
<a href="graceful-shutdown.md" title="Graceful Server Shutdown">graceful-shutdown-enabled</a>
</td>
<td>xsd:boolean</td>
<td>true means that graceful shutdown is enabled</td>
<td>true</td>
</tr>
<tr>
<td>
<a href="graceful-shutdown.md" title="Graceful Server Shutdown">graceful-shutdown-timeout</a>
</td>
<td>xsd:long</td>
<td>how long (in ms) to wait for all clients to disconnect before forcefully disconnecting the clients and proceeding with the shutdown process (-1 means no timeout)</td>
<td>-1</td>
</tr>
<tr>
<td>
<a href="message-grouping.md" title="Chapter 28. Message Grouping">grouping-handler</a>

View File

@ -0,0 +1,21 @@
# Graceful Server Shutdown
In certain circumstances an administrator might not want to disconnect
all clients immediately when stopping the broker. In this situation the
broker can be configured to shutdown *gracefully* using the
`graceful-shutdown-enabled` boolean configuration parameter.
When the `graceful-shutdown-enabled` configuration parameter is `true`
and the broker is shutdown it will first prevent any additional clients
from connecting and then it will wait for any existing connections to
be terminated by the client before completing the shutdown process. The
default value is `false`.
Of course, it's possible a client could keep a connection to the broker
indefinitely effectively preventing the broker from shutting down
gracefully. To deal with this of situation the
`graceful-shutdown-timeout` configuration parameter is available. This
tells the broker (in milliseconds) how long to wait for all clients to
disconnect before forcefully disconnecting the clients and proceeding
with the shutdown process. The default value is `-1` which means the
broker will wait indefinitely for clients to disconnect.

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.server;
import org.apache.activemq.api.core.ActiveMQExceptionType;
import org.apache.activemq.api.core.ActiveMQSessionCreationException;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServers;
import org.apache.activemq.tests.util.ServiceTestBase;
import org.junit.Test;
/**
* A GracefulShutdownTest
*
* @author Justin Bertram
*/
public class GracefulShutdownTest extends ServiceTestBase
{
@Test
public void testGracefulShutdown() throws Exception
{
Configuration conf = createDefaultConfig();
conf.setGracefulShutdownEnabled(true);
final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, false);
server.start();
ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(true, true);
Thread t = new Thread(new Runnable()
{
public void run()
{
try
{
server.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
}
});
t.setName("shutdown thread");
t.start();
// wait for the thread to actually call stop() on the server
while (server.isStarted())
{
Thread.sleep(100);
}
// confirm we can still do work on the original connection even though the server is stopping
session.createQueue("testAddress", "testQueue");
ClientProducer producer = session.createProducer("testAddress");
producer.send(session.createMessage(true));
session.start();
assertNotNull(session.createConsumer("testQueue").receive(500));
try
{
sf.createSession();
fail("Creating a session here should fail because the acceptors should be paused");
}
catch (Exception e)
{
assertTrue(e instanceof ActiveMQSessionCreationException);
ActiveMQSessionCreationException activeMQSessionCreationException = (ActiveMQSessionCreationException) e;
assertEquals(activeMQSessionCreationException.getType(), ActiveMQExceptionType.SESSION_CREATION_REJECTED);
}
// close the connection to allow broker shutdown to complete
locator.close();
long start = System.currentTimeMillis();
// wait for the shutdown thread to complete, interrupt it if it takes too long
while (t.isAlive())
{
if (System.currentTimeMillis() - start > 3000)
{
t.interrupt();
break;
}
Thread.sleep(100);
}
// make sure the shutdown thread is dead
assertFalse(t.isAlive());
}
@Test
public void testGracefulShutdownWithTimeout() throws Exception
{
long timeout = 10000;
Configuration conf = createDefaultConfig();
conf.setGracefulShutdownEnabled(true);
conf.setGracefulShutdownTimeout(timeout);
final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, false);
server.start();
ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession();
Thread t = new Thread(new Runnable()
{
public void run()
{
try
{
server.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
}
});
t.setName("shutdown thread");
long start = System.currentTimeMillis();
t.start();
// wait for the thread to actually call stop() on the server
while (server.isStarted())
{
Thread.sleep(100);
}
try
{
sf.createSession();
fail("Creating a session here should fail because the acceptors should be paused");
}
catch (Exception e)
{
assertTrue(e instanceof ActiveMQSessionCreationException);
ActiveMQSessionCreationException activeMQSessionCreationException = (ActiveMQSessionCreationException) e;
assertEquals(activeMQSessionCreationException.getType(), ActiveMQExceptionType.SESSION_CREATION_REJECTED);
}
Thread.sleep(timeout / 2);
assertTrue("thread should still be alive here waiting for the timeout to elapse", t.isAlive());
while (t.isAlive())
{
Thread.sleep(100);
}
assertTrue("thread terminated too soon, the graceful shutdown timeout wasn't enforced properly", System.currentTimeMillis() - start >= timeout);
locator.close();
}
}