diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index b6302ea00ba..55f92d66638 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -26,9 +26,11 @@
 import java.io.InterruptedIOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.WeakHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -48,6 +50,13 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Shell {
+  /**
+   * Tracks the child processes spawned by this class so that they can
+   * be cleaned up by {@link #destroyAllProcesses()}. Weak keys allow
+   * entries for completed processes to be garbage collected.
+   */
+  private static final Map<Process, Object> CHILD_PROCESSES =
+      Collections.synchronizedMap(new WeakHashMap<>());
 PLACEHOLDER
   public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
 
   /**
@@ -920,6 +929,7 @@ private void runCommand() throws IOException {
       } else {
         process = builder.start();
       }
+      CHILD_PROCESSES.put(process, null);
 
       if (timeOutInterval > 0) {
         timeOutTimer = new Timer("Shell command timeout");
@@ -1016,6 +1026,7 @@ public void run() {
           LOG.warn("Error while closing the error stream", ioe);
         }
         process.destroy();
+        CHILD_PROCESSES.remove(process);
         lastTime = Time.monotonicNow();
       }
     }
@@ -1314,4 +1325,19 @@ public void run() {
       }
     }
   }
+
+  /**
+   * Destroys all currently running Shell processes. Iterates over the
+   * tracked child processes and destroys them one by one, then clears
+   * the registry. This method is thread-safe and is intended to be
+   * used in a shutdown hook.
+   */
+  public static void destroyAllProcesses() {
+    synchronized (CHILD_PROCESSES) {
+      for (Process process : CHILD_PROCESSES.keySet()) {
+        process.destroy();
+      }
+      CHILD_PROCESSES.clear();
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index 3ef6fb11e6d..2707573ab4f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.util;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider;
 import org.junit.Assert;
@@ -476,4 +477,53 @@ public void testBashQuote() {
     assertEquals("'foo'\\''bar'", Shell.bashQuote("foo'bar"));
     assertEquals("''\\''foo'\\''bar'\\'''", Shell.bashQuote("'foo'bar'"));
   }
+
+  @Test(timeout=120000)
+  public void testShellKillAllProcesses() throws Throwable {
+    Assume.assumeFalse(WINDOWS);
+    String[] shellCmd = {"bash", "-c", "sleep 200"};
+    final ShellCommandExecutor shexc1 = new ShellCommandExecutor(shellCmd);
+    final ShellCommandExecutor shexc2 = new ShellCommandExecutor(shellCmd);
+
+    Thread shellThread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          shexc1.execute();
+        } catch (IOException ioe) {
+          // ignore IOException from thread interrupt
+        }
+      }
+    };
+    Thread shellThread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          shexc2.execute();
+        } catch (IOException ioe) {
+          // ignore IOException from thread interrupt
+        }
+      }
+    };
+
+    shellThread1.start();
+    shellThread2.start();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return shexc1.getProcess() != null;
+      }
+    }, 10, 10000);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return shexc2.getProcess() != null;
+      }
+    }, 10, 10000);
+
+    Shell.destroyAllProcesses();
+    shexc1.getProcess().waitFor();
+    shexc2.getProcess().waitFor();
+  }
 }