diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java index 843454b08de..2a388f5bdd6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.util; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -26,6 +27,11 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -42,7 +48,12 @@ public class ShutdownHookManager { private static final ShutdownHookManager MGR = new ShutdownHookManager(); private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class); + private static final long TIMEOUT_DEFAULT = 10; + private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS; + private static final ExecutorService EXECUTOR = + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setDaemon(true).build()); static { try { Runtime.getRuntime().addShutdownHook( @@ -50,14 +61,33 @@ public class ShutdownHookManager { @Override public void run() { MGR.shutdownInProgress.set(true); - for (Runnable hook: MGR.getShutdownHooksInOrder()) { + for (HookEntry entry: MGR.getShutdownHooksInOrder()) { + Future future = EXECUTOR.submit(entry.getHook()); try { - hook.run(); + future.get(entry.getTimeout(), entry.getTimeUnit()); + } catch (TimeoutException ex) { + future.cancel(true); + LOG.warn("ShutdownHook '" + entry.getHook().getClass(). + getSimpleName() + "' timeout, " + ex.toString(), ex); } catch (Throwable ex) { - LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() + - "' failed, " + ex.toString(), ex); + LOG.warn("ShutdownHook '" + entry.getHook().getClass(). + getSimpleName() + "' failed, " + ex.toString(), ex); } } + try { + EXECUTOR.shutdown(); + if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT, + TIME_UNIT_DEFAULT)) { + LOG.error("ShutdownHookManger shutdown forcefully."); + EXECUTOR.shutdownNow(); + } + LOG.info("ShutdownHookManger complete shutdown."); + } catch (InterruptedException ex) { + LOG.error("ShutdownHookManger interrupted while waiting for " + + "termination.", ex); + EXECUTOR.shutdownNow(); + Thread.currentThread().interrupt(); + } } } ); @@ -77,15 +107,24 @@ public class ShutdownHookManager { } /** - * Private structure to store ShutdownHook and its priority. + * Private structure to store ShutdownHook, its priority and timeout + * settings. */ - private static class HookEntry { - Runnable hook; - int priority; + static class HookEntry { + private final Runnable hook; + private final int priority; + private final long timeout; + private final TimeUnit unit; - public HookEntry(Runnable hook, int priority) { + HookEntry(Runnable hook, int priority) { + this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT); + } + + HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) { this.hook = hook; this.priority = priority; + this.timeout = timeout; + this.unit = unit; } @Override @@ -104,10 +143,25 @@ public class ShutdownHookManager { return eq; } + Runnable getHook() { + return hook; + } + + int getPriority() { + return priority; + } + + long getTimeout() { + return timeout; + } + + TimeUnit getTimeUnit() { + return unit; + } } - private Set hooks = - Collections.synchronizedSet(new HashSet()); + private final Set hooks = + Collections.synchronizedSet(new HashSet()); private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); @@ -121,7 +175,7 @@ public class ShutdownHookManager { * * @return the list of shutdownHooks in order of execution. */ - List getShutdownHooksInOrder() { + List getShutdownHooksInOrder() { List list; synchronized (MGR.hooks) { list = new ArrayList(MGR.hooks); @@ -134,11 +188,7 @@ public class ShutdownHookManager { return o2.priority - o1.priority; } }); - List ordered = new ArrayList(); - for (HookEntry entry: list) { - ordered.add(entry.hook); - } - return ordered; + return list; } /** @@ -154,11 +204,36 @@ public class ShutdownHookManager { throw new IllegalArgumentException("shutdownHook cannot be NULL"); } if (shutdownInProgress.get()) { - throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook"); + throw new IllegalStateException("Shutdown in progress, cannot add a " + + "shutdownHook"); } hooks.add(new HookEntry(shutdownHook, priority)); } + /** + * + * Adds a shutdownHook with a priority and timeout the higher the priority + * the earlier will run. ShutdownHooks with same priority run + * in a non-deterministic order. The shutdown hook will be terminated if it + * has not been finished in the specified period of time. + * + * @param shutdownHook shutdownHook Runnable + * @param priority priority of the shutdownHook + * @param timeout timeout of the shutdownHook + * @param unit unit of the timeout TimeUnit + */ + public void addShutdownHook(Runnable shutdownHook, int priority, long timeout, + TimeUnit unit) { + if (shutdownHook == null) { + throw new IllegalArgumentException("shutdownHook cannot be NULL"); + } + if (shutdownInProgress.get()) { + throw new IllegalStateException("Shutdown in progress, cannot add a " + + "shutdownHook"); + } + hooks.add(new HookEntry(shutdownHook, priority, timeout, unit)); + } + /** * Removes a shutdownHook. * @@ -168,7 +243,8 @@ public class ShutdownHookManager { */ public boolean removeShutdownHook(Runnable shutdownHook) { if (shutdownInProgress.get()) { - throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook"); + throw new IllegalStateException("Shutdown in progress, cannot remove a " + + "shutdownHook"); } return hooks.remove(new HookEntry(shutdownHook, 0)); } @@ -198,4 +274,4 @@ public class ShutdownHookManager { public void clearShutdownHooks() { hooks.clear(); } -} +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java index 586b8991c6b..2aa5e95b043 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java @@ -17,10 +17,19 @@ */ package org.apache.hadoop.util; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.slf4j.LoggerFactory; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; + +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; public class TestShutdownHookManager { + static final Logger LOG = + LoggerFactory.getLogger(TestShutdownHookManager.class.getName()); @Test public void shutdownHookManager() { @@ -30,18 +39,48 @@ public class TestShutdownHookManager { Runnable hook1 = new Runnable() { @Override public void run() { + LOG.info("Shutdown hook1 complete."); } }; Runnable hook2 = new Runnable() { @Override public void run() { + LOG.info("Shutdown hook2 complete."); + } + }; + + Runnable hook3 = new Runnable() { + @Override + public void run() { + try { + sleep(3000); + LOG.info("Shutdown hook3 complete."); + } catch (InterruptedException ex) { + LOG.info("Shutdown hook3 interrupted exception:", + ExceptionUtils.getStackTrace(ex)); + Assert.fail("Hook 3 should not timeout."); + } + } + }; + + Runnable hook4 = new Runnable() { + @Override + public void run() { + try { + sleep(3500); + LOG.info("Shutdown hook4 complete."); + Assert.fail("Hook 4 should timeout"); + } catch (InterruptedException ex) { + LOG.info("Shutdown hook4 interrupted exception:", + ExceptionUtils.getStackTrace(ex)); + } } }; mgr.addShutdownHook(hook1, 0); Assert.assertTrue(mgr.hasShutdownHook(hook1)); Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size()); - Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0)); + Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook()); mgr.removeShutdownHook(hook1); Assert.assertFalse(mgr.hasShutdownHook(hook1)); @@ -55,8 +94,20 @@ public class TestShutdownHookManager { Assert.assertTrue(mgr.hasShutdownHook(hook1)); Assert.assertTrue(mgr.hasShutdownHook(hook2)); Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size()); - Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0)); - Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1)); + Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook()); + Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook()); + // Test hook finish without timeout + mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS); + Assert.assertTrue(mgr.hasShutdownHook(hook3)); + Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook()); + Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout()); + + // Test hook finish with timeout + mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS); + Assert.assertTrue(mgr.hasShutdownHook(hook4)); + Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook()); + Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout()); + LOG.info("Shutdown starts here"); } }