HADOOP-13709. Ability to clean up subprocesses spawned by Shell when the process exits. Contributed by Eric Badger

(cherry picked from commit 631f1daee3)
This commit is contained in:
Jason Lowe 2016-12-15 20:52:40 +00:00
parent 867389247f
commit ee12363f2d
2 changed files with 76 additions and 0 deletions

View File

@ -26,9 +26,11 @@ import java.io.InputStream;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -48,6 +50,8 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class Shell { public abstract class Shell {
private static final Map <Process, Object> CHILD_PROCESSES =
Collections.synchronizedMap(new WeakHashMap<Process, Object>());
public static final Logger LOG = LoggerFactory.getLogger(Shell.class); public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
/** /**
@ -920,6 +924,7 @@ public abstract class Shell {
} else { } else {
process = builder.start(); process = builder.start();
} }
CHILD_PROCESSES.put(process, null);
if (timeOutInterval > 0) { if (timeOutInterval > 0) {
timeOutTimer = new Timer("Shell command timeout"); timeOutTimer = new Timer("Shell command timeout");
@ -1016,6 +1021,7 @@ public abstract class Shell {
LOG.warn("Error while closing the error stream", ioe); LOG.warn("Error while closing the error stream", ioe);
} }
process.destroy(); process.destroy();
CHILD_PROCESSES.remove(process);
lastTime = Time.monotonicNow(); lastTime = Time.monotonicNow();
} }
} }
@ -1314,4 +1320,22 @@ public abstract class Shell {
} }
} }
} }
/**
* Static method to destroy all running <code>Shell</code> processes
* Iterates through a list of all currently running <code>Shell</code>
* processes and destroys them one by one. This method is thread safe and
* is intended to be used in a shutdown hook.
*/
public static void destroyAllProcesses() {
synchronized (CHILD_PROCESSES) {
for (Process key : CHILD_PROCESSES.keySet()) {
Process process = key;
if (key != null) {
process.destroy();
}
}
CHILD_PROCESSES.clear();
}
}
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.util; package org.apache.hadoop.util;
import com.google.common.base.Supplier;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider; import org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider;
import org.junit.Assert; import org.junit.Assert;
@ -476,4 +477,55 @@ public class TestShell extends Assert {
assertEquals("'foo'\\''bar'", Shell.bashQuote("foo'bar")); assertEquals("'foo'\\''bar'", Shell.bashQuote("foo'bar"));
assertEquals("''\\''foo'\\''bar'\\'''", Shell.bashQuote("'foo'bar'")); assertEquals("''\\''foo'\\''bar'\\'''", Shell.bashQuote("'foo'bar'"));
} }
@Test(timeout=120000)
public void testShellKillAllProcesses() throws Throwable {
Assume.assumeFalse(WINDOWS);
StringBuffer sleepCommand = new StringBuffer();
sleepCommand.append("sleep 200");
String[] shellCmd = {"bash", "-c", sleepCommand.toString()};
final ShellCommandExecutor shexc1 = new ShellCommandExecutor(shellCmd);
final ShellCommandExecutor shexc2 = new ShellCommandExecutor(shellCmd);
Thread shellThread1 = new Thread() {
@Override
public void run() {
try {
shexc1.execute();
} catch(IOException ioe) {
//ignore IOException from thread interrupt
}
}
};
Thread shellThread2 = new Thread() {
@Override
public void run() {
try {
shexc2.execute();
} catch(IOException ioe) {
//ignore IOException from thread interrupt
}
}
};
shellThread1.start();
shellThread2.start();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return shexc1.getProcess() != null;
}
}, 10, 10000);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return shexc2.getProcess() != null;
}
}, 10, 10000);
Shell.destroyAllProcesses();
shexc1.getProcess().waitFor();
shexc2.getProcess().waitFor();
}
} }