HADOOP-12950. ShutdownHookManager should have a timeout for each of the Registered shutdown hook. Contributed by Xiaoyu Yao.

(cherry picked from commit aac4d65bf9)
(cherry picked from commit dc78250aa4)
This commit is contained in:
Xiaoyu Yao 2016-03-31 15:20:09 -07:00
parent 633f612d67
commit ab9dc84d4a
2 changed files with 150 additions and 23 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -26,6 +27,11 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -42,7 +48,12 @@ public class ShutdownHookManager {
private static final ShutdownHookManager MGR = new ShutdownHookManager(); private static final ShutdownHookManager MGR = new ShutdownHookManager();
private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class); private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
private static final long TIMEOUT_DEFAULT = 10;
private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
private static final ExecutorService EXECUTOR =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).build());
static { static {
try { try {
Runtime.getRuntime().addShutdownHook( Runtime.getRuntime().addShutdownHook(
@ -50,14 +61,33 @@ public class ShutdownHookManager {
@Override @Override
public void run() { public void run() {
MGR.shutdownInProgress.set(true); MGR.shutdownInProgress.set(true);
for (Runnable hook: MGR.getShutdownHooksInOrder()) { for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
Future<?> future = EXECUTOR.submit(entry.getHook());
try { try {
hook.run(); future.get(entry.getTimeout(), entry.getTimeUnit());
} catch (TimeoutException ex) {
future.cancel(true);
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
getSimpleName() + "' timeout, " + ex.toString(), ex);
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() + LOG.warn("ShutdownHook '" + entry.getHook().getClass().
"' failed, " + ex.toString(), ex); getSimpleName() + "' failed, " + ex.toString(), ex);
} }
} }
try {
EXECUTOR.shutdown();
if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT,
TIME_UNIT_DEFAULT)) {
LOG.error("ShutdownHookManger shutdown forcefully.");
EXECUTOR.shutdownNow();
}
LOG.info("ShutdownHookManger complete shutdown.");
} catch (InterruptedException ex) {
LOG.error("ShutdownHookManger interrupted while waiting for " +
"termination.", ex);
EXECUTOR.shutdownNow();
Thread.currentThread().interrupt();
}
} }
} }
); );
@ -77,15 +107,24 @@ public static ShutdownHookManager get() {
} }
/** /**
* Private structure to store ShutdownHook and its priority. * Private structure to store ShutdownHook, its priority and timeout
* settings.
*/ */
private static class HookEntry { static class HookEntry {
Runnable hook; private final Runnable hook;
int priority; private final int priority;
private final long timeout;
private final TimeUnit unit;
public HookEntry(Runnable hook, int priority) { HookEntry(Runnable hook, int priority) {
this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
}
HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
this.hook = hook; this.hook = hook;
this.priority = priority; this.priority = priority;
this.timeout = timeout;
this.unit = unit;
} }
@Override @Override
@ -104,10 +143,25 @@ public boolean equals(Object obj) {
return eq; return eq;
} }
Runnable getHook() {
return hook;
}
int getPriority() {
return priority;
}
long getTimeout() {
return timeout;
}
TimeUnit getTimeUnit() {
return unit;
}
} }
private Set<HookEntry> hooks = private final Set<HookEntry> hooks =
Collections.synchronizedSet(new HashSet<HookEntry>()); Collections.synchronizedSet(new HashSet<HookEntry>());
private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
@ -121,7 +175,7 @@ private ShutdownHookManager() {
* *
* @return the list of shutdownHooks in order of execution. * @return the list of shutdownHooks in order of execution.
*/ */
List<Runnable> getShutdownHooksInOrder() { List<HookEntry> getShutdownHooksInOrder() {
List<HookEntry> list; List<HookEntry> list;
synchronized (MGR.hooks) { synchronized (MGR.hooks) {
list = new ArrayList<HookEntry>(MGR.hooks); list = new ArrayList<HookEntry>(MGR.hooks);
@ -134,11 +188,7 @@ public int compare(HookEntry o1, HookEntry o2) {
return o2.priority - o1.priority; return o2.priority - o1.priority;
} }
}); });
List<Runnable> ordered = new ArrayList<Runnable>(); return list;
for (HookEntry entry: list) {
ordered.add(entry.hook);
}
return ordered;
} }
/** /**
@ -154,11 +204,36 @@ public void addShutdownHook(Runnable shutdownHook, int priority) {
throw new IllegalArgumentException("shutdownHook cannot be NULL"); throw new IllegalArgumentException("shutdownHook cannot be NULL");
} }
if (shutdownInProgress.get()) { if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook"); throw new IllegalStateException("Shutdown in progress, cannot add a " +
"shutdownHook");
} }
hooks.add(new HookEntry(shutdownHook, priority)); hooks.add(new HookEntry(shutdownHook, priority));
} }
/**
 * Adds a shutdownHook with a priority and a timeout. The higher the
 * priority, the earlier the hook runs; shutdownHooks with the same priority
 * run in a non-deterministic order. During shutdown the hook is cancelled
 * if it has not finished within the specified period of time.
 *
 * @param shutdownHook shutdownHook <code>Runnable</code>
 * @param priority priority of the shutdownHook
 * @param timeout timeout of the shutdownHook
 * @param unit unit of the timeout <code>TimeUnit</code>
 * @throws IllegalArgumentException if <code>shutdownHook</code> or
 *         <code>unit</code> is <code>null</code>
 * @throws IllegalStateException if a shutdown is already in progress
 */
public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
    TimeUnit unit) {
  if (shutdownHook == null) {
    throw new IllegalArgumentException("shutdownHook cannot be NULL");
  }
  if (unit == null) {
    // Fail at registration time: a null unit would otherwise go unnoticed
    // until the shutdown sequence hands it to the timed Future.get() call.
    throw new IllegalArgumentException("unit cannot be NULL");
  }
  if (shutdownInProgress.get()) {
    throw new IllegalStateException("Shutdown in progress, cannot add a " +
        "shutdownHook");
  }
  hooks.add(new HookEntry(shutdownHook, priority, timeout, unit));
}
/** /**
* Removes a shutdownHook. * Removes a shutdownHook.
* *
@ -168,7 +243,8 @@ public void addShutdownHook(Runnable shutdownHook, int priority) {
*/ */
public boolean removeShutdownHook(Runnable shutdownHook) { public boolean removeShutdownHook(Runnable shutdownHook) {
if (shutdownInProgress.get()) { if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook"); throw new IllegalStateException("Shutdown in progress, cannot remove a " +
"shutdownHook");
} }
return hooks.remove(new HookEntry(shutdownHook, 0)); return hooks.remove(new HookEntry(shutdownHook, 0));
} }

View File

@ -17,10 +17,19 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.LoggerFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
public class TestShutdownHookManager { public class TestShutdownHookManager {
static final Logger LOG =
LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
@Test @Test
public void shutdownHookManager() { public void shutdownHookManager() {
@ -30,18 +39,48 @@ public void shutdownHookManager() {
Runnable hook1 = new Runnable() { Runnable hook1 = new Runnable() {
@Override @Override
public void run() { public void run() {
LOG.info("Shutdown hook1 complete.");
} }
}; };
Runnable hook2 = new Runnable() { Runnable hook2 = new Runnable() {
@Override @Override
public void run() { public void run() {
LOG.info("Shutdown hook2 complete.");
}
};
// Sleeps for 3 seconds; it is registered further down with a 4 second
// timeout, so it is expected to finish without being interrupted.
Runnable hook3 = new Runnable() {
  @Override
  public void run() {
    try {
      sleep(3000);
      LOG.info("Shutdown hook3 complete.");
    } catch (InterruptedException ex) {
      // Pass the throwable itself: the message has no {} placeholder, so a
      // pre-rendered stack-trace string argument would be silently dropped.
      LOG.info("Shutdown hook3 interrupted exception:", ex);
      Assert.fail("Hook 3 should not timeout.");
    }
  }
};
Runnable hook4 = new Runnable() {
@Override
public void run() {
try {
sleep(3500);
LOG.info("Shutdown hook4 complete.");
Assert.fail("Hook 4 should timeout");
} catch (InterruptedException ex) {
LOG.info("Shutdown hook4 interrupted exception:",
ExceptionUtils.getStackTrace(ex));
}
} }
}; };
mgr.addShutdownHook(hook1, 0); mgr.addShutdownHook(hook1, 0);
Assert.assertTrue(mgr.hasShutdownHook(hook1)); Assert.assertTrue(mgr.hasShutdownHook(hook1));
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size()); Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0)); Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
mgr.removeShutdownHook(hook1); mgr.removeShutdownHook(hook1);
Assert.assertFalse(mgr.hasShutdownHook(hook1)); Assert.assertFalse(mgr.hasShutdownHook(hook1));
@ -55,8 +94,20 @@ public void run() {
Assert.assertTrue(mgr.hasShutdownHook(hook1)); Assert.assertTrue(mgr.hasShutdownHook(hook1));
Assert.assertTrue(mgr.hasShutdownHook(hook2)); Assert.assertTrue(mgr.hasShutdownHook(hook2));
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size()); Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0)); Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1)); Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
// Test hook finish without timeout
mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
Assert.assertTrue(mgr.hasShutdownHook(hook3));
Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
// Test hook finish with timeout
mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS);
Assert.assertTrue(mgr.hasShutdownHook(hook4));
Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
LOG.info("Shutdown starts here");
} }
} }