From c5028eaa38e836d66e5944521e22526ef1ce86d4 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 27 Oct 2011 12:06:08 +0000 Subject: [PATCH] MAPREDUCE-3240. Fixed NodeManager to be able to forcefully clean up its containers (process-trees) irrespective of whether the container succeeded or was killed. Contributed by Hitesh Shah. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189713 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../v2/app/rm/RMContainerRequestor.java | 4 +- .../hadoop/yarn/conf/YarnConfiguration.java | 14 ++ .../src/main/resources/yarn-default.xml | 12 ++ .../impl/container-executor.c | 76 +++++++- .../impl/container-executor.h | 8 +- .../src/main/c/container-executor/impl/main.c | 6 +- .../test/test-container-executor.c | 29 ++- .../server/nodemanager/ContainerExecutor.java | 100 +++++++--- .../nodemanager/DefaultContainerExecutor.java | 83 +++++++- .../nodemanager/LinuxContainerExecutor.java | 55 +++--- .../container/ContainerImpl.java | 9 +- .../launcher/ContainerLaunch.java | 177 +++++++++++++++++- .../launcher/ContainersLauncher.java | 51 +++-- .../TestLinuxContainerExecutor.java | 4 +- .../TestLinuxContainerExecutorWithMocks.java | 18 +- .../TestContainerManager.java | 2 +- .../container/TestContainer.java | 24 ++- .../launcher/TestContainerLaunch.java | 143 ++++++++++++-- .../monitor/TestContainersMonitor.java | 2 +- .../test/resources/mock-container-executor | 4 +- 21 files changed, 691 insertions(+), 134 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3c6524638c2..e35f52e10f8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1743,6 +1743,10 @@ Release 0.23.0 - Unreleased MAPREDUCE-3279. Fixed TestJobHistoryParsing which assumed user name to be mapred all the time. (Siddharth Seth via acmurthy) + MAPREDUCE-3240. Fixed NodeManager to be able to forcefully clean up its + containers (process-trees) irrespective of whether the container succeeded + or was killed. Contributed by Hitesh Shah. + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index cfedde2229a..ba3c73219dd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -166,8 +166,6 @@ protected void containerFailedOnHost(String hostName) { for (ResourceRequest req : reqMap.values()) { if (!ask.remove(req)) { foundAll = false; - } - else { // if ask already sent to RM, we can try and overwrite it if possible. // send a new ask to RM with numContainers // specified for the blacklisted host to be 0. @@ -181,7 +179,7 @@ protected void containerFailedOnHost(String hostName) { // we can remove this request if (foundAll) { remoteRequests.remove(hostName); - } + } } // TODO handling of rack blacklisting // Removing from rack should be dependent on no.
of failures within the rack diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 497bbca5d88..bbc35597f9d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -411,6 +411,20 @@ public class YarnConfiguration extends Configuration { YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; + /** No. of milliseconds to wait between sending a SIGTERM and SIGKILL + * to a running container */ + public static final String NM_SLEEP_DELAY_BEFORE_SIGKILL_MS = + NM_PREFIX + "sleep-delay-before-sigkill.ms"; + public static final long DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS = + 250; + + /** Max time to wait for a process to come up when trying to cleanup + * container resources */ + public static final String NM_PROCESS_KILL_WAIT_MS = + NM_PREFIX + "process-kill-wait.ms"; + public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS = + 2000; + public YarnConfiguration() { super(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml index 055c250b91f..9a8ff379aa6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml @@ -366,6 +366,18 @@ + <property> + <description>No. of ms to wait between sending a SIGTERM and SIGKILL to a container</description> + <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name> + <value>250</value> + </property> + + <property> + <description>Max time to wait for a process to come up when trying to cleanup a container</description> + <name>yarn.nodemanager.process-kill-wait.ms</name> + <value>2000</value> + </property> + <property> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c index e860f5d329f..73d160ae66b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c @@ -45,6 +45,9 @@ FILE* ERRORFILE = NULL; static uid_t nm_uid = -1; static gid_t nm_gid = -1; +char *concatenate(char *concat_pattern, char *return_path_name, + int numArgs, ...); + void set_nm_uid(uid_t user, gid_t group) { nm_uid = user; nm_gid = group; @@ -147,6 +150,60 @@ static int change_effective_user(uid_t user, gid_t group) { return 0; } +/** + * Write the pid of the current process into the pid file.
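
The pid file is published with a write-to-temp-then-rename sequence (in the native routine that follows, and again later in the bash wrapper script that DefaultContainerExecutor generates): rename() is atomic on POSIX filesystems, so a poller either sees a complete pid file or no file at all, never a truncated one. A minimal Java sketch of the same pattern; the class name and paths here are illustrative only, not part of this patch:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class AtomicPidFileWriter {
      /** Publish a pid so concurrent readers never observe partial content. */
      public static void writePid(Path pidFile, long pid) throws IOException {
        Path tmp = pidFile.resolveSibling(pidFile.getFileName() + ".tmp");
        Files.write(tmp, Long.toString(pid).getBytes(StandardCharsets.US_ASCII));
        // The rename is the commit point: atomic within one filesystem.
        Files.move(tmp, pidFile, StandardCopyOption.ATOMIC_MOVE);
      }
    }
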
+ * pid_file: Path to pid file where pid needs to be written to + */ +static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) { + uid_t user = geteuid(); + gid_t group = getegid(); + if (change_effective_user(nm_uid, nm_gid) != 0) { + return -1; + } + + char *temp_pid_file = concatenate("%s.tmp", "pid_file_path", 1, pid_file); + + // create with 700 + int pid_fd = open(temp_pid_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU); + if (pid_fd == -1) { + fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", temp_pid_file, + strerror(errno)); + free(temp_pid_file); + return -1; + } + + // write pid to temp file + char pid_buf[21]; + snprintf(pid_buf, 21, "%d", pid); + ssize_t written = write(pid_fd, pid_buf, strlen(pid_buf)); + close(pid_fd); + if (written == -1) { + fprintf(LOGFILE, "Failed to write pid to file %s as node manager - %s\n", + temp_pid_file, strerror(errno)); + free(temp_pid_file); + return -1; + } + + // rename temp file to actual pid file + // use rename as atomic + if (rename(temp_pid_file, pid_file)) { + fprintf(LOGFILE, "Can't move pid file from %s to %s as node manager - %s\n", + temp_pid_file, pid_file, strerror(errno)); + unlink(temp_pid_file); + free(temp_pid_file); + return -1; + } + + // Revert back to the calling user. + if (change_effective_user(user, group)) { + free(temp_pid_file); + return -1; + } + + free(temp_pid_file); + return 0; +} + /** * Change the real and effective user and group to abandon the super user * priviledges. @@ -749,7 +806,8 @@ int initialize_app(const char *user, const char *app_id, int launch_container_as_user(const char *user, const char *app_id, const char *container_id, const char *work_dir, - const char *script_name, const char *cred_file) { + const char *script_name, const char *cred_file, + const char* pid_file) { int exit_code = -1; char *script_file_dest = NULL; char *cred_file_dest = NULL; @@ -776,6 +834,20 @@ int launch_container_as_user(const char *user, const char *app_id, goto cleanup; } + // setsid + pid_t pid = setsid(); + if (pid == -1) { + exit_code = SETSID_OPER_FAILED; + goto cleanup; + } + + // write pid to pidfile + if (pid_file == NULL + || write_pid_to_file_as_nm(pid_file, pid) != 0) { + exit_code = WRITE_PIDFILE_FAILED; + goto cleanup; + } + // give up root privs if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) { exit_code = SETUID_OPER_FAILED; @@ -1031,3 +1103,5 @@ int delete_as_user(const char *user, } return ret; } + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h index 6b9bc0fcc9a..3f0e8a5aa2c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h @@ -51,7 +51,9 @@ enum errorcodes { UNABLE_TO_BUILD_PATH, //21 INVALID_CONTAINER_EXEC_PERMISSIONS, //22 // PREPARE_JOB_LOGS_FAILED (NOT USED) 23 - INVALID_CONFIG_FILE = 24 + INVALID_CONFIG_FILE = 24, + SETSID_OPER_FAILED = 25, + WRITE_PIDFILE_FAILED = 26 }; #define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group" @@ -106,11 +108,13 @@ int initialize_app(const char *user, const char *app_id, * @param script_name the name of the script to 
be run to launch the container. * @param cred_file the credentials file that needs to be compied to the * working directory. + * @param pid_file file where pid of process should be written to * @return -1 or errorcode enum value on error (should never return on success). */ int launch_container_as_user(const char * user, const char *app_id, const char *container_id, const char *work_dir, - const char *script_name, const char *cred_file); + const char *script_name, const char *cred_file, + const char *pid_file); /** * Function used to signal a container launched by the user. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/main.c b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/main.c index ea6aa10f0d7..6e62ef9100f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/main.c +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/main.c @@ -66,6 +66,7 @@ int main(int argc, char **argv) { const char * cred_file = NULL; const char * script_file = NULL; const char * current_dir = NULL; + const char * pid_file = NULL; int exit_code = 0; @@ -141,7 +142,7 @@ int main(int argc, char **argv) { argv + optind); break; case LAUNCH_CONTAINER: - if (argc < 8) { - fprintf(ERRORFILE, "Too few arguments (%d vs 8) for launch container\n", + if (argc < 9) { + fprintf(ERRORFILE, "Too few arguments (%d vs 9) for launch container\n", argc); fflush(ERRORFILE); @@ -152,8 +153,9 @@ current_dir = argv[optind++]; script_file = argv[optind++]; cred_file = argv[optind++]; + pid_file = argv[optind++]; exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id, - current_dir, script_file, cred_file); + current_dir, script_file, cred_file, pid_file); break; case SIGNAL_CONTAINER: if (argc < 5) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/test/test-container-executor.c b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/test/test-container-executor.c index 009bc494af2..7c62f1ba183 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/test/test-container-executor.c +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/test/test-container-executor.c @@ -590,6 +590,7 @@ void test_run_container() { fflush(stderr); char* container_dir = get_container_work_directory(TEST_ROOT "/local-1", username, "app_4", "container_1"); + const char * pid_file = TEST_ROOT "/pid.txt"; pid_t child = fork(); if (child == -1) { printf("FAIL: failed to fork process for init_app - %s\n", strerror(errno)); exit(1); } else if (child == 0) { if (launch_container_as_user(username, "app_4", "container_1", - container_dir, script_name, TEST_ROOT "/creds.txt", pid_file) != 0) { printf("FAIL: failed in child\n"); exit(42); } @@ -631,6 +632,32 @@ void test_run_container() { exit(1); } free(container_dir); + + if(access(pid_file, R_OK) != 0) { + printf("FAIL: failed to create pid file %s\n", pid_file); + exit(1); + } + int pidfd = open(pid_file, O_RDONLY); + if (pidfd == -1) {
printf("FAIL: failed to open pid file %s - %s\n", pid_file, strerror(errno)); + exit(1); + } + + char pidBuf[100]; + ssize_t bytes = read(pidfd, pidBuf, 100); + if (bytes == -1) { + printf("FAIL: failed to read from pid file %s - %s\n", pid_file, strerror(errno)); + exit(1); + } + + pid_t mypid = child; + char myPidBuf[33]; + snprintf(myPidBuf, 33, "%d", mypid); + if (strncmp(pidBuf, myPidBuf, strlen(myPidBuf)) != 0) { + printf("FAIL: failed to find matching pid in pid file\n"); + printf("FAIL: Expected pid %d : Got %.*s", mypid, (int)bytes, pidBuf); + exit(1); + } } int main(int argc, char **argv) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 25b26f47987..3122592209b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -19,12 +19,13 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; -import java.lang.reflect.Field; - import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +36,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; public abstract class ContainerExecutor implements Configurable { @@ -43,8 +45,12 @@ public abstract class ContainerExecutor implements Configurable { FsPermission.createImmutable((short) 0700); private Configuration conf; - protected ConcurrentMap launchCommandObjs = - new ConcurrentHashMap(); + private ConcurrentMap pidFiles = + new ConcurrentHashMap(); + + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadLock readLock = lock.readLock(); + private final WriteLock writeLock = lock.writeLock(); @Override public void setConf(Configuration conf) { @@ -102,7 +108,8 @@ public abstract void deleteAsUser(String user, Path subDir, Path... basedirs) throws IOException, InterruptedException; public enum ExitCode { - KILLED(137); + FORCE_KILLED(137), + TERMINATED(143); private final int code; private ExitCode(int exitCode) { @@ -149,6 +156,66 @@ protected void logOutput(String output) { } } + /** + * Get the pidFile of the container. + * @param containerId + * @return the path of the pid-file for the given containerId. + */ + protected Path getPidFilePath(ContainerId containerId) { + try { + readLock.lock(); + return (this.pidFiles.get(containerId)); + } finally { + readLock.unlock(); + } + } + + /** + * Is the container still active? 
+ * @param containerId + * @return true if the container is active, else false. + */ + protected boolean isContainerActive(ContainerId containerId) { + try { + readLock.lock(); + return (this.pidFiles.containsKey(containerId)); + } finally { + readLock.unlock(); + } + } + + /** + * Mark the container as active. + * + * @param containerId + * the ContainerId + * @param pidFilePath + * Path where the executor should write the pid of the launched + * process + */ + public void activateContainer(ContainerId containerId, Path pidFilePath) { + try { + writeLock.lock(); + this.pidFiles.put(containerId, pidFilePath); + } finally { + writeLock.unlock(); + } + } + + /** + * Mark the container as inactive. + * This is a no-op if the container is no longer active. + */ + public void deactivateContainer(ContainerId containerId) { + try { + writeLock.lock(); + this.pidFiles.remove(containerId); + } finally { + writeLock.unlock(); + } + } + /** * Get the process-identifier for the container * @@ -158,28 +225,15 @@ */ public String getProcessId(ContainerId containerID) { String pid = null; - ShellCommandExecutor shExec = launchCommandObjs.get(containerID); - if (shExec == null) { + Path pidFile = pidFiles.get(containerID); + if (pidFile == null) { // This container isn't even launched yet. return pid; } - Process proc = shExec.getProcess(); - if (proc == null) { - // This happens if the command is not yet started - return pid; - } try { - Field pidField = proc.getClass().getDeclaredField("pid"); - pidField.setAccessible(true); - pid = ((Integer) pidField.get(proc)).toString(); - } catch (SecurityException e) { - // SecurityManager not expected with yarn. Ignore. - } catch (NoSuchFieldException e) { - // Yarn only on UNIX for now. Ignore.
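
getProcessId() is being rewritten here to delegate to ProcessIdFileReader instead of reflecting on the private "pid" field of java.lang.Process. ProcessIdFileReader itself is not among the hunks in this patch; a plausible minimal reading of what it does, assuming it simply returns the recorded pid or null when the file is missing or unreadable, would be:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;

    public class ProcessIdFileReader {
      /** @return the pid recorded in the given file, or null if none can be read. */
      public static String getProcessId(Path path) throws IOException {
        if (path == null) {
          return null;
        }
        File file = new File(path.toString());
        if (!file.exists()) {
          return null;
        }
        BufferedReader in = new BufferedReader(new FileReader(file));
        try {
          String line = in.readLine();
          // Guard against a partially written or corrupt pid file.
          if (line != null && line.trim().matches("[0-9]+")) {
            return line.trim();
          }
          return null;
        } finally {
          in.close();
        }
      }
    }
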
- } catch (IllegalArgumentException e) { - ; - } catch (IllegalAccessException e) { - ; + pid = ProcessIdFileReader.getProcessId(pidFile); + } catch (IOException e) { + LOG.error("Got exception reading pid from pid-file " + pidFile, e); } return pid; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9a0de4efb24..a5255097716 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -18,10 +18,16 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.EnumSet; import java.util.List; import org.apache.commons.logging.Log; @@ -48,6 +54,9 @@ public class DefaultContainerExecutor extends ContainerExecutor { private final FileContext lfs; + private static final String WRAPPER_LAUNCH_SCRIPT = + "default_container_executor.sh"; + public DefaultContainerExecutor() { try { this.lfs = FileContext.getLocalFSFileContext(); @@ -100,8 +109,9 @@ public int launchContainer(Container container, ConverterUtils.toString( container.getContainerID().getApplicationAttemptId(). getApplicationId()); - String[] sLocalDirs = - getConf().getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS); + String[] sLocalDirs = getConf().getStrings( + YarnConfiguration.NM_LOCAL_DIRS, + YarnConfiguration.DEFAULT_NM_LOCAL_DIRS); for (String sLocalDir : sLocalDirs) { Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); @@ -124,21 +134,47 @@ public int launchContainer(Container container, new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); lfs.util().copy(nmPrivateTokensPath, tokenDst); + // Create new local launch wrapper script + Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT); + DataOutputStream wrapperScriptOutStream = + lfs.create(wrapperScriptDst, + EnumSet.of(CREATE, OVERWRITE)); + + Path pidFile = getPidFilePath(containerId); + if (pidFile != null) { + writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri() + .getPath().toString(), pidFile.toString()); + } else { + LOG.info("Container " + containerIdStr + + " was marked as inactive. 
Returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } + // create log dir under app // fork script ShellCommandExecutor shExec = null; try { lfs.setPermission(launchDst, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); - String[] command = - new String[] { "bash", "-c", launchDst.toUri().getPath().toString() }; + lfs.setPermission(wrapperScriptDst, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + + // Setup command to run + String[] command = {"bash", "-c", + wrapperScriptDst.toUri().getPath().toString()}; LOG.info("launchContainer: " + Arrays.toString(command)); shExec = new ShellCommandExecutor( command, - new File(containerWorkDir.toUri().getPath()), + new File(containerWorkDir.toUri().getPath()), container.getLaunchContext().getEnvironment()); // sanitized env - launchCommandObjs.put(containerId, shExec); - shExec.execute(); + if (isContainerActive(containerId)) { + shExec.execute(); + } + else { + LOG.info("Container " + containerIdStr + + " was marked as inactive. Returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } } catch (IOException e) { if (null == shExec) { return -1; @@ -151,17 +187,44 @@ public int launchContainer(Container container, message)); return exitCode; } finally { - launchCommandObjs.remove(containerId); + ; // } return 0; } + private void writeLocalWrapperScript(DataOutputStream out, + String launchScriptDst, String pidFilePath) throws IOException { + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + StringBuilder sb = new StringBuilder("#!/bin/bash\n\n"); + sb.append("echo $$ > " + pidFilePath + ".tmp\n"); + sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n"); + sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec"); + sb.append(" /bin/bash "); + sb.append("-c "); + sb.append("\""); + sb.append(launchScriptDst); + sb.append("\"\n"); + PrintStream pout = null; + try { + pout = new PrintStream(out); + pout.append(sb); + } finally { + if (out != null) { + out.close(); + } + } + } + @Override public boolean signalContainer(String user, String pid, Signal signal) throws IOException { final String sigpid = ContainerExecutor.isSetsidAvailable ? 
"-" + pid : pid; + LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid + + " as user " + user); try { sendSignal(sigpid, Signal.NULL); } catch (ExitCodeException e) { @@ -189,8 +252,8 @@ public boolean signalContainer(String user, String pid, Signal signal) */ protected void sendSignal(String pid, Signal signal) throws IOException { ShellCommandExecutor shexec = null; - String[] arg = { "kill", "-" + signal.getValue(), pid }; - shexec = new ShellCommandExecutor(arg); + String[] arg = { "kill", "-" + signal.getValue(), pid }; + shexec = new ShellCommandExecutor(arg); shexec.execute(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 0da76b29f43..a3cb8d77ab9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -155,36 +155,45 @@ public int launchContainer(Container container, ContainerId containerId = container.getContainerID(); String containerIdStr = ConverterUtils.toString(containerId); - List command = new ArrayList( - Arrays.asList(containerExecutorExe, - user, - Integer.toString(Commands.LAUNCH_CONTAINER.getValue()), - appId, - containerIdStr, - containerWorkDir.toString(), - nmPrivateCotainerScriptPath.toUri().getPath().toString(), - nmPrivateTokensPath.toUri().getPath().toString())); - String[] commandArray = command.toArray(new String[command.size()]); - ShellCommandExecutor shExec = - new ShellCommandExecutor( - commandArray, - null, // NM's cwd - container.getLaunchContext().getEnvironment()); // sanitized env - launchCommandObjs.put(containerId, shExec); - // DEBUG - LOG.info("launchContainer: " + Arrays.toString(commandArray)); + + ShellCommandExecutor shExec = null; + try { - shExec.execute(); - if (LOG.isDebugEnabled()) { - logOutput(shExec.getOutput()); + Path pidFilePath = getPidFilePath(containerId); + if (pidFilePath != null) { + List command = new ArrayList(Arrays.asList( + containerExecutorExe, user, Integer + .toString(Commands.LAUNCH_CONTAINER.getValue()), appId, + containerIdStr, containerWorkDir.toString(), + nmPrivateCotainerScriptPath.toUri().getPath().toString(), + nmPrivateTokensPath.toUri().getPath().toString(), pidFilePath + .toString())); + String[] commandArray = command.toArray(new String[command.size()]); + shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd + container.getLaunchContext().getEnvironment()); // sanitized env + // DEBUG + LOG.info("launchContainer: " + Arrays.toString(commandArray)); + shExec.execute(); + if (LOG.isDebugEnabled()) { + logOutput(shExec.getOutput()); + } + } else { + LOG.info("Container was marked as inactive. Returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); } } catch (ExitCodeException e) { + + if (null == shExec) { + return -1; + } + int exitCode = shExec.getExitCode(); LOG.warn("Exit code from container is : " + exitCode); // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was // terminated/killed forcefully. 
In all other cases, log the // container-executor's output - if (exitCode != 143 && exitCode != 137) { + if (exitCode != ExitCode.FORCE_KILLED.getExitCode() + && exitCode != ExitCode.TERMINATED.getExitCode()) { LOG.warn("Exception from container-launch : ", e); logOutput(shExec.getOutput()); String diagnostics = "Exception from container-launch: \n" @@ -197,7 +206,7 @@ public int launchContainer(Container container, } return exitCode; } finally { - launchCommandObjs.remove(containerId); + ; // } if (LOG.isDebugEnabled()) { LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:"); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index b9a2c9a720d..4b9bf165024 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -168,8 +168,6 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) - // TODO race: Can lead to a CONTAINER_LAUNCHED event at state KILLING, - // and a container which will never be killed by the NM. .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) @@ -239,6 +237,13 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerState.DONE, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, CONTAINER_DONE_TRANSITION) + // Handling a launched container during the killing stage is a no-op, + // as container cleanup is always handled after the launch-container + // event in the containers launcher + .addTransition(ContainerState.KILLING, + ContainerState.KILLING, + ContainerEventType.CONTAINER_LAUNCHED, + new ContainerTransition()) // From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 00f03398fbe..fa7582192bf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,11 +45,14 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; @@ -56,8 +60,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; -import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.util.ConverterUtils; + public class ContainerLaunch implements Callable { private static final Log LOG = LogFactory.getLog(ContainerLaunch.class); @@ -65,12 +71,22 @@ public class ContainerLaunch implements Callable { public static final String CONTAINER_SCRIPT = "launch_container.sh"; public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; + private static final String PID_FILE_NAME_FMT = "%s.pid"; + private final Dispatcher dispatcher; private final ContainerExecutor exec; private final Application app; private final Container container; private final Configuration conf; private final LocalDirAllocator logDirsSelector; + + private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false); + private volatile AtomicBoolean completed = new AtomicBoolean(false); + + private long sleepDelayBeforeSigKill = 250; + private long maxKillWaitTime = 2000; + + private Path 
pidFilePath = null; public ContainerLaunch(Configuration configuration, Dispatcher dispatcher, ContainerExecutor exec, Application app, Container container) { @@ -80,6 +96,12 @@ public ContainerLaunch(Configuration configuration, Dispatcher dispatcher, this.container = container; this.dispatcher = dispatcher; this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS); + this.sleepDelayBeforeSigKill = + conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); + this.maxKillWaitTime = + conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, + YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS); } @Override @@ -87,7 +109,8 @@ public ContainerLaunch(Configuration configuration, Dispatcher dispatcher, public Integer call() { final ContainerLaunchContext launchContext = container.getLaunchContext(); final Map localResources = container.getLocalizedResources(); - String containerIdStr = ConverterUtils.toString(container.getContainerID()); + ContainerId containerID = container.getContainerID(); + String containerIdStr = ConverterUtils.toString(containerID); final String user = launchContext.getUser(); final List command = launchContext.getCommands(); int ret = -1; @@ -145,6 +168,17 @@ public Integer call() { + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr + Path.SEPARATOR + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf, false); + + String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT, + containerIdStr); + + // pid file should be in nm private dir so that it is not + // accessible by users + pidFilePath = lDirAllocator.getLocalPathForWrite( + ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR + + pidFileSuffix, + this.conf); + try { // /////////// Write out the container-script in the nmPrivate space. String[] localDirs = @@ -189,21 +223,36 @@ public Integer call() { // LaunchContainer is a blocking call. We are here almost means the // container is launched, so send out the event. dispatcher.getEventHandler().handle(new ContainerEvent( - container.getContainerID(), + containerID, ContainerEventType.CONTAINER_LAUNCHED)); - ret = - exec.launchContainer(container, nmPrivateContainerScriptPath, - nmPrivateTokensPath, user, appIdStr, containerWorkDir); + // Check if the container is signalled to be killed. + if (!shouldLaunchContainer.compareAndSet(false, true)) { + LOG.info("Container " + containerIdStr + " not launched as " + + "cleanup already called"); + ret = ExitCode.TERMINATED.getExitCode(); + } + else { + exec.activateContainer(containerID, pidFilePath); + ret = + exec.launchContainer(container, nmPrivateContainerScriptPath, + nmPrivateTokensPath, user, appIdStr, containerWorkDir); + } } catch (Throwable e) { LOG.warn("Failed to launch container", e); dispatcher.getEventHandler().handle(new ContainerExitEvent( launchContext.getContainerId(), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret)); return ret; + } finally { + completed.set(true); + exec.deactivateContainer(containerID); } - if (ret == ExitCode.KILLED.getExitCode()) { + LOG.debug("Container " + containerIdStr + " completed with exit code " + + ret); + if (ret == ExitCode.FORCE_KILLED.getExitCode() + || ret == ExitCode.TERMINATED.getExitCode()) { // If the process was killed, Send container_cleanedup_after_kill and // just break out of this method. 
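
The launch/kill race visible above is settled by a single atomic flag: both call() and cleanupContainer() race to flip shouldLaunchContainer with compareAndSet, so a container is either launched exactly once or never launched at all, and a killed launch then surfaces as FORCE_KILLED (137 = 128+SIGKILL) or TERMINATED (143 = 128+SIGTERM). The handshake in isolation, as a sketch with illustrative names rather than the patch's exact API:

    import java.util.concurrent.atomic.AtomicBoolean;

    class LaunchCleanupHandshake {
      private final AtomicBoolean shouldLaunch = new AtomicBoolean(false);

      /** Launcher thread: start the process only if cleanup has not run yet. */
      int launch() {
        if (!shouldLaunch.compareAndSet(false, true)) {
          return 143; // cleanup won the race; report a SIGTERM-style exit
        }
        // ... activate the container and exec the process here ...
        return 0;
      }

      /** Cleanup path: returns true if a process may already be running. */
      boolean cleanup() {
        // If this CAS succeeds, launch() can never start the process afterwards.
        return !shouldLaunch.compareAndSet(false, true);
      }
    }
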
dispatcher.getEventHandler().handle( @@ -226,6 +275,114 @@ public Integer call() { ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); return 0; } + + /** + * Cleanup the container. + * Cancels the launch if launch has not started yet or signals + * the executor to not execute the process if not already done so. + * Also, sends a SIGTERM followed by a SIGKILL to the process if + * the process id is available. + * @throws IOException + */ + public void cleanupContainer() throws IOException { + ContainerId containerId = container.getContainerID(); + String containerIdStr = ConverterUtils.toString(containerId); + LOG.info("Cleaning up container " + containerIdStr); + + // launch flag will be set to true if process already launched + boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true); + if (!alreadyLaunched) { + LOG.info("Container " + containerIdStr + " not launched." + + " No cleanup needed to be done"); + return; + } + + LOG.debug("Marking container " + containerIdStr + " as inactive"); + // this should ensure that if the container process has not launched + // by this time, it will never be launched + exec.deactivateContainer(containerId); + + LOG.debug("Getting pid for container " + containerIdStr + " to kill" + + " from pid file " + + (pidFilePath != null ? pidFilePath.toString() : "null")); + + // however the container process may have already started + try { + + // get process id from pid file if available + // else if shell is still active, get it from the shell + String processId = null; + if (pidFilePath != null) { + processId = getContainerPid(pidFilePath); + } + + // kill process + if (processId != null) { + String user = container.getLaunchContext().getUser(); + LOG.debug("Sending signal to pid " + processId + + " as user " + user + + " for container " + containerIdStr); + if (sleepDelayBeforeSigKill > 0) { + boolean result = exec.signalContainer(user, + processId, Signal.TERM); + LOG.debug("Sent signal to pid " + processId + + " as user " + user + + " for container " + containerIdStr + + ", result=" + (result? "success" : "failed")); + new DelayedProcessKiller(user, + processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start(); + } + } + } catch (Exception e) { + LOG.warn("Got error when trying to cleanup container " + containerIdStr + + ", error=" + e.getMessage()); + } finally { + // cleanup pid file if present + if (pidFilePath != null) { + FileContext lfs = FileContext.getLocalFSFileContext(); + lfs.delete(pidFilePath, false); + } + } + } + + /** + * Loop through for a time-bounded interval waiting to + * read the process id from a file generated by a running process. 
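
cleanupContainer() above sends SIGTERM right away and only escalates to SIGKILL after yarn.nodemanager.sleep-delay-before-sigkill.ms (250 ms by default), giving well-behaved processes a chance to exit cleanly. DelayedProcessKiller is imported from ContainerExecutor but its body is not shown in this diff; presumably it is a small thread along these lines (a sketch under that assumption, shelling out to kill where the real code would call exec.signalContainer):

    public class DelayedProcessKiller extends Thread {
      private final String pid;
      private final long delayMs;

      public DelayedProcessKiller(String pid, long delayMs) {
        this.pid = pid;
        this.delayMs = delayMs;
        setName("Task killer for " + pid);
      }

      @Override
      public void run() {
        try {
          Thread.sleep(delayMs);
          // Follow the earlier SIGTERM with SIGKILL; harmless if already dead.
          Runtime.getRuntime()
              .exec(new String[] { "kill", "-9", pid }).waitFor();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (java.io.IOException e) {
          // The process may already be gone; nothing more to do.
        }
      }
    }
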
+ * @param pidFilePath File from which to read the process id + * @return Process ID + * @throws Exception + */ + private String getContainerPid(Path pidFilePath) throws Exception { + String containerIdStr = + ConverterUtils.toString(container.getContainerID()); + String processId = null; + LOG.debug("Accessing pid for container " + containerIdStr + + " from pid file " + pidFilePath); + int sleepCounter = 0; + final int sleepInterval = 100; + + // loop waiting for pid file to show up + // until either the completed flag is set which means something bad + // happened or our timer expires in which case we admit defeat + while (!completed.get()) { + processId = ProcessIdFileReader.getProcessId(pidFilePath); + if (processId != null) { + LOG.debug("Got pid " + processId + " for container " + + containerIdStr); + break; + } + else if ((sleepCounter*sleepInterval) > maxKillWaitTime) { + LOG.info("Could not get pid for " + containerIdStr + + ". Waited for " + maxKillWaitTime + " ms."); + break; + } + else { + ++sleepCounter; + Thread.sleep(sleepInterval); + } + } + return processId; + } private String getContainerPrivateDir(String appIdStr, String containerIdStr) { return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr @@ -287,7 +444,7 @@ public void line(String... command) { public String toString() { return sb.toString(); } - + } private static void putEnvIfNotNull( @@ -374,9 +531,9 @@ static void writeLaunchEnv(OutputStream out, sb.symlink(link.getKey(), link.getValue()); } } + ArrayList cmd = new ArrayList(2 * command.size() + 5); - cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec "); - cmd.add("/bin/bash "); + cmd.add("exec /bin/bash "); cmd.add("-c "); cmd.add("\""); for (String cs : command) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 6c9755873b2..8f8bfc76885 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -26,15 +26,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; 
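
Note the division of labor that the next hunks set up in ContainersLauncher: Future.cancel(false) only prevents a launch that has not started yet and never interrupts one in flight, while cleanupContainer() is invoked unconditionally so that a running, killed, or already-completed container all end up with no live process tree. Distilled into a sketch with hypothetical names:

    import java.util.concurrent.Future;

    class CleanupFlow {
      static void cleanup(Future<Integer> pendingLaunch, Runnable cleanupContainer) {
        if (pendingLaunch != null && !pendingLaunch.isDone()) {
          // false: never interrupt an in-flight launch; the pid-file handshake
          // lets cleanupContainer() kill the process safely later instead.
          pendingLaunch.cancel(false);
        }
        // Always run, regardless of the container's current state.
        cleanupContainer.run();
      }
    }
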
@@ -52,6 +54,8 @@ public class ContainersLauncher extends AbstractService implements EventHandler { + private static final Log LOG = LogFactory.getLog(ContainersLauncher.class); + private final Context context; private final ContainerExecutor exec; private final Dispatcher dispatcher; @@ -64,13 +68,14 @@ public class ContainersLauncher extends AbstractService Collections.synchronizedMap(new HashMap()); private static final class RunningContainer { - public RunningContainer(String string, Future submit) { - this.user = string; + public RunningContainer(Future submit, + ContainerLaunch launcher) { this.runningcontainer = submit; + this.launcher = launcher; } - String user; Future runningcontainer; + ContainerLaunch launcher; } @@ -104,7 +109,6 @@ public void handle(ContainersLauncherEvent event) { // TODO: ContainersLauncher launches containers one by one!! Container container = event.getContainer(); ContainerId containerId = container.getContainerID(); - String userName = container.getUser(); switch (event.getType()) { case LAUNCH_CONTAINER: Application app = @@ -114,33 +118,26 @@ public void handle(ContainersLauncherEvent event) { new ContainerLaunch(getConfig(), dispatcher, exec, app, event.getContainer()); running.put(containerId, - new RunningContainer(userName, - containerLauncher.submit(launch))); + new RunningContainer(containerLauncher.submit(launch), + launch)); break; case CLEANUP_CONTAINER: RunningContainer rContainerDatum = running.remove(containerId); Future rContainer = rContainerDatum.runningcontainer; - if (rContainer != null) { - - if (rContainer.isDone()) { - // The future is already done by this time. - break; - } - - // Cancel the future so that it won't be launched if it isn't already. + if (rContainer != null + && !rContainer.isDone()) { + // Cancel the future so that it won't be launched + // if it isn't already. rContainer.cancel(false); - - // Kill the container - String processId = exec.getProcessId(containerId); - if (processId != null) { - try { - exec.signalContainer(rContainerDatum.user, - processId, Signal.KILL); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + } + + // Cleanup a container whether it is running/killed/completed, so that + // no sub-processes are alive. + try { + rContainerDatum.launcher.cleanupContainer(); + } catch (IOException e) { + LOG.warn("Got exception while cleaning container " + containerId + + ". Ignoring."); } break; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index a2dee0f4457..5eb146db2c0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -185,7 +185,9 @@ private int runAndBlock(ContainerId cId, String ... 
cmd) throws IOException { Path scriptPath = new Path(script); Path tokensPath = new Path("/dev/null"); Path workDir = new Path(workSpace.getAbsolutePath()); - + Path pidFile = new Path(workDir, "pid.txt"); + + exec.activateContainer(cId, pidFile); return exec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index 32f252e6a88..41517c20204 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -60,7 +60,8 @@ private void deleteMockParamFile() { private List<String> readMockParams() throws IOException { LinkedList<String> ret = new LinkedList<String>(); - LineNumberReader reader = new LineNumberReader(new FileReader(mockParamFile)); + LineNumberReader reader = new LineNumberReader(new FileReader( + mockParamFile)); String line; while((line = reader.readLine()) != null) { ret.add(line); @@ -70,7 +71,7 @@ private List<String> readMockParams() throws IOException { } @Before - public void setup() throws IOException { + public void setup() { File f = new File("./src/test/resources/mock-container-executor"); if(!f.canExecute()) { f.setExecutable(true); @@ -83,7 +84,7 @@ public void setup() throws IOException { } @After - public void tearDown() throws IOException { + public void tearDown() { deleteMockParamFile(); } @@ -109,11 +110,14 @@ public void testContainerLaunch() throws IOException { Path scriptPath = new Path("file:///bin/echo"); Path tokensPath = new Path("file:///dev/null"); Path workDir = new Path("/tmp"); - int ret = mockExec.launchContainer(container, scriptPath, tokensPath, + Path pidFile = new Path(workDir, "pid.txt"); + + mockExec.activateContainer(cId, pidFile); + int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir); assertEquals(0, ret); - assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId, - workDir.toString(), "/bin/echo", "/dev/null"), + assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId, + workDir.toString(), "/bin/echo", "/dev/null", pidFile.toString()), readMockParams()); } @@ -141,4 +145,4 @@ public void testDeleteAsUser() throws IOException { assertEquals(Arrays.asList(appSubmitter, cmd, "/tmp/testdir"), readMockParams()); } -} \ No newline at end of file +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 939acec5d4f..c096598cc94 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -280,7 +280,7 @@ public void testContainerLaunchAndStop() throws IOException, gcsRequest.setContainerId(cId); ContainerStatus containerStatus = containerManager.getContainerStatus(gcsRequest).getStatus(); - Assert.assertEquals(ExitCode.KILLED.getExitCode(), + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), containerStatus.getExitStatus()); // Assert that the process is not alive anymore diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index b05cf3dd8ae..f8477c2d42c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -168,7 +168,7 @@ public void testCleanupOnFailure() throws Exception { wc.localizeResources(); wc.launchContainer(); reset(wc.localizerBus); - wc.containerFailed(ExitCode.KILLED.getExitCode()); + wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode()); assertEquals(ContainerState.EXITED_WITH_FAILURE, wc.c.getContainerState()); verifyCleanupCall(wc); @@ -268,6 +268,26 @@ public boolean matches(Object o) { } } + @Test + public void testLaunchAfterKillRequest() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(14, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + wc.killContainer(); + assertEquals(ContainerState.KILLING, wc.c.getContainerState()); + wc.launchContainer(); + assertEquals(ContainerState.KILLING, wc.c.getContainerState()); + wc.containerKilledOnRequest(); + verifyCleanupCall(wc); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -511,7 +531,7 @@ public void killContainer() { public void containerKilledOnRequest() { c.handle(new ContainerExitEvent(cId, - ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.FORCE_KILLED .getExitCode())); drainDispatcherEvents(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index 372ec93d1d7..bdd77f8a20b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -32,7 +32,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -59,7 +58,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; -import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.junit.Before; import org.junit.Test; @@ -76,22 +74,21 @@ public void setup() throws IOException { conf.setClass( YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class); + conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 1000); super.setup(); } @Test public void testSpecialCharSymlinks() throws IOException { - String rootDir = new File(System.getProperty( - "test.build.data", "/tmp")).getAbsolutePath(); File shellFile = null; File tempFile = null; String badSymlink = "foo@zz%_#*&!-+= bar()"; File symLinkFile = null; try { - shellFile = new File(rootDir, "hello.sh"); - tempFile = new File(rootDir, "temp.sh"); + shellFile = new File(tmpDir, "hello.sh"); + tempFile = new File(tmpDir, "temp.sh"); String timeoutCommand = "echo \"hello\""; PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile)); shellFile.setExecutable(true); @@ -113,15 +110,14 @@ public void testSpecialCharSymlinks() throws IOException { fos.close(); tempFile.setExecutable(true); - File rootDirFile = new File(rootDir); Shell.ShellCommandExecutor shexc - = new Shell.ShellCommandExecutor(new String[]{tempFile.getAbsolutePath()}, rootDirFile); + = new Shell.ShellCommandExecutor(new String[]{tempFile.getAbsolutePath()}, tmpDir); shexc.execute(); assertEquals(shexc.getExitCode(), 0); assert(shexc.getOutput().contains("hello")); - symLinkFile = new File(rootDir, badSymlink); + symLinkFile = new File(tmpDir, badSymlink); } finally { // cleanup @@ -141,6 +137,7 @@ public void testSpecialCharSymlinks() throws IOException { } // this is a dirty hack - but should be ok for a unittest. 
+ @SuppressWarnings({ "rawtypes", "unchecked" }) public static void setNewEnvironmentHack(Map newenv) throws Exception { Class[] classes = Collections.class.getDeclaredClasses(); Map env = System.getenv(); @@ -162,7 +159,6 @@ public static void setNewEnvironmentHack(Map newenv) throws Exce */ @Test public void testContainerEnvVariables() throws Exception { - int exitCode = 0; containerManager.start(); Map envWithDummy = new HashMap(); @@ -217,7 +213,7 @@ public void testContainerEnvVariables() throws Exception { new HashMap(); localResources.put(destinationFile, rsrc_alpha); containerLaunchContext.setLocalResources(localResources); - + // set up the rest of the container containerLaunchContext.setUser(containerLaunchContext.getUser()); List commands = new ArrayList(); @@ -226,11 +222,11 @@ public void testContainerEnvVariables() throws Exception { containerLaunchContext.setCommands(commands); containerLaunchContext.setResource(recordFactory .newRecordInstance(Resource.class)); - containerLaunchContext.getResource().setMemory(100 * 1024 * 1024); + containerLaunchContext.getResource().setMemory(1024); StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); containerManager.startContainer(startRequest); - + int timeoutSecs = 0; while (!processStartFile.exists() && timeoutSecs++ < 20) { Thread.sleep(1000); @@ -238,7 +234,7 @@ public void testContainerEnvVariables() throws Exception { } Assert.assertTrue("ProcessStartFile doesn't exist!", processStartFile.exists()); - + // Now verify the contents of the file BufferedReader reader = new BufferedReader(new FileReader(processStartFile)); @@ -265,13 +261,13 @@ public void testContainerEnvVariables() throws Exception { BaseContainerManagerTest.waitForContainerState(containerManager, cId, ContainerState.COMPLETE); - + GetContainerStatusRequest gcsRequest = recordFactory.newRecordInstance(GetContainerStatusRequest.class); gcsRequest.setContainerId(cId); ContainerStatus containerStatus = containerManager.getContainerStatus(gcsRequest).getStatus(); - Assert.assertEquals(ExitCode.KILLED.getExitCode(), + Assert.assertEquals(ExitCode.TERMINATED.getExitCode(), containerStatus.getExitStatus()); // Assert that the process is not alive anymore @@ -279,4 +275,119 @@ public void testContainerEnvVariables() throws Exception { exec.signalContainer(user, pid, Signal.NULL)); } + + @Test + public void testDelayedKill() throws Exception { + containerManager.start(); + + File processStartFile = + new File(tmpDir, "pid.txt").getAbsoluteFile(); + + // setup a script that can handle sigterm gracefully + File scriptFile = new File(tmpDir, "testscript.sh"); + PrintWriter writer = new PrintWriter(new FileOutputStream(scriptFile)); + writer.println("#!/bin/bash\n\n"); + writer.println("echo \"Running testscript for delayed kill\""); + writer.println("hello=\"Got SIGTERM\""); + writer.println("umask 0"); + writer.println("trap \"echo $hello >> " + processStartFile + "\" SIGTERM"); + writer.println("echo \"Writing pid to start file\""); + writer.println("echo $$ >> " + processStartFile); + writer.println("while true; do\nsleep 1s;\ndone"); + writer.close(); + scriptFile.setExecutable(true); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // ////// Construct the Container-id + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(1); + appId.setId(1); + 
+    ApplicationAttemptId appAttemptId =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
+    ContainerId cId =
+        recordFactory.newRecordInstance(ContainerId.class);
+    cId.setApplicationAttemptId(appAttemptId);
+    containerLaunchContext.setContainerId(cId);
+
+    containerLaunchContext.setUser(user);
+
+    // upload the script file so that the container can run it
+    URL resource_alpha =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha =
+        recordFactory.newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file.sh";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, rsrc_alpha);
+    containerLaunchContext.setLocalResources(localResources);
+
+    // set up the rest of the container
+    containerLaunchContext.setUser(containerLaunchContext.getUser());
+    List<String> commands = new ArrayList<String>();
+    commands.add(scriptFile.getAbsolutePath());
+    containerLaunchContext.setCommands(commands);
+    containerLaunchContext.setResource(recordFactory
+        .newRecordInstance(Resource.class));
+    containerLaunchContext.getResource().setMemory(1024);
+    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(containerLaunchContext);
+    containerManager.startContainer(startRequest);
+
+    int timeoutSecs = 0;
+    while (!processStartFile.exists() && timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for process start-file to be created");
+    }
+    Assert.assertTrue("ProcessStartFile doesn't exist!",
+        processStartFile.exists());
+
+    // Now test the stop functionality.
+    StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+    stopRequest.setContainerId(cId);
+    containerManager.stopContainer(stopRequest);
+
+    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
+        ContainerState.COMPLETE);
+
+    // container stop sends a sigterm followed by a sigkill
+    GetContainerStatusRequest gcsRequest =
+        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+    gcsRequest.setContainerId(cId);
+    ContainerStatus containerStatus =
+        containerManager.getContainerStatus(gcsRequest).getStatus();
+    Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
+        containerStatus.getExitStatus());
+
+    // Now verify the contents of the file
+    // Script generates a message when it receives a sigterm
+    // so we look for that
+    BufferedReader reader =
+        new BufferedReader(new FileReader(processStartFile));
+
+    boolean foundSigTermMessage = false;
+    while (true) {
+      String line = reader.readLine();
+      if (line == null) {
+        break;
+      }
+      if (line.contains("SIGTERM")) {
+        foundSigTermMessage = true;
+        break;
+      }
+    }
+    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+    reader.close();
+  }
+
 }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
index 624de65eb22..d6738cdebf2 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
@@ -262,7 +262,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
     gcsRequest.setContainerId(cId);
     ContainerStatus containerStatus =
         containerManager.getContainerStatus(gcsRequest).getStatus();
-    Assert.assertEquals(ExitCode.KILLED.getExitCode(),
+    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
         containerStatus.getExitStatus());
     String expectedMsgPattern =
         "Container \\[pid=" + pid + ",containerID=" + cId
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/mock-container-executor b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/mock-container-executor
index d71bd6cec86..d1cb9da0de3 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/mock-container-executor
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/mock-container-executor
@@ -5,6 +5,6 @@ do
 done > params.txt
 if [[ "$2" == "1" ]]; then
-  cd $5;
-  exec $6;
+  cd $6;
+  exec $7;
 fi;
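
Note on the exit-code assertions above: the renamed constants follow the
usual POSIX convention that a process killed by a signal exits with 128
plus the signal number, so SIGTERM (15) maps to 143 and SIGKILL (9) to 137.
A minimal sketch of that convention with illustrative names (the patch's
real constants live in ContainerExecutor.ExitCode; the numeric values below
are inferred from the convention, not quoted from the patch):

    // Illustrative only: models the 128+signal rule the tests rely on.
    enum ExitCodeConventionSketch {
      TERMINATED(128 + 15),   // SIGTERM; asserted by testContainerEnvVariables
      FORCE_KILLED(128 + 9);  // SIGKILL; asserted by testDelayedKill

      private final int exitCode;

      ExitCodeConventionSketch(int exitCode) {
        this.exitCode = exitCode;
      }

      int getExitCode() {
        return exitCode;
      }
    }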
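testDelayedKill only passes if the stop path gives the trapped script time
to log its SIGTERM before the follow-up SIGKILL arrives. A hedged sketch of
that sequence, assuming nothing about the patch's internals: gracefulStop,
signal and isAlive are hypothetical names, and graceMs stands in for the
new sleep-delay-before-sigkill setting.

    import java.io.IOException;

    // Sketch of a SIGTERM -> wait -> SIGKILL sequence; not the patch's
    // actual implementation, and all helper names are hypothetical.
    public final class DelayedKillSketch {
      public static void gracefulStop(String pid, long graceMs)
          throws IOException, InterruptedException {
        signal(pid, "TERM");   // lets a trap handler run, as in the test script
        Thread.sleep(graceMs); // configurable SIGTERM-to-SIGKILL grace period
        if (isAlive(pid)) {
          signal(pid, "KILL"); // force-kill anything that ignored SIGTERM
        }
      }

      private static void signal(String pid, String sig)
          throws IOException, InterruptedException {
        new ProcessBuilder("kill", "-" + sig, pid).start().waitFor();
      }

      private static boolean isAlive(String pid)
          throws IOException, InterruptedException {
        // kill -0 delivers no signal; exit status 0 means the pid exists.
        return new ProcessBuilder("kill", "-0", pid).start().waitFor() == 0;
      }
    }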
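On the final hunk: the mock executor's positional parameters shift by one
($5 to $6 for the directory it enters, $6 to $7 for the script it execs),
consistent with the launch-container invocation now carrying one extra
argument ahead of them. Judging by the container-executor changes elsewhere
in this patch, that argument is the pid-file path, though that reading is
inferred from the diff rather than stated in it.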