AMQ-4026: Introduced ThreadPoolUtils to shutdown thread pools in a more graceful manner. Improved logging for lifecycle of thread pools and tasks being executed.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1381557 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Claus Ibsen 2012-09-06 11:08:11 +00:00
parent 2251ba0796
commit 287f69db4d
7 changed files with 431 additions and 5 deletions

View File

@ -16,11 +16,15 @@
*/
package org.apache.activemq.thread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
class DedicatedTaskRunner implements TaskRunner {
private static final Logger LOG = LoggerFactory.getLogger(DedicatedTaskRunner.class);
private final Task task;
private final Thread thread;
@ -29,11 +33,15 @@ class DedicatedTaskRunner implements TaskRunner {
private boolean pending;
private boolean shutdown;
public DedicatedTaskRunner(Task task, String name, int priority, boolean daemon) {
public DedicatedTaskRunner(final Task task, String name, int priority, boolean daemon) {
this.task = task;
thread = new Thread(name) {
public void run() {
runTask();
try {
runTask();
} finally {
LOG.trace("Run task done: {}", task);
}
}
};
thread.setDaemon(daemon);
@ -61,6 +69,7 @@ class DedicatedTaskRunner implements TaskRunner {
* @throws InterruptedException
*/
public void shutdown(long timeout) throws InterruptedException {
LOG.trace("Shutdown timeout: {} task: {}", task);
synchronized (mutex) {
shutdown = true;
pending = true;
@ -95,6 +104,7 @@ class DedicatedTaskRunner implements TaskRunner {
}
}
LOG.trace("Running task {}", task);
if (!task.iterate()) {
// wait to be notified.
synchronized (mutex) {

View File

@ -18,11 +18,15 @@ package org.apache.activemq.thread;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
class PooledTaskRunner implements TaskRunner {
private static final Logger LOG = LoggerFactory.getLogger(PooledTaskRunner.class);
private final int maxIterationsPerRun;
private final Executor executor;
private final Task task;
@ -32,7 +36,7 @@ class PooledTaskRunner implements TaskRunner {
private boolean iterating;
private volatile Thread runningThread;
public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
public PooledTaskRunner(Executor executor, final Task task, int maxIterationsPerRun) {
this.executor = executor;
this.maxIterationsPerRun = maxIterationsPerRun;
this.task = task;
@ -42,6 +46,7 @@ class PooledTaskRunner implements TaskRunner {
try {
runTask();
} finally {
LOG.trace("Run task done: {}", task);
runningThread = null;
}
}
@ -84,6 +89,7 @@ class PooledTaskRunner implements TaskRunner {
* @throws InterruptedException
*/
public void shutdown(long timeout) throws InterruptedException {
LOG.trace("Shutdown timeout: {} task: {}", task);
synchronized (runable) {
shutdown = true;
// the check on the thread is done
@ -119,6 +125,7 @@ class PooledTaskRunner implements TaskRunner {
boolean done = false;
try {
for (int i = 0; i < maxIterationsPerRun; i++) {
LOG.trace("Running task iteration {} - {}", i, task);
if (!task.iterate()) {
done = true;
break;

View File

@ -26,6 +26,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages the thread pool for long running tasks. Long running tasks are not
* always active but when they are active, they may need a few iterations of
@ -37,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class TaskRunnerFactory implements Executor {
private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);
private ExecutorService executor;
private int maxIterationsPerRun;
private String name;
@ -44,6 +49,7 @@ public class TaskRunnerFactory implements Executor {
private boolean daemon;
private AtomicLong id = new AtomicLong(0);
private boolean dedicatedTaskRunner;
private long shutdownAwaitTermination = 30000;
private AtomicBoolean initDone = new AtomicBoolean(false);
private int maxThreadPoolSize = Integer.MAX_VALUE;
private RejectedExecutionHandler rejectedTaskHandler = null;
@ -82,12 +88,14 @@ public class TaskRunnerFactory implements Executor {
} else if (executor == null) {
executor = createDefaultExecutor();
}
LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor);
}
}
public void shutdown() {
if (executor != null) {
executor.shutdownNow();
ThreadPoolUtils.shutdown(executor, shutdownAwaitTermination);
executor = null;
}
initDone.set(false);
}
@ -107,6 +115,7 @@ public class TaskRunnerFactory implements Executor {
public void execute(Runnable runnable, String name) {
init();
LOG.trace("Execute[{}] runnable: {}", name, runnable);
if (executor != null) {
executor.execute(runnable);
} else {
@ -117,9 +126,12 @@ public class TaskRunnerFactory implements Executor {
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
String threadName = name + "-" + id.incrementAndGet();
Thread thread = new Thread(runnable, threadName);
thread.setDaemon(daemon);
thread.setPriority(priority);
LOG.trace("Created thread[{}]: {}", threadName, thread);
return thread;
}
});
@ -192,4 +204,13 @@ public class TaskRunnerFactory implements Executor {
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
this.rejectedTaskHandler = rejectedTaskHandler;
}
public long getShutdownAwaitTermination() {
return shutdownAwaitTermination;
}
public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
this.shutdownAwaitTermination = shutdownAwaitTermination;
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
/**
* A very simple stop watch.
* <p/>
* This implementation is not thread safe and can only time one task at any given time.
*
* @version
*/
public final class StopWatch {
private long start;
private long stop;
/**
* Starts the stop watch
*/
public StopWatch() {
this(true);
}
/**
* Creates the stop watch
*
* @param started whether it should start immediately
*/
public StopWatch(boolean started) {
if (started) {
restart();
}
}
/**
* Starts or restarts the stop watch
*/
public void restart() {
start = System.currentTimeMillis();
stop = 0;
}
/**
* Stops the stop watch
*
* @return the time taken in millis.
*/
public long stop() {
stop = System.currentTimeMillis();
return taken();
}
/**
* Returns the time taken in millis.
*
* @return time in millis
*/
public long taken() {
if (start > 0 && stop > 0) {
return stop - start;
} else if (start > 0) {
return System.currentTimeMillis() - start;
} else {
return 0;
}
}
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class ThreadPoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
// TODO: Should be 30 sec
// but lowered due some unit tests dont yet properly shutdown, so want to run these a bit faster
public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
/**
* Shutdown the given executor service graceful at first, and then aggressively
* if the await termination timeout was hit.
* <p/>
* This implementation invokes the {@link #shutdown(java.util.concurrent.ExecutorService, long)}
* with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
*
* @see #shutdown(java.util.concurrent.ExecutorService, long)
*/
public void shutdown(ExecutorService executorService) {
shutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
}
/**
* Shutdown the given executor service graceful at first, and then aggressively
* if the await termination timeout was hit.
* <p/>
* Will try to perform an orderly shutdown by giving the running threads
* time to complete tasks, before going more aggressively by doing a
* {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to
* complete normally, before going aggressively.
*
* @param executorService the executor service to shutdown
* @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
* @see java.util.concurrent.ExecutorService#shutdown()
*/
public static void shutdown(ExecutorService executorService, long shutdownAwaitTermination) {
// code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
if (shutdownAwaitTermination <= 0) {
throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
}
// shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
// and try shutting down again. In both cases we wait at most the given shutdown timeout value given
// (total wait could then be 2 x shutdownAwaitTermination)
boolean warned = false;
StopWatch watch = new StopWatch();
if (!executorService.isShutdown()) {
LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
executorService.shutdown();
try {
if (!awaitTermination(executorService, shutdownAwaitTermination)) {
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
executorService.shutdownNow();
// we are now shutting down aggressively, so wait to see if we can completely shutdown or not
if (!awaitTermination(executorService, shutdownAwaitTermination)) {
LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
}
}
} catch (InterruptedException e) {
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
// we were interrupted during shutdown, so force shutdown
executorService.shutdownNow();
}
// if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
if (warned) {
LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
} else if (LOG.isDebugEnabled()) {
LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
}
}
}
/**
* Shutdown now the given executor service aggressively.
*
* @param executorService the executor service to shutdown now
* @return list of tasks that never commenced execution
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public static List<Runnable> shutdownNow(ExecutorService executorService) {
List<Runnable> answer = null;
if (!executorService.isShutdown()) {
LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
answer = executorService.shutdownNow();
if (LOG.isTraceEnabled()) {
LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
}
}
return answer;
}
/**
* Awaits the termination of the thread pool.
* <p/>
* This implementation will log every 5th second at INFO level that we are waiting, so the end user
* can see we are not hanging in case it takes longer time to shutdown the pool.
*
* @param executorService the thread pool
* @param shutdownAwaitTermination time in millis to use as timeout
* @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out
* @throws InterruptedException is thrown if we are interrupted during the waiting
*/
public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
// log progress every 5th second so end user is aware of we are shutting down
StopWatch watch = new StopWatch();
long interval = Math.min(5000, shutdownAwaitTermination);
boolean done = false;
while (!done && interval > 0) {
if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
done = true;
} else {
LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService);
// recalculate interval
interval = Math.min(5000, shutdownAwaitTermination - watch.taken());
}
}
return done;
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.Locale;
/**
* Time utils.
*
* @version
*/
public final class TimeUtils {
private TimeUtils() {
}
/**
* Prints the duration in a human readable format as X days Y hours Z minutes etc.
*
* @param uptime the uptime in millis
* @return the time used for displaying on screen or in logs
*/
public static String printDuration(double uptime) {
// Code taken from Karaf
// https://svn.apache.org/repos/asf/karaf/trunk/shell/commands/src/main/java/org/apache/karaf/shell/commands/impl/InfoAction.java
NumberFormat fmtI = new DecimalFormat("###,###", new DecimalFormatSymbols(Locale.ENGLISH));
NumberFormat fmtD = new DecimalFormat("###,##0.000", new DecimalFormatSymbols(Locale.ENGLISH));
uptime /= 1000;
if (uptime < 60) {
return fmtD.format(uptime) + " seconds";
}
uptime /= 60;
if (uptime < 60) {
long minutes = (long) uptime;
String s = fmtI.format(minutes) + (minutes > 1 ? " minutes" : " minute");
return s;
}
uptime /= 60;
if (uptime < 24) {
long hours = (long) uptime;
long minutes = (long) ((uptime - hours) * 60);
String s = fmtI.format(hours) + (hours > 1 ? " hours" : " hour");
if (minutes != 0) {
s += " " + fmtI.format(minutes) + (minutes > 1 ? " minutes" : " minute");
}
return s;
}
uptime /= 24;
long days = (long) uptime;
long hours = (long) ((uptime - days) * 24);
String s = fmtI.format(days) + (days > 1 ? " days" : " day");
if (hours != 0) {
s += " " + fmtI.format(hours) + (hours > 1 ? " hours" : " hour");
}
return s;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import junit.framework.TestCase;
/**
*
*/
public class StopWatchTest extends TestCase {
public void testStopWatch() throws Exception {
StopWatch watch = new StopWatch();
Thread.sleep(200);
long taken = watch.stop();
assertEquals(taken, watch.taken());
assertTrue("Should take approx 200 millis, was: " + taken, taken > 150);
}
public void testStopWatchNotStarted() throws Exception {
StopWatch watch = new StopWatch(false);
long taken = watch.stop();
assertEquals(0, taken);
watch.restart();
Thread.sleep(200);
taken = watch.stop();
assertEquals(taken, watch.taken());
assertTrue("Should take approx 200 millis, was: " + taken, taken > 150);
}
public void testStopWatchRestart() throws Exception {
StopWatch watch = new StopWatch();
Thread.sleep(200);
long taken = watch.stop();
assertEquals(taken, watch.taken());
assertTrue("Should take approx 200 millis, was: " + taken, taken > 150);
watch.restart();
Thread.sleep(100);
taken = watch.stop();
assertEquals(taken, watch.taken());
assertTrue("Should take approx 100 millis, was: " + taken, taken > 50);
}
public void testStopWatchTaken() throws Exception {
StopWatch watch = new StopWatch();
Thread.sleep(100);
long taken = watch.taken();
Thread.sleep(100);
long taken2 = watch.taken();
assertNotSame(taken, taken2);
assertTrue(taken2 > taken);
}
}