HADOOP-12950. ShutdownHookManager should have a timeout for each of the Registered shutdown hook. Contributed by Xiaoyu Yao.
(cherry picked from commit aac4d65bf9
)
This commit is contained in:
parent
6d454a5d6e
commit
dc78250aa4
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -26,6 +27,11 @@ import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,7 +48,12 @@ public class ShutdownHookManager {
|
||||||
private static final ShutdownHookManager MGR = new ShutdownHookManager();
|
private static final ShutdownHookManager MGR = new ShutdownHookManager();
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
|
private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
|
||||||
|
private static final long TIMEOUT_DEFAULT = 10;
|
||||||
|
private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
|
||||||
|
|
||||||
|
private static final ExecutorService EXECUTOR =
|
||||||
|
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||||
|
.setDaemon(true).build());
|
||||||
static {
|
static {
|
||||||
try {
|
try {
|
||||||
Runtime.getRuntime().addShutdownHook(
|
Runtime.getRuntime().addShutdownHook(
|
||||||
|
@ -50,14 +61,33 @@ public class ShutdownHookManager {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
MGR.shutdownInProgress.set(true);
|
MGR.shutdownInProgress.set(true);
|
||||||
for (Runnable hook: MGR.getShutdownHooksInOrder()) {
|
for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
|
||||||
|
Future<?> future = EXECUTOR.submit(entry.getHook());
|
||||||
try {
|
try {
|
||||||
hook.run();
|
future.get(entry.getTimeout(), entry.getTimeUnit());
|
||||||
|
} catch (TimeoutException ex) {
|
||||||
|
future.cancel(true);
|
||||||
|
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
|
||||||
|
getSimpleName() + "' timeout, " + ex.toString(), ex);
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
|
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
|
||||||
"' failed, " + ex.toString(), ex);
|
getSimpleName() + "' failed, " + ex.toString(), ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
EXECUTOR.shutdown();
|
||||||
|
if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT,
|
||||||
|
TIME_UNIT_DEFAULT)) {
|
||||||
|
LOG.error("ShutdownHookManger shutdown forcefully.");
|
||||||
|
EXECUTOR.shutdownNow();
|
||||||
|
}
|
||||||
|
LOG.info("ShutdownHookManger complete shutdown.");
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.error("ShutdownHookManger interrupted while waiting for " +
|
||||||
|
"termination.", ex);
|
||||||
|
EXECUTOR.shutdownNow();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -77,15 +107,24 @@ public class ShutdownHookManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Private structure to store ShutdownHook and its priority.
|
* Private structure to store ShutdownHook, its priority and timeout
|
||||||
|
* settings.
|
||||||
*/
|
*/
|
||||||
private static class HookEntry {
|
static class HookEntry {
|
||||||
Runnable hook;
|
private final Runnable hook;
|
||||||
int priority;
|
private final int priority;
|
||||||
|
private final long timeout;
|
||||||
|
private final TimeUnit unit;
|
||||||
|
|
||||||
public HookEntry(Runnable hook, int priority) {
|
HookEntry(Runnable hook, int priority) {
|
||||||
|
this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
|
||||||
this.hook = hook;
|
this.hook = hook;
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
|
this.timeout = timeout;
|
||||||
|
this.unit = unit;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -104,9 +143,24 @@ public class ShutdownHookManager {
|
||||||
return eq;
|
return eq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Runnable getHook() {
|
||||||
|
return hook;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<HookEntry> hooks =
|
int getPriority() {
|
||||||
|
return priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
long getTimeout() {
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeUnit getTimeUnit() {
|
||||||
|
return unit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Set<HookEntry> hooks =
|
||||||
Collections.synchronizedSet(new HashSet<HookEntry>());
|
Collections.synchronizedSet(new HashSet<HookEntry>());
|
||||||
|
|
||||||
private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
|
private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
|
||||||
|
@ -121,7 +175,7 @@ public class ShutdownHookManager {
|
||||||
*
|
*
|
||||||
* @return the list of shutdownHooks in order of execution.
|
* @return the list of shutdownHooks in order of execution.
|
||||||
*/
|
*/
|
||||||
List<Runnable> getShutdownHooksInOrder() {
|
List<HookEntry> getShutdownHooksInOrder() {
|
||||||
List<HookEntry> list;
|
List<HookEntry> list;
|
||||||
synchronized (MGR.hooks) {
|
synchronized (MGR.hooks) {
|
||||||
list = new ArrayList<HookEntry>(MGR.hooks);
|
list = new ArrayList<HookEntry>(MGR.hooks);
|
||||||
|
@ -134,11 +188,7 @@ public class ShutdownHookManager {
|
||||||
return o2.priority - o1.priority;
|
return o2.priority - o1.priority;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
List<Runnable> ordered = new ArrayList<Runnable>();
|
return list;
|
||||||
for (HookEntry entry: list) {
|
|
||||||
ordered.add(entry.hook);
|
|
||||||
}
|
|
||||||
return ordered;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -154,11 +204,36 @@ public class ShutdownHookManager {
|
||||||
throw new IllegalArgumentException("shutdownHook cannot be NULL");
|
throw new IllegalArgumentException("shutdownHook cannot be NULL");
|
||||||
}
|
}
|
||||||
if (shutdownInProgress.get()) {
|
if (shutdownInProgress.get()) {
|
||||||
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
|
throw new IllegalStateException("Shutdown in progress, cannot add a " +
|
||||||
|
"shutdownHook");
|
||||||
}
|
}
|
||||||
hooks.add(new HookEntry(shutdownHook, priority));
|
hooks.add(new HookEntry(shutdownHook, priority));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Adds a shutdownHook with a priority and timeout the higher the priority
|
||||||
|
* the earlier will run. ShutdownHooks with same priority run
|
||||||
|
* in a non-deterministic order. The shutdown hook will be terminated if it
|
||||||
|
* has not been finished in the specified period of time.
|
||||||
|
*
|
||||||
|
* @param shutdownHook shutdownHook <code>Runnable</code>
|
||||||
|
* @param priority priority of the shutdownHook
|
||||||
|
* @param timeout timeout of the shutdownHook
|
||||||
|
* @param unit unit of the timeout <code>TimeUnit</code>
|
||||||
|
*/
|
||||||
|
public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
|
||||||
|
TimeUnit unit) {
|
||||||
|
if (shutdownHook == null) {
|
||||||
|
throw new IllegalArgumentException("shutdownHook cannot be NULL");
|
||||||
|
}
|
||||||
|
if (shutdownInProgress.get()) {
|
||||||
|
throw new IllegalStateException("Shutdown in progress, cannot add a " +
|
||||||
|
"shutdownHook");
|
||||||
|
}
|
||||||
|
hooks.add(new HookEntry(shutdownHook, priority, timeout, unit));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes a shutdownHook.
|
* Removes a shutdownHook.
|
||||||
*
|
*
|
||||||
|
@ -168,7 +243,8 @@ public class ShutdownHookManager {
|
||||||
*/
|
*/
|
||||||
public boolean removeShutdownHook(Runnable shutdownHook) {
|
public boolean removeShutdownHook(Runnable shutdownHook) {
|
||||||
if (shutdownInProgress.get()) {
|
if (shutdownInProgress.get()) {
|
||||||
throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
|
throw new IllegalStateException("Shutdown in progress, cannot remove a " +
|
||||||
|
"shutdownHook");
|
||||||
}
|
}
|
||||||
return hooks.remove(new HookEntry(shutdownHook, 0));
|
return hooks.remove(new HookEntry(shutdownHook, 0));
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,10 +17,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static java.lang.Thread.sleep;
|
||||||
|
|
||||||
public class TestShutdownHookManager {
|
public class TestShutdownHookManager {
|
||||||
|
static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shutdownHookManager() {
|
public void shutdownHookManager() {
|
||||||
|
@ -30,18 +39,48 @@ public class TestShutdownHookManager {
|
||||||
Runnable hook1 = new Runnable() {
|
Runnable hook1 = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
LOG.info("Shutdown hook1 complete.");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Runnable hook2 = new Runnable() {
|
Runnable hook2 = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
LOG.info("Shutdown hook2 complete.");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Runnable hook3 = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sleep(3000);
|
||||||
|
LOG.info("Shutdown hook3 complete.");
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.info("Shutdown hook3 interrupted exception:",
|
||||||
|
ExceptionUtils.getStackTrace(ex));
|
||||||
|
Assert.fail("Hook 3 should not timeout.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Runnable hook4 = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sleep(3500);
|
||||||
|
LOG.info("Shutdown hook4 complete.");
|
||||||
|
Assert.fail("Hook 4 should timeout");
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.info("Shutdown hook4 interrupted exception:",
|
||||||
|
ExceptionUtils.getStackTrace(ex));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
mgr.addShutdownHook(hook1, 0);
|
mgr.addShutdownHook(hook1, 0);
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
||||||
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
||||||
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0));
|
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
mgr.removeShutdownHook(hook1);
|
mgr.removeShutdownHook(hook1);
|
||||||
Assert.assertFalse(mgr.hasShutdownHook(hook1));
|
Assert.assertFalse(mgr.hasShutdownHook(hook1));
|
||||||
|
|
||||||
|
@ -55,8 +94,20 @@ public class TestShutdownHookManager {
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook2));
|
Assert.assertTrue(mgr.hasShutdownHook(hook2));
|
||||||
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
|
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
|
||||||
Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0));
|
Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1));
|
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
|
||||||
|
|
||||||
|
// Test hook finish without timeout
|
||||||
|
mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
|
||||||
|
Assert.assertTrue(mgr.hasShutdownHook(hook3));
|
||||||
|
Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
|
Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
||||||
|
|
||||||
|
// Test hook finish with timeout
|
||||||
|
mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS);
|
||||||
|
Assert.assertTrue(mgr.hasShutdownHook(hook4));
|
||||||
|
Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
|
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
||||||
|
LOG.info("Shutdown starts here");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue