From 48c7f0a0d48be8c2d6a142907c857ed0a94274ed Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Wed, 31 Jul 2013 23:57:02 +0000 Subject: [PATCH] HADOOP-9787. ShutdownHelper util to shutdown threads and threadpools. (Karthik Kambatla via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1509053 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../hadoop/util/ShutdownThreadsHelper.java | 106 ++++++++++++++++++ .../util/TestShutdownThreadsHelper.java | 58 ++++++++++ 3 files changed, 167 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownThreadsHelper.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownThreadsHelper.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 2cff8897eb0..e014015953f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -40,6 +40,9 @@ Release 2.1.1-beta - UNRELEASED NEW FEATURES IMPROVEMENTS + + HADOOP-9787. ShutdownHelper util to shutdown threads and threadpools. + (Karthik Kambatla via Sandy Ryza) OPTIMIZATIONS diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownThreadsHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownThreadsHelper.java new file mode 100644 index 00000000000..ffd88fb97ac --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownThreadsHelper.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Helper class to shutdown {@link Thread}s and {@link ExecutorService}s. + */ +public class ShutdownThreadsHelper { + private static Log LOG = LogFactory.getLog(ShutdownThreadsHelper.class); + + @VisibleForTesting + static final int SHUTDOWN_WAIT_MS = 3000; + + /** + * @param thread {@link Thread to be shutdown} + * @return true if the thread is successfully interrupted, + * false otherwise + * @throws InterruptedException + */ + public static boolean shutdownThread(Thread thread) { + return shutdownThread(thread, SHUTDOWN_WAIT_MS); + } + + /** + * @param thread {@link Thread to be shutdown} + * @param timeoutInMilliSeconds time to wait for thread to join after being + * interrupted + * @return true if the thread is successfully interrupted, + * false otherwise + * @throws InterruptedException + */ + public static boolean shutdownThread(Thread thread, + long timeoutInMilliSeconds) { + if (thread == null) { + return true; + } + + try { + thread.interrupt(); + thread.join(timeoutInMilliSeconds); + return true; + } catch (InterruptedException ie) { + LOG.warn("Interrupted while shutting down thread - " + thread.getName()); + return false; + } + } + + /** + * @param service {@link ExecutorService to be shutdown} + * @return true if the service is terminated, + * false otherwise + * @throws InterruptedException + */ + public static boolean shutdownExecutorService(ExecutorService service) + throws InterruptedException { + return shutdownExecutorService(service, SHUTDOWN_WAIT_MS); + } + + /** + * @param service {@link ExecutorService to be shutdown} + * @param timeoutInMs time to wait for {@link + * ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)} + * calls in milli seconds. + * @return true if the service is terminated, + * false otherwise + * @throws InterruptedException + */ + public static boolean shutdownExecutorService(ExecutorService service, + long timeoutInMs) + throws InterruptedException { + if (service == null) { + return true; + } + + service.shutdown(); + if (!service.awaitTermination(timeoutInMs, TimeUnit.MILLISECONDS)) { + service.shutdownNow(); + return service.awaitTermination(timeoutInMs, TimeUnit.MILLISECONDS); + } else { + return true; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownThreadsHelper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownThreadsHelper.java new file mode 100644 index 00000000000..e22e134388c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownThreadsHelper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import org.junit.Test; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestShutdownThreadsHelper { + private Runnable sampleRunnable = new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2 * ShutdownThreadsHelper.SHUTDOWN_WAIT_MS); + } catch (InterruptedException ie) { + System.out.println("Thread interrupted"); + } + } + }; + + @Test (timeout = 3000) + public void testShutdownThread() { + Thread thread = new Thread(sampleRunnable); + thread.start(); + boolean ret = ShutdownThreadsHelper.shutdownThread(thread); + boolean isTerminated = !thread.isAlive(); + assertEquals("Incorrect return value", ret, isTerminated); + assertTrue("Thread is not shutdown", isTerminated); + + } + + @Test + public void testShutdownThreadPool() throws InterruptedException { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + executor.execute(sampleRunnable); + boolean ret = ShutdownThreadsHelper.shutdownExecutorService(executor); + boolean isTerminated = executor.isTerminated(); + assertEquals("Incorrect return value", ret, isTerminated); + assertTrue("ExecutorService is not shutdown", isTerminated); + } +}