AMQ-4026: Using ThreadPoolUtils to shutdown thread pool. Use thread pool from broker service where applicable.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1381695 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Claus Ibsen 2012-09-06 17:50:43 +00:00
parent b32dbb3f47
commit 6c1676b59f
13 changed files with 148 additions and 71 deletions

View File

@ -103,6 +103,7 @@ import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -692,10 +693,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} finally {
try {
if (executor != null) {
executor.shutdown();
ThreadPoolUtils.shutdown(executor);
}
} catch (Throwable e) {
LOG.error("Error shutting down thread pool " + e, e);
LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
}
ServiceSupport.dispose(this.transport);

View File

@ -26,7 +26,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -61,6 +60,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -769,12 +769,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
if (executorService != null) {
executorService.shutdown();
try {
executorService.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
executorService = null;
}
if (session.isClientAcknowledge()) {

View File

@ -111,6 +111,7 @@ import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -769,7 +770,7 @@ public class BrokerService implements Service {
this.taskRunnerFactory = null;
}
if (this.executor != null) {
this.executor.shutdownNow();
ThreadPoolUtils.shutdownNow(executor);
this.executor = null;
}
@ -2410,8 +2411,7 @@ public class BrokerService implements Service {
}
if (networkConnectorStartExecutor != null) {
// executor done when enqueued tasks are complete
networkConnectorStartExecutor.shutdown();
networkConnectorStartExecutor = null;
ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
@ -2755,7 +2755,7 @@ public class BrokerService implements Service {
/**
* Sets whether Authenticated User Name information is shown in MBeans that support this field.
* @param true if MBeans should expose user name information.
* @param value if MBeans should expose user name information.
*/
public void setPopulateUserNameInMBeans(boolean value) {
this.populateUserNameInMBeans = value;

View File

@ -86,6 +86,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@ -883,7 +884,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
taskRunner.shutdown();
}
if (this.executor != null) {
this.executor.shutdownNow();
ThreadPoolUtils.shutdownNow(executor);
executor = null;
}
scheduler.cancel(expireMessagesTask);

View File

@ -70,7 +70,6 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@ -92,7 +91,6 @@ import org.slf4j.LoggerFactory;
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
private TaskRunnerFactory asyncTaskRunner;
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker;
protected final Transport remoteBroker;
@ -156,8 +154,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task");
asyncTaskRunner.init();
if (brokerService == null) {
throw new IllegalArgumentException("BrokerService is null on " + this);
}
localBroker.setTransportListener(new DefaultTransportListener() {
@ -201,7 +201,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
protected void triggerLocalStartBridge() throws IOException {
asyncTaskRunner.execute(new Runnable() {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@ -217,7 +217,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
protected void triggerRemoteStartBridge() throws IOException {
asyncTaskRunner.execute(new Runnable() {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
@ -350,7 +350,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
try {
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
asyncTaskRunner.execute(new Runnable() {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
try {
localBroker.oneway(new ShutdownInfo());
@ -363,7 +364,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
});
}, "ActiveMQ ForwardingBridge StopTask");
if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
LOG.info("Network Could not shutdown in a timely manner");
}
@ -377,9 +379,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
startedLatch.countDown();
localStartedLatch.countDown();
// stop task runner
asyncTaskRunner.shutdown();
asyncTaskRunner = null;
ss.throwFirstException();
}
}
@ -399,7 +398,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
}
LOG.debug("The remote Exception was: " + error, error);
asyncTaskRunner.execute(new Runnable() {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
ServiceSupport.dispose(getControllingService());
}
@ -632,7 +631,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!disposed.get()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error);
asyncTaskRunner.execute(new Runnable() {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
ServiceSupport.dispose(getControllingService());
}
@ -660,7 +659,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
asyncTaskRunner.execute(new Runnable() {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
sub.waitForCompletion();
try {

View File

@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jndi.JndiTemplate;
@ -166,7 +167,8 @@ public abstract class JmsConnector implements Service {
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
this.connectionSerivce.shutdown();
ThreadPoolUtils.shutdown(connectionSerivce);
connectionSerivce = null;
for (DestinationBridge bridge : inboundBridges) {
bridge.stop();

View File

@ -69,6 +69,7 @@ import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -287,7 +288,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
checkpointTask.shutdown();
checkpointExecutor.shutdown();
ThreadPoolUtils.shutdown(checkpointExecutor);
checkpointExecutor = null;
queues.clear();
topics.clear();

View File

@ -64,6 +64,7 @@ import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -237,10 +238,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.globalTopicSemaphore.drainPermits();
}
if (this.queueExecutor != null) {
this.queueExecutor.shutdownNow();
ThreadPoolUtils.shutdownNow(queueExecutor);
queueExecutor = null;
}
if (this.topicExecutor != null) {
this.topicExecutor.shutdownNow();
ThreadPoolUtils.shutdownNow(topicExecutor);
topicExecutor = null;
}
LOG.info("Stopped KahaDB");
super.doStop(stopper);

View File

@ -92,9 +92,40 @@ public class TaskRunnerFactory implements Executor {
}
}
/**
* Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively.
*
* @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
*/
public void shutdown() {
if (executor != null) {
ThreadPoolUtils.shutdown(executor, shutdownAwaitTermination);
ThreadPoolUtils.shutdown(executor);
executor = null;
}
initDone.set(false);
}
/**
* Performs a shutdown now (aggressively) on the thread pool.
*
* @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
*/
public void shutdownNow() {
if (executor != null) {
ThreadPoolUtils.shutdownNow(executor);
executor = null;
}
initDone.set(false);
}
/**
* Performs a graceful shutdown.
*
* @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
*/
public void shutdownGraceful() {
if (executor != null) {
ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
executor = null;
}
initDone.set(false);
@ -119,10 +150,19 @@ public class TaskRunnerFactory implements Executor {
if (executor != null) {
executor.execute(runnable);
} else {
new Thread(runnable, name + "-" + id.incrementAndGet()).start();
doExecuteNewThread(runnable, name);
}
}
private void doExecuteNewThread(Runnable runnable, String name) {
String threadName = name + "-" + id.incrementAndGet();
Thread thread = new Thread(runnable, threadName);
thread.setDaemon(daemon);
LOG.trace("Created and running thread[{}]: {}", threadName, thread);
thread.start();
}
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {

View File

@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -364,7 +365,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
ASYNC_TASKS.shutdown();
ThreadPoolUtils.shutdown(ASYNC_TASKS);
ASYNC_TASKS = null;
}
}

View File

@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -348,7 +349,10 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
if (runner != null) {
runner.interrupt();
}
getExecutor().shutdownNow();
if (executor != null) {
ThreadPoolUtils.shutdownNow(executor);
executor = null;
}
}
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -276,7 +277,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
if (CHECKER_COUNTER == 0) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
ASYNC_TASKS.shutdown();
ThreadPoolUtils.shutdown(ASYNC_TASKS);
ASYNC_TASKS = null;
}
}

View File

@ -24,27 +24,53 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Utility methods for working with thread pools {@link ExecutorService}.
*/
public class ThreadPoolUtils {
public final class ThreadPoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
// TODO: Should be 30 sec
// but lowered due some unit tests dont yet properly shutdown, so want to run these a bit faster
public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L;
/**
* Shutdown the given executor service only (ie not graceful shutdown).
*
* @see java.util.concurrent.ExecutorService#shutdown()
*/
public static void shutdown(ExecutorService executorService) {
doShutdown(executorService, -1, true);
}
/**
* Shutdown now the given executor service aggressively.
*
* @param executorService the executor service to shutdown now
* @return list of tasks that never commenced execution
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public static List<Runnable> shutdownNow(ExecutorService executorService) {
List<Runnable> answer = null;
if (!executorService.isShutdown()) {
LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
answer = executorService.shutdownNow();
if (LOG.isTraceEnabled()) {
LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
}
}
return answer;
}
/**
* Shutdown the given executor service graceful at first, and then aggressively
* if the await termination timeout was hit.
* <p/>
* This implementation invokes the {@link #shutdown(java.util.concurrent.ExecutorService, long)}
* This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)}
* with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
*
* @see #shutdown(java.util.concurrent.ExecutorService, long)
*/
public void shutdown(ExecutorService executorService) {
shutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
public static void shutdownGraceful(ExecutorService executorService) {
doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false);
}
/**
@ -57,14 +83,35 @@ public class ThreadPoolUtils {
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to
* complete normally, before going aggressively.
* <p/>
* Notice if the given parameter <tt>shutdownAwaitTermination</tt> is negative, then a quick shutdown
* is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method
* and then exit from this method (ie. no graceful shutdown is performed).
*
* @param executorService the executor service to shutdown
* @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
* @see java.util.concurrent.ExecutorService#shutdown()
* @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative
* then the thread pool is <b>not</b> graceful shutdown, but a regular shutdown
* is commenced.
*/
public static void shutdown(ExecutorService executorService, long shutdownAwaitTermination) {
public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
doShutdown(executorService, shutdownAwaitTermination, false);
}
private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) {
// code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
if (executorService == null) {
return;
}
if (quick) {
// do not shutdown graceful, but just quick shutdown on the thread pool
executorService.shutdown();
LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
return;
}
if (shutdownAwaitTermination <= 0) {
throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
}
@ -105,27 +152,6 @@ public class ThreadPoolUtils {
}
}
/**
* Shutdown now the given executor service aggressively.
*
* @param executorService the executor service to shutdown now
* @return list of tasks that never commenced execution
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public static List<Runnable> shutdownNow(ExecutorService executorService) {
List<Runnable> answer = null;
if (!executorService.isShutdown()) {
LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
answer = executorService.shutdownNow();
if (LOG.isTraceEnabled()) {
LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
}
}
return answer;
}
/**
* Awaits the termination of the thread pool.
* <p/>