From 8a01c5d16a051541df404683e6d8ae71bc6240be Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Fri, 7 Sep 2012 11:58:21 +0000 Subject: [PATCH] AMQ-3451: Ensure thread pools is shutdown properly to avoid any leaks. Do not use the old @deprecated thread pool. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1381985 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 36 +++++---- .../activemq/broker/TransportConnector.java | 7 +- .../jmx/ManagedTransportConnection.java | 5 +- .../broker/jmx/ManagedTransportConnector.java | 5 +- .../transport/failover/FailoverTransport.java | 46 +++++++----- .../transport/fanout/FanoutTransport.java | 45 ++++++----- .../transport/nio/NIOSSLTransport.java | 18 ++++- .../activemq/transport/tcp/TcpTransport.java | 20 +++-- .../activemq/transport/vm/VMTransport.java | 16 +++- .../apache/activemq/util/ThreadPoolUtils.java | 75 ++++++++----------- 10 files changed, 160 insertions(+), 113 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 10ea326369..74e99f6be5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -52,7 +52,6 @@ import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; import org.apache.activemq.state.TransactionState; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -117,6 +116,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private DemandForwardingBridge duplexBridge; private final TaskRunnerFactory taskRunnerFactory; + private final TaskRunnerFactory stopTaskRunnerFactory; private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; @@ -125,18 +125,20 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport * else commands are sent async. + * @param stopTaskRunnerFactory - can not be null, used for stopping this connection. */ public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, - TaskRunnerFactory taskRunnerFactory) { + TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) { this.connector = connector; this.broker = broker; - this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); brokerConnectionStates = rb.getConnectionStates(); if (connector != null) { this.statistics.setParent(connector.getStatistics()); + this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); } this.taskRunnerFactory = taskRunnerFactory; + this.stopTaskRunnerFactory = stopTaskRunnerFactory; this.transport = transport; this.transport.setTransportListener(new DefaultTransportListener() { @Override @@ -939,6 +941,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } public void stop() throws Exception { + // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) + // as their lifecycle is handled elsewhere + stopAsync(); while (!stopped.await(5, TimeUnit.SECONDS)) { LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown."); @@ -952,7 +957,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { stopError = cause; } try { - DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { + stopTaskRunnerFactory.execute(new Runnable() { public void run() { try { Thread.sleep(waitTime); @@ -961,9 +966,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } catch (InterruptedException e) { } } - }, "delayedStop:" + transport.getRemoteAddress()); + }); } catch (Throwable t) { - LOG.warn("cannot create stopAsync :", t); + LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); } } } @@ -988,7 +993,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } try { - DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { + stopTaskRunnerFactory.execute(new Runnable() { public void run() { serviceLock.writeLock().lock(); try { @@ -1000,9 +1005,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { serviceLock.writeLock().unlock(); } } - }, "StopAsync:" + transport.getRemoteAddress()); + }); } catch (Throwable t) { - LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); + LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t); stopped.countDown(); } } @@ -1013,8 +1018,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return "Transport Connection to: " + transport.getRemoteAddress(); } - protected void doStop() throws Exception, InterruptedException { - LOG.debug("Stopping connection: " + transport.getRemoteAddress()); + protected void doStop() throws Exception { + LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); connector.onStopped(this); try { synchronized (this) { @@ -1026,16 +1031,17 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } } catch (Exception ignore) { - LOG.trace("Exception caught stopping", ignore); + LOG.trace("Exception caught stopping. This exception is ignored.", ignore); } try { transport.stop(); LOG.debug("Stopped transport: " + transport.getRemoteAddress()); } catch (Exception e) { - LOG.debug("Could not stop transport: " + e, e); + LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e); } if (taskRunner != null) { taskRunner.shutdown(1); + taskRunner = null; } active = false; // Run the MessageDispatch callbacks so that message references get @@ -1063,14 +1069,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor { for (TransportConnectionState cs : connectionStates) { cs.getContext().getStopping().set(true); try { - LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); + LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); processRemoveConnection(cs.getInfo().getConnectionId(), 0l); } catch (Throwable ignore) { ignore.printStackTrace(); } } } - LOG.debug("Connection Stopped: " + getRemoteAddress()); + LOG.debug("Connection Stopped: {}", getRemoteAddress()); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index 56b7f573e9..aabfe40556 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -32,7 +32,6 @@ import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.security.MessageAuthorizationPolicy; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; @@ -220,7 +219,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { getServer().setAcceptListener(new TransportAcceptListener() { public void onAccept(final Transport transport) { try { - DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { try { Connection connection = createConnection(transport); @@ -310,8 +309,10 @@ public class TransportConnector implements Connector, BrokerServiceAware { // Implementation methods // ------------------------------------------------------------------------- protected Connection createConnection(Transport transport) throws IOException { + // prefer to use task runner from broker service as stop task runner, as we can then + // tie it to the lifecycle of the broker service TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null - : taskRunnerFactory); + : taskRunnerFactory, brokerService.getTaskRunnerFactory()); boolean statEnabled = this.getStatistics().isEnabled(); answer.getStatistics().setEnabled(statEnabled); answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java index 5165f164ee..d39aff04a8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java @@ -49,9 +49,10 @@ public class ManagedTransportConnection extends TransportConnection { private final boolean populateUserName; public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker, - TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName) + TaskRunnerFactory factory, TaskRunnerFactory stopFactory, + ManagementContext context, ObjectName connectorName) throws IOException { - super(connector, transport, broker, factory); + super(connector, transport, broker, factory, stopFactory); this.managementContext = context; this.connectorName = connectorName; this.mbean = new ConnectionView(this, managementContext); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java index eb074cb00c..00d07dd0a4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java @@ -49,7 +49,10 @@ public class ManagedTransportConnector extends TransportConnector { } protected Connection createConnection(Transport transport) throws IOException { - return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName); + // prefer to use task runner from broker service as stop task runner, as we can then + // tie it to the lifecycle of the broker service + return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), + getBrokerService().getTaskRunnerFactory(), managementContext, connectorName); } protected static synchronized long getNextConnectionId() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index dbb60d0a4c..1e12441609 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -46,9 +46,9 @@ import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.state.Tracked; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; @@ -86,6 +86,7 @@ public class FailoverTransport implements CompositeTransport { private URI connectedTransportURI; private URI failedConnectTransportURI; private final AtomicReference connectedTransport = new AtomicReference(); + private final TaskRunnerFactory reconnectTaskFactory; private final TaskRunner reconnectTask; private boolean started; private boolean initialized; @@ -128,7 +129,9 @@ public class FailoverTransport implements CompositeTransport { brokerSslContext = SslContext.getCurrentSslContext(); stateTracker.setTrackTransactions(true); // Setup a task that is used to reconnect the a connection async. - reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { + reconnectTaskFactory = new TaskRunnerFactory(); + reconnectTaskFactory.init(); + reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { public boolean iterate() { boolean result = false; if (!started) { @@ -345,26 +348,31 @@ public class FailoverTransport implements CompositeTransport { Transport transportToStop = null; List backupsToStop = new ArrayList(backups.size()); - synchronized (reconnectMutex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopped " + this); - } - if (!started) { - return; - } - started = false; - disposed = true; - connected = false; + try { + synchronized (reconnectMutex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Stopped " + this); + } + if (!started) { + return; + } + started = false; + disposed = true; + connected = false; - if (connectedTransport.get() != null) { - transportToStop = connectedTransport.getAndSet(null); + if (connectedTransport.get() != null) { + transportToStop = connectedTransport.getAndSet(null); + } + reconnectMutex.notifyAll(); } - reconnectMutex.notifyAll(); + synchronized (sleepMutex) { + sleepMutex.notifyAll(); + } + } finally { + reconnectTask.shutdown(); + reconnectTaskFactory.shutdownNow(); } - synchronized (sleepMutex) { - sleepMutex.notifyAll(); - } - reconnectTask.shutdown(); + synchronized(backupMutex) { for (BackupTransport backup : backups) { backup.setDisposed(true); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index a9b9aad461..0921ec009a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -30,9 +30,9 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.state.ConnectionStateTracker; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; @@ -63,6 +63,7 @@ public class FanoutTransport implements CompositeTransport { private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); private final ConcurrentHashMap requestMap = new ConcurrentHashMap(); + private final TaskRunnerFactory reconnectTaskFactory; private final TaskRunner reconnectTask; private boolean started; @@ -157,7 +158,9 @@ public class FanoutTransport implements CompositeTransport { public FanoutTransport() throws InterruptedIOException { // Setup a task that is used to reconnect the a connection async. - reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { + reconnectTaskFactory = new TaskRunnerFactory(); + reconnectTaskFactory.init(); + reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { public boolean iterate() { return doConnect(); } @@ -291,27 +294,31 @@ public class FanoutTransport implements CompositeTransport { } public void stop() throws Exception { - synchronized (reconnectMutex) { - ServiceStopper ss = new ServiceStopper(); + try { + synchronized (reconnectMutex) { + ServiceStopper ss = new ServiceStopper(); - if (!started) { - return; - } - started = false; - disposed = true; - connected=false; - - for (Iterator iter = transports.iterator(); iter.hasNext();) { - FanoutTransportHandler th = iter.next(); - if (th.transport != null) { - ss.stop(th.transport); + if (!started) { + return; } - } + started = false; + disposed = true; + connected=false; - LOG.debug("Stopped: " + this); - ss.throwFirstException(); + for (Iterator iter = transports.iterator(); iter.hasNext();) { + FanoutTransportHandler th = iter.next(); + if (th.transport != null) { + ss.stop(th.transport); + } + } + + LOG.debug("Stopped: " + this); + ss.throwFirstException(); + } + } finally { + reconnectTask.shutdown(); + reconnectTaskFactory.shutdownNow(); } - reconnectTask.shutdown(); } public int getMinAckCount() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java index 1147e6665b..280fd81f64 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java @@ -37,7 +37,7 @@ import javax.net.ssl.SSLSession; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.thread.DefaultThreadPools; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; @@ -52,9 +52,10 @@ public class NIOSSLTransport extends NIOTransport { protected SSLEngine sslEngine; protected SSLSession sslSession; - protected boolean handshakeInProgress = false; + protected volatile boolean handshakeInProgress = false; protected SSLEngineResult.Status status = null; protected SSLEngineResult.HandshakeStatus handshakeStatus = null; + protected TaskRunnerFactory taskRunnerFactory; public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -259,7 +260,7 @@ public class NIOSSLTransport extends NIOTransport { case NEED_TASK: Runnable task; while ((task = sslEngine.getDelegatedTask()) != null) { - DefaultThreadPools.getDefaultTaskRunnerFactory().execute(task); + taskRunnerFactory.execute(task); } break; case NEED_WRAP: @@ -273,8 +274,19 @@ public class NIOSSLTransport extends NIOTransport { } } + @Override + protected void doStart() throws Exception { + taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task"); + // no need to init as we can delay that until demand (eg in doHandshake) + super.doStart(); + } + @Override protected void doStop(ServiceStopper stopper) throws Exception { + if (taskRunnerFactory != null) { + taskRunnerFactory.shutdownNow(); + taskRunnerFactory = null; + } if (channel != null) { channel.close(); channel = null; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 55958ac360..fe59b961c1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.SocketFactory; import org.apache.activemq.Service; -import org.apache.activemq.thread.DefaultThreadPools; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportLoggerFactory; import org.apache.activemq.transport.TransportThreadSupport; @@ -536,13 +536,17 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S //closing the socket can hang also final CountDownLatch latch = new CountDownLatch(1); - DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { + // need a async task for this + final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory(); + taskRunnerFactory.execute(new Runnable() { public void run() { + LOG.trace("Closing socket {}", socket); try { socket.close(); + LOG.debug("Closed socket {}", socket); } catch (IOException e) { if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception closing socket", e); + LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); } } finally { latch.countDown(); @@ -554,14 +558,20 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S latch.await(1,TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } finally { + taskRunnerFactory.shutdownNow(); } } else { - + // close synchronously + LOG.trace("Closing socket {}", socket); try { socket.close(); + LOG.debug("Closed socket {}", socket); } catch (IOException e) { - LOG.debug("Caught exception closing socket",e); + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); + } } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 74c44199ed..6916a989c4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -26,9 +26,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; @@ -55,6 +55,7 @@ public class VMTransport implements Transport, Task { // Implementation private LinkedBlockingQueue messageQueue; + private TaskRunnerFactory taskRunnerFactory; private TaskRunner taskRunner; // Transport State @@ -188,6 +189,7 @@ public class VMTransport implements Transport, Task { tr.shutdown(TimeUnit.SECONDS.toMillis(1)); } catch(Exception e) { } + taskRunner = null; } // let the peer know that we are disconnecting after attempting @@ -197,6 +199,12 @@ public class VMTransport implements Transport, Task { peer.transportListener.onCommand(new ShutdownInfo()); } catch (Exception ignore) { } + + // shutdown task runner factory + if (taskRunnerFactory != null) { + taskRunnerFactory.shutdownNow(); + taskRunnerFactory = null; + } } } @@ -280,7 +288,11 @@ public class VMTransport implements Transport, Task { throw new TransportDisposedIOException("The Transport has been disposed"); } - taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); + if (taskRunnerFactory == null) { + taskRunnerFactory = new TaskRunnerFactory("ActiveMQ VMTransport: " + toString()); + taskRunnerFactory.init(); + } + taskRunner = result = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString()); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java index 6f1f5bdd55..1a3dc340de 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java @@ -30,7 +30,7 @@ public final class ThreadPoolUtils { private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class); - public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L; + public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L; /** * Shutdown the given executor service only (ie not graceful shutdown). @@ -38,7 +38,7 @@ public final class ThreadPoolUtils { * @see java.util.concurrent.ExecutorService#shutdown() */ public static void shutdown(ExecutorService executorService) { - doShutdown(executorService, -1, true); + doShutdown(executorService, 0); } /** @@ -70,7 +70,7 @@ public final class ThreadPoolUtils { * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis. */ public static void shutdownGraceful(ExecutorService executorService) { - doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false); + doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION); } /** @@ -83,62 +83,49 @@ public final class ThreadPoolUtils { * forces a shutdown. The parameter shutdownAwaitTermination * is used as timeout value waiting for orderly shutdown to * complete normally, before going aggressively. - *

- * Notice if the given parameter shutdownAwaitTermination is negative, then a quick shutdown - * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method - * and then exit from this method (ie. no graceful shutdown is performed). * * @param executorService the executor service to shutdown - * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative - * then the thread pool is not graceful shutdown, but a regular shutdown - * is commenced. + * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown */ public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { - doShutdown(executorService, shutdownAwaitTermination, false); + doShutdown(executorService, shutdownAwaitTermination); } - private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) { + private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) { // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager if (executorService == null) { return; } - if (quick) { - // do not shutdown graceful, but just quick shutdown on the thread pool - executorService.shutdown(); - LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", - new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); - return; - } - - if (shutdownAwaitTermination <= 0) { - throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination); - } - // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively // and try shutting down again. In both cases we wait at most the given shutdown timeout value given - // (total wait could then be 2 x shutdownAwaitTermination) - boolean warned = false; - StopWatch watch = new StopWatch(); + // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus + // we ought to shutdown much faster) if (!executorService.isShutdown()) { + boolean warned = false; + StopWatch watch = new StopWatch(); + LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination); executorService.shutdown(); - try { - if (!awaitTermination(executorService, shutdownAwaitTermination)) { - warned = true; - LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); - executorService.shutdownNow(); - // we are now shutting down aggressively, so wait to see if we can completely shutdown or not + + if (shutdownAwaitTermination > 0) { + try { if (!awaitTermination(executorService, shutdownAwaitTermination)) { - LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); + warned = true; + LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); + executorService.shutdownNow(); + // we are now shutting down aggressively, so wait to see if we can completely shutdown or not + if (!awaitTermination(executorService, shutdownAwaitTermination)) { + LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); + } } + } catch (InterruptedException e) { + warned = true; + LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); + // we were interrupted during shutdown, so force shutdown + executorService.shutdownNow(); } - } catch (InterruptedException e) { - warned = true; - LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); - // we were interrupted during shutdown, so force shutdown - executorService.shutdownNow(); } // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log @@ -155,8 +142,8 @@ public final class ThreadPoolUtils { /** * Awaits the termination of the thread pool. *

- * This implementation will log every 5th second at INFO level that we are waiting, so the end user - * can see we are not hanging in case it takes longer time to shutdown the pool. + * This implementation will log every 2nd second at INFO level that we are waiting, so the end user + * can see we are not hanging in case it takes longer time to terminate the pool. * * @param executorService the thread pool * @param shutdownAwaitTermination time in millis to use as timeout @@ -166,15 +153,15 @@ public final class ThreadPoolUtils { public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException { // log progress every 5th second so end user is aware of we are shutting down StopWatch watch = new StopWatch(); - long interval = Math.min(5000, shutdownAwaitTermination); + long interval = Math.min(2000, shutdownAwaitTermination); boolean done = false; while (!done && interval > 0) { if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { done = true; } else { - LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService); + LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService); // recalculate interval - interval = Math.min(5000, shutdownAwaitTermination - watch.taken()); + interval = Math.min(2000, shutdownAwaitTermination - watch.taken()); } }