From 6c1676b59fcd7d46ed4ea9c5c1d6ef77f9bbd779 Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Thu, 6 Sep 2012 17:50:43 +0000 Subject: [PATCH] AMQ-4026: Using ThreadPoolUtils to shutdown thread pool. Use thread pool from broker service where applicable. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1381695 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 5 +- .../activemq/ActiveMQMessageConsumer.java | 10 +- .../apache/activemq/broker/BrokerService.java | 8 +- .../apache/activemq/broker/region/Queue.java | 4 +- .../DemandForwardingBridgeSupport.java | 27 +++--- .../activemq/network/jms/JmsConnector.java | 4 +- .../journal/JournalPersistenceAdapter.java | 4 +- .../activemq/store/kahadb/KahaDBStore.java | 7 +- .../activemq/thread/TaskRunnerFactory.java | 44 ++++++++- .../transport/AbstractInactivityMonitor.java | 3 +- .../multicast/MulticastDiscoveryAgent.java | 6 +- .../transport/mqtt/MQTTInactivityMonitor.java | 3 +- .../apache/activemq/util/ThreadPoolUtils.java | 94 ++++++++++++------- 13 files changed, 148 insertions(+), 71 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 8a7e1d41e9..eaa8d07265 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -103,6 +103,7 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -692,10 +693,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } finally { try { if (executor != null) { - executor.shutdown(); + ThreadPoolUtils.shutdown(executor); } } catch (Throwable e) { - LOG.error("Error shutting down thread pool " + e, e); + LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e); } ServiceSupport.dispose(this.transport); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 9403fda77c..46b19aa4a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -61,6 +60,7 @@ import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.Callback; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -769,12 +769,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } if (executorService != null) { - executorService.shutdown(); - try { - executorService.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + ThreadPoolUtils.shutdownGraceful(executorService, 60000L); + executorService = null; } if (session.isClientAcknowledge()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 113788b6d4..3658e0445e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -111,6 +111,7 @@ import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.InetAddressUtil; import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -769,7 +770,7 @@ public class BrokerService implements Service { this.taskRunnerFactory = null; } if (this.executor != null) { - this.executor.shutdownNow(); + ThreadPoolUtils.shutdownNow(executor); this.executor = null; } @@ -2410,8 +2411,7 @@ public class BrokerService implements Service { } if (networkConnectorStartExecutor != null) { // executor done when enqueued tasks are complete - networkConnectorStartExecutor.shutdown(); - networkConnectorStartExecutor = null; + ThreadPoolUtils.shutdown(networkConnectorStartExecutor); } for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { @@ -2755,7 +2755,7 @@ public class BrokerService implements Service { /** * Sets whether Authenticated User Name information is shown in MBeans that support this field. - * @param true if MBeans should expose user name information. + * @param value if MBeans should expose user name information. */ public void setPopulateUserNameInMBeans(boolean value) { this.populateUserNameInMBeans = value; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 4e30d35889..cd8804857e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -86,6 +86,7 @@ import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.BrokerSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -883,7 +884,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { taskRunner.shutdown(); } if (this.executor != null) { - this.executor.shutdownNow(); + ThreadPoolUtils.shutdownNow(executor); + executor = null; } scheduler.cancel(expireMessagesTask); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 92db402bf8..457711c160 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -70,7 +70,6 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; @@ -92,7 +91,6 @@ import org.slf4j.LoggerFactory; */ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); - private TaskRunnerFactory asyncTaskRunner; protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; protected final Transport localBroker; protected final Transport remoteBroker; @@ -156,8 +154,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public void start() throws Exception { if (started.compareAndSet(false, true)) { - asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task"); - asyncTaskRunner.init(); + + if (brokerService == null) { + throw new IllegalArgumentException("BrokerService is null on " + this); + } localBroker.setTransportListener(new DefaultTransportListener() { @@ -201,7 +201,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected void triggerLocalStartBridge() throws IOException { - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); @@ -217,7 +217,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected void triggerRemoteStartBridge() throws IOException { - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker); @@ -350,7 +350,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br try { remoteBridgeStarted.set(false); final CountDownLatch sendShutdown = new CountDownLatch(1); - asyncTaskRunner.execute(new Runnable() { + + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { try { localBroker.oneway(new ShutdownInfo()); @@ -363,7 +364,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } - }); + }, "ActiveMQ ForwardingBridge StopTask"); + if (!sendShutdown.await(10, TimeUnit.SECONDS)) { LOG.info("Network Could not shutdown in a timely manner"); } @@ -377,9 +379,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br startedLatch.countDown(); localStartedLatch.countDown(); - // stop task runner - asyncTaskRunner.shutdown(); - asyncTaskRunner = null; ss.throwFirstException(); } } @@ -399,7 +398,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); } LOG.debug("The remote Exception was: " + error, error); - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { ServiceSupport.dispose(getControllingService()); } @@ -632,7 +631,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (!disposed.get()) { LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); LOG.debug("The local Exception was:" + error, error); - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { ServiceSupport.dispose(getControllingService()); } @@ -660,7 +659,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); // continue removal in separate thread to free up this thread for outstanding responses - asyncTaskRunner.execute(new Runnable() { + brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { sub.waitForCompletion(); try { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java index ee1c20c700..63aef74fbe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java @@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jndi.JndiTemplate; @@ -166,7 +167,8 @@ public abstract class JmsConnector implements Service { public void stop() throws Exception { if (started.compareAndSet(true, false)) { - this.connectionSerivce.shutdown(); + ThreadPoolUtils.shutdown(connectionSerivce); + connectionSerivce = null; for (DestinationBridge bridge : inboundBridges) { bridge.stop(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index ac74f7baff..ca549ca174 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -69,6 +69,7 @@ import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -287,7 +288,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve // Take one final checkpoint and stop checkpoint processing. checkpoint(true, true); checkpointTask.shutdown(); - checkpointExecutor.shutdown(); + ThreadPoolUtils.shutdown(checkpointExecutor); + checkpointExecutor = null; queues.clear(); topics.clear(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 9f8b52ae96..eccf8a71a5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -64,6 +64,7 @@ import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,10 +238,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.globalTopicSemaphore.drainPermits(); } if (this.queueExecutor != null) { - this.queueExecutor.shutdownNow(); + ThreadPoolUtils.shutdownNow(queueExecutor); + queueExecutor = null; } if (this.topicExecutor != null) { - this.topicExecutor.shutdownNow(); + ThreadPoolUtils.shutdownNow(topicExecutor); + topicExecutor = null; } LOG.info("Stopped KahaDB"); super.doStop(stopper); diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index b5a21fe836..1a0358c12b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -92,9 +92,40 @@ public class TaskRunnerFactory implements Executor { } } + /** + * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively. + * + * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService) + */ public void shutdown() { if (executor != null) { - ThreadPoolUtils.shutdown(executor, shutdownAwaitTermination); + ThreadPoolUtils.shutdown(executor); + executor = null; + } + initDone.set(false); + } + + /** + * Performs a shutdown now (aggressively) on the thread pool. + * + * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService) + */ + public void shutdownNow() { + if (executor != null) { + ThreadPoolUtils.shutdownNow(executor); + executor = null; + } + initDone.set(false); + } + + /** + * Performs a graceful shutdown. + * + * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService) + */ + public void shutdownGraceful() { + if (executor != null) { + ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination); executor = null; } initDone.set(false); @@ -119,10 +150,19 @@ public class TaskRunnerFactory implements Executor { if (executor != null) { executor.execute(runnable); } else { - new Thread(runnable, name + "-" + id.incrementAndGet()).start(); + doExecuteNewThread(runnable, name); } } + private void doExecuteNewThread(Runnable runnable, String name) { + String threadName = name + "-" + id.incrementAndGet(); + Thread thread = new Thread(runnable, threadName); + thread.setDaemon(daemon); + + LOG.trace("Created and running thread[{}]: {}", threadName, thread); + thread.start(); + } + protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java index 342147e797..89e29f94dc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -364,7 +365,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter { READ_CHECK_TIMER.cancel(); WRITE_CHECK_TIMER = null; READ_CHECK_TIMER = null; - ASYNC_TASKS.shutdown(); + ThreadPoolUtils.shutdown(ASYNC_TASKS); ASYNC_TASKS = null; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java index 8b80c7fdbd..058130dc7c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -348,7 +349,10 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { if (runner != null) { runner.interrupt(); } - getExecutor().shutdownNow(); + if (executor != null) { + ThreadPoolUtils.shutdownNow(executor); + executor = null; + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java index 4f11ec5984..7a1e48f565 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java @@ -33,6 +33,7 @@ import org.apache.activemq.transport.AbstractInactivityMonitor; import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -276,7 +277,7 @@ public class MQTTInactivityMonitor extends TransportFilter { if (CHECKER_COUNTER == 0) { READ_CHECK_TIMER.cancel(); READ_CHECK_TIMER = null; - ASYNC_TASKS.shutdown(); + ThreadPoolUtils.shutdown(ASYNC_TASKS); ASYNC_TASKS = null; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java index c1280019f9..6f1f5bdd55 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java @@ -24,27 +24,53 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * Utility methods for working with thread pools {@link ExecutorService}. */ -public class ThreadPoolUtils { +public final class ThreadPoolUtils { private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class); - // TODO: Should be 30 sec - // but lowered due some unit tests dont yet properly shutdown, so want to run these a bit faster - public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L; + public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L; + + /** + * Shutdown the given executor service only (ie not graceful shutdown). + * + * @see java.util.concurrent.ExecutorService#shutdown() + */ + public static void shutdown(ExecutorService executorService) { + doShutdown(executorService, -1, true); + } + + /** + * Shutdown now the given executor service aggressively. + * + * @param executorService the executor service to shutdown now + * @return list of tasks that never commenced execution + * @see java.util.concurrent.ExecutorService#shutdownNow() + */ + public static List shutdownNow(ExecutorService executorService) { + List answer = null; + if (!executorService.isShutdown()) { + LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); + answer = executorService.shutdownNow(); + if (LOG.isTraceEnabled()) { + LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", + new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); + } + } + + return answer; + } /** * Shutdown the given executor service graceful at first, and then aggressively * if the await termination timeout was hit. *

- * This implementation invokes the {@link #shutdown(java.util.concurrent.ExecutorService, long)} + * This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)} * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis. - * - * @see #shutdown(java.util.concurrent.ExecutorService, long) */ - public void shutdown(ExecutorService executorService) { - shutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION); + public static void shutdownGraceful(ExecutorService executorService) { + doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false); } /** @@ -57,14 +83,35 @@ public class ThreadPoolUtils { * forces a shutdown. The parameter shutdownAwaitTermination * is used as timeout value waiting for orderly shutdown to * complete normally, before going aggressively. + *

+ * Notice if the given parameter shutdownAwaitTermination is negative, then a quick shutdown + * is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method + * and then exit from this method (ie. no graceful shutdown is performed). * * @param executorService the executor service to shutdown - * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown - * @see java.util.concurrent.ExecutorService#shutdown() + * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative + * then the thread pool is not graceful shutdown, but a regular shutdown + * is commenced. */ - public static void shutdown(ExecutorService executorService, long shutdownAwaitTermination) { + public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { + doShutdown(executorService, shutdownAwaitTermination, false); + } + + private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) { // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager + if (executorService == null) { + return; + } + + if (quick) { + // do not shutdown graceful, but just quick shutdown on the thread pool + executorService.shutdown(); + LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", + new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); + return; + } + if (shutdownAwaitTermination <= 0) { throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination); } @@ -105,27 +152,6 @@ public class ThreadPoolUtils { } } - /** - * Shutdown now the given executor service aggressively. - * - * @param executorService the executor service to shutdown now - * @return list of tasks that never commenced execution - * @see java.util.concurrent.ExecutorService#shutdownNow() - */ - public static List shutdownNow(ExecutorService executorService) { - List answer = null; - if (!executorService.isShutdown()) { - LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); - answer = executorService.shutdownNow(); - if (LOG.isTraceEnabled()) { - LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", - new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); - } - } - - return answer; - } - /** * Awaits the termination of the thread pool. *