MAPREDUCE-3240. Fixed NodeManager to be able to forcefully clean up its containers (process-trees) irrespective of whether the container succeeded or was killed. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-27 12:06:08 +00:00
parent d5ae121690
commit c5028eaa38
21 changed files with 691 additions and 134 deletions

View File

@ -1743,6 +1743,10 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3279. Fixed TestJobHistoryParsing which assumed user name to be MAPREDUCE-3279. Fixed TestJobHistoryParsing which assumed user name to be
mapred all the time. (Siddharth Seth via acmurthy) mapred all the time. (Siddharth Seth via acmurthy)
MAPREDUCE-3240. Fixed NodeManager to be able to forcefully clean up its
containers (process-trees) irrespective of whether the container succeeded
or was killed. Contributed by Hitesh Shah.
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -166,8 +166,6 @@ public abstract class RMContainerRequestor extends RMCommunicator {
for (ResourceRequest req : reqMap.values()) { for (ResourceRequest req : reqMap.values()) {
if (!ask.remove(req)) { if (!ask.remove(req)) {
foundAll = false; foundAll = false;
}
else {
// if ask already sent to RM, we can try and overwrite it if possible. // if ask already sent to RM, we can try and overwrite it if possible.
// send a new ask to RM with numContainers // send a new ask to RM with numContainers
// specified for the blacklisted host to be 0. // specified for the blacklisted host to be 0.
@ -181,7 +179,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
// we can remove this request // we can remove this request
if (foundAll) { if (foundAll) {
remoteRequests.remove(hostName); remoteRequests.remove(hostName);
} }
} }
// TODO handling of rack blacklisting // TODO handling of rack blacklisting
// Removing from rack should be dependent on no. of failures within the rack // Removing from rack should be dependent on no. of failures within the rack

View File

@ -411,6 +411,20 @@ public class YarnConfiguration extends Configuration {
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl"; "security.resourcelocalizer.protocol.acl";
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */
public static final String NM_SLEEP_DELAY_BEFORE_SIGKILL_MS =
NM_PREFIX + "sleep-delay-before-sigkill.ms";
public static final long DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS =
250;
/** Max time to wait for a process to come up when trying to cleanup
* container resources */
public static final String NM_PROCESS_KILL_WAIT_MS =
NM_PREFIX + "process-kill-wait.ms";
public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
2000;
public YarnConfiguration() { public YarnConfiguration() {
super(); super();
} }

View File

@ -366,6 +366,18 @@
<!-- <value>mapreduce.shuffle</value> --> <!-- <value>mapreduce.shuffle</value> -->
</property> </property>
<property>
<description>No. of ms to wait between sending a SIGTERM and SIGKILL to a container</description>
<name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
<value>250</value>
</property>
<property>
<description>Max time to wait for a process to come up when trying to cleanup a container</description>
<name>yarn.nodemanager.process-kill-wait.ms</name>
<value>2000</value>
</property>
<!--Map Reduce configuration--> <!--Map Reduce configuration-->
<property> <property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

View File

@ -45,6 +45,9 @@ FILE* ERRORFILE = NULL;
static uid_t nm_uid = -1; static uid_t nm_uid = -1;
static gid_t nm_gid = -1; static gid_t nm_gid = -1;
char *concatenate(char *concat_pattern, char *return_path_name,
int numArgs, ...);
void set_nm_uid(uid_t user, gid_t group) { void set_nm_uid(uid_t user, gid_t group) {
nm_uid = user; nm_uid = user;
nm_gid = group; nm_gid = group;
@ -147,6 +150,60 @@ static int change_effective_user(uid_t user, gid_t group) {
return 0; return 0;
} }
/**
 * Write the given pid into the pid file while running as the node manager
 * user.
 * The pid is first written to "<pid_file>.tmp" and then renamed onto
 * pid_file, so a reader never observes a partially written pid file.
 * pid_file: Path to pid file where pid needs to be written to
 * pid: process id to store in the file
 * Returns 0 on success and -1 on any failure. Once the switch to the node
 * manager user has succeeded, the effective user/group that was active on
 * entry is restored on every exit path.
 */
static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) {
  uid_t user = geteuid();
  gid_t group = getegid();
  char *temp_pid_file = NULL;
  char pid_buf[21];
  size_t pid_len = 0;
  ssize_t written = -1;
  int write_errno = 0;
  int pid_fd = -1;
  int temp_created = 0;
  int result = -1;

  if (pid_file == NULL) {
    return -1;
  }
  if (change_effective_user(nm_uid, nm_gid) != 0) {
    return -1;
  }

  temp_pid_file = concatenate("%s.tmp", "pid_file_path", 1, pid_file);
  if (temp_pid_file == NULL) {
    // Treat a missing path as a failure instead of handing NULL to open().
    fprintf(LOGFILE, "Can't build temporary pid file path for %s\n", pid_file);
    goto cleanup;
  }

  // create with 700; O_EXCL turns an already existing temp file into an error
  pid_fd = open(temp_pid_file, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU);
  if (pid_fd == -1) {
    fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", temp_pid_file,
            strerror(errno));
    goto cleanup;
  }
  // From here on the temp file is ours and must not be left behind on error.
  temp_created = 1;

  // write pid to temp file
  snprintf(pid_buf, sizeof(pid_buf), "%d", (int) pid);
  pid_len = strlen(pid_buf);
  written = write(pid_fd, pid_buf, pid_len);
  // Save errno before close(), which may overwrite it.
  write_errno = errno;
  close(pid_fd);
  if (written == -1) {
    fprintf(LOGFILE, "Failed to write pid to file %s as node manager - %s\n",
            temp_pid_file, strerror(write_errno));
    goto cleanup;
  }
  if ((size_t) written != pid_len) {
    // A truncated pid would be read back as a different, valid-looking pid.
    fprintf(LOGFILE, "Short write of pid to file %s as node manager\n",
            temp_pid_file);
    goto cleanup;
  }

  // rename temp file to actual pid file
  // use rename as atomic
  if (rename(temp_pid_file, pid_file) != 0) {
    fprintf(LOGFILE, "Can't move pid file from %s to %s as node manager - %s\n",
            temp_pid_file, pid_file, strerror(errno));
    goto cleanup;
  }
  // The temp name no longer exists after a successful rename.
  temp_created = 0;
  result = 0;

cleanup:
  if (temp_created) {
    // Remove the leftover while still running as the node manager user.
    unlink(temp_pid_file);
  }
  free(temp_pid_file);
  // Revert back to the calling user.
  if (change_effective_user(user, group) != 0) {
    result = -1;
  }
  return result;
}
/** /**
* Change the real and effective user and group to abandon the super user * Change the real and effective user and group to abandon the super user
* priviledges. * priviledges.
@ -749,7 +806,8 @@ int initialize_app(const char *user, const char *app_id,
int launch_container_as_user(const char *user, const char *app_id, int launch_container_as_user(const char *user, const char *app_id,
const char *container_id, const char *work_dir, const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file) { const char *script_name, const char *cred_file,
const char* pid_file) {
int exit_code = -1; int exit_code = -1;
char *script_file_dest = NULL; char *script_file_dest = NULL;
char *cred_file_dest = NULL; char *cred_file_dest = NULL;
@ -776,6 +834,20 @@ int launch_container_as_user(const char *user, const char *app_id,
goto cleanup; goto cleanup;
} }
// setsid
pid_t pid = setsid();
if (pid == -1) {
exit_code = SETSID_OPER_FAILED;
goto cleanup;
}
// write pid to pidfile
if (pid_file == NULL
|| write_pid_to_file_as_nm(pid_file, pid) != 0) {
exit_code = WRITE_PIDFILE_FAILED;
goto cleanup;
}
// give up root privs // give up root privs
if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) { if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
exit_code = SETUID_OPER_FAILED; exit_code = SETUID_OPER_FAILED;
@ -1031,3 +1103,5 @@ int delete_as_user(const char *user,
} }
return ret; return ret;
} }

View File

@ -51,7 +51,9 @@ enum errorcodes {
UNABLE_TO_BUILD_PATH, //21 UNABLE_TO_BUILD_PATH, //21
INVALID_CONTAINER_EXEC_PERMISSIONS, //22 INVALID_CONTAINER_EXEC_PERMISSIONS, //22
// PREPARE_JOB_LOGS_FAILED (NOT USED) 23 // PREPARE_JOB_LOGS_FAILED (NOT USED) 23
INVALID_CONFIG_FILE = 24 INVALID_CONFIG_FILE = 24,
SETSID_OPER_FAILED = 25,
WRITE_PIDFILE_FAILED = 26
}; };
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group" #define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@ -106,11 +108,13 @@ int initialize_app(const char *user, const char *app_id,
* @param script_name the name of the script to be run to launch the container. * @param script_name the name of the script to be run to launch the container.
* @param cred_file the credentials file that needs to be compied to the * @param cred_file the credentials file that needs to be compied to the
* working directory. * working directory.
* @param pid_file file where pid of process should be written to
* @return -1 or errorcode enum value on error (should never return on success). * @return -1 or errorcode enum value on error (should never return on success).
*/ */
int launch_container_as_user(const char * user, const char *app_id, int launch_container_as_user(const char * user, const char *app_id,
const char *container_id, const char *work_dir, const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file); const char *script_name, const char *cred_file,
const char *pid_file);
/** /**
* Function used to signal a container launched by the user. * Function used to signal a container launched by the user.

View File

@ -66,6 +66,7 @@ int main(int argc, char **argv) {
const char * cred_file = NULL; const char * cred_file = NULL;
const char * script_file = NULL; const char * script_file = NULL;
const char * current_dir = NULL; const char * current_dir = NULL;
const char * pid_file = NULL;
int exit_code = 0; int exit_code = 0;
@ -141,7 +142,7 @@ int main(int argc, char **argv) {
argv + optind); argv + optind);
break; break;
case LAUNCH_CONTAINER: case LAUNCH_CONTAINER:
if (argc < 8) { if (argc < 9) {
fprintf(ERRORFILE, "Too few arguments (%d vs 8) for launch container\n", fprintf(ERRORFILE, "Too few arguments (%d vs 8) for launch container\n",
argc); argc);
fflush(ERRORFILE); fflush(ERRORFILE);
@ -152,8 +153,9 @@ int main(int argc, char **argv) {
current_dir = argv[optind++]; current_dir = argv[optind++];
script_file = argv[optind++]; script_file = argv[optind++];
cred_file = argv[optind++]; cred_file = argv[optind++];
pid_file = argv[optind++];
exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id, exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id,
current_dir, script_file, cred_file); current_dir, script_file, cred_file, pid_file);
break; break;
case SIGNAL_CONTAINER: case SIGNAL_CONTAINER:
if (argc < 5) { if (argc < 5) {

View File

@ -590,6 +590,7 @@ void test_run_container() {
fflush(stderr); fflush(stderr);
char* container_dir = get_container_work_directory(TEST_ROOT "/local-1", char* container_dir = get_container_work_directory(TEST_ROOT "/local-1",
username, "app_4", "container_1"); username, "app_4", "container_1");
const char * pid_file = TEST_ROOT "/pid.txt";
pid_t child = fork(); pid_t child = fork();
if (child == -1) { if (child == -1) {
printf("FAIL: failed to fork process for init_app - %s\n", printf("FAIL: failed to fork process for init_app - %s\n",
@ -597,7 +598,7 @@ void test_run_container() {
exit(1); exit(1);
} else if (child == 0) { } else if (child == 0) {
if (launch_container_as_user(username, "app_4", "container_1", if (launch_container_as_user(username, "app_4", "container_1",
container_dir, script_name, TEST_ROOT "creds.txt") != 0) { container_dir, script_name, TEST_ROOT "/creds.txt", pid_file) != 0) {
printf("FAIL: failed in child\n"); printf("FAIL: failed in child\n");
exit(42); exit(42);
} }
@ -631,6 +632,32 @@ void test_run_container() {
exit(1); exit(1);
} }
free(container_dir); free(container_dir);
if(access(pid_file, R_OK) != 0) {
printf("FAIL: failed to create pid file %s\n", pid_file);
exit(1);
}
int pidfd = open(pid_file, O_RDONLY);
if (pidfd == -1) {
printf("FAIL: failed to open pid file %s - %s\n", pid_file, strerror(errno));
exit(1);
}
char pidBuf[100];
ssize_t bytes = read(pidfd, pidBuf, 100);
if (bytes == -1) {
printf("FAIL: failed to read from pid file %s - %s\n", pid_file, strerror(errno));
exit(1);
}
pid_t mypid = child;
char myPidBuf[33];
snprintf(myPidBuf, 33, "%d", mypid);
if (strncmp(pidBuf, myPidBuf, strlen(myPidBuf)) != 0) {
printf("FAIL: failed to find matching pid in pid file\n");
printf("FAIL: Expected pid %d : Got %.*s", mypid, (int)bytes, pidBuf);
exit(1);
}
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {

View File

@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -35,6 +36,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
public abstract class ContainerExecutor implements Configurable { public abstract class ContainerExecutor implements Configurable {
@ -43,8 +45,12 @@ public abstract class ContainerExecutor implements Configurable {
FsPermission.createImmutable((short) 0700); FsPermission.createImmutable((short) 0700);
private Configuration conf; private Configuration conf;
protected ConcurrentMap<ContainerId, ShellCommandExecutor> launchCommandObjs = private ConcurrentMap<ContainerId, Path> pidFiles =
new ConcurrentHashMap<ContainerId, ShellCommandExecutor>(); new ConcurrentHashMap<ContainerId, Path>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadLock readLock = lock.readLock();
private final WriteLock writeLock = lock.writeLock();
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
@ -102,7 +108,8 @@ public abstract class ContainerExecutor implements Configurable {
throws IOException, InterruptedException; throws IOException, InterruptedException;
public enum ExitCode { public enum ExitCode {
KILLED(137); FORCE_KILLED(137),
TERMINATED(143);
private final int code; private final int code;
private ExitCode(int exitCode) { private ExitCode(int exitCode) {
@ -149,6 +156,66 @@ public abstract class ContainerExecutor implements Configurable {
} }
} }
/**
 * Get the pidFile of the container.
 * The lookup is done while holding the read lock.
 *
 * @param containerId the container to look up
 * @return the path of the pid-file for the given containerId, or
 *         {@code null} if no pid-file is registered for it.
 */
protected Path getPidFilePath(ContainerId containerId) {
  // Acquire before entering try so that finally only ever releases a lock
  // this thread actually holds.
  readLock.lock();
  try {
    return this.pidFiles.get(containerId);
  } finally {
    readLock.unlock();
  }
}
/**
 * Is the container still active?
 * A container counts as active while it has an entry in the pid-file map;
 * the check is done while holding the read lock.
 *
 * @param containerId the container to check
 * @return true if the container is active else false.
 */
protected boolean isContainerActive(ContainerId containerId) {
  // Acquire before entering try so that finally only ever releases a lock
  // this thread actually holds.
  readLock.lock();
  try {
    return this.pidFiles.containsKey(containerId);
  } finally {
    readLock.unlock();
  }
}
/**
 * Mark the container as active by registering its pid-file path.
 * The registration is done while holding the write lock.
 *
 * @param containerId
 *          the ContainerId
 * @param pidFilePath
 *          Path where the executor should write the pid of the launched
 *          process
 */
public void activateContainer(ContainerId containerId, Path pidFilePath) {
  // Acquire before entering try so that finally only ever releases a lock
  // this thread actually holds.
  writeLock.lock();
  try {
    this.pidFiles.put(containerId, pidFilePath);
  } finally {
    writeLock.unlock();
  }
}
/**
 * Mark the container as inactive by dropping its pid-file registration.
 * Done iff the container is still active. Else treat it as
 * a no-op. The removal is done while holding the write lock.
 *
 * @param containerId
 *          the ContainerId
 */
public void deactivateContainer(ContainerId containerId) {
  // Acquire before entering try so that finally only ever releases a lock
  // this thread actually holds.
  writeLock.lock();
  try {
    this.pidFiles.remove(containerId);
  } finally {
    writeLock.unlock();
  }
}
/** /**
* Get the process-identifier for the container * Get the process-identifier for the container
* *
@ -158,28 +225,15 @@ public abstract class ContainerExecutor implements Configurable {
*/ */
public String getProcessId(ContainerId containerID) { public String getProcessId(ContainerId containerID) {
String pid = null; String pid = null;
ShellCommandExecutor shExec = launchCommandObjs.get(containerID); Path pidFile = pidFiles.get(containerID);
if (shExec == null) { if (pidFile == null) {
// This container isn't even launched yet. // This container isn't even launched yet.
return pid; return pid;
} }
Process proc = shExec.getProcess();
if (proc == null) {
// This happens if the command is not yet started
return pid;
}
try { try {
Field pidField = proc.getClass().getDeclaredField("pid"); pid = ProcessIdFileReader.getProcessId(pidFile);
pidField.setAccessible(true); } catch (IOException e) {
pid = ((Integer) pidField.get(proc)).toString(); LOG.error("Got exception reading pid from pid-file " + pidFile, e);
} catch (SecurityException e) {
// SecurityManager not expected with yarn. Ignore.
} catch (NoSuchFieldException e) {
// Yarn only on UNIX for now. Ignore.
} catch (IllegalArgumentException e) {
;
} catch (IllegalAccessException e) {
;
} }
return pid; return pid;
} }

View File

@ -18,10 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -48,6 +54,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
private final FileContext lfs; private final FileContext lfs;
private static final String WRAPPER_LAUNCH_SCRIPT =
"default_container_executor.sh";
public DefaultContainerExecutor() { public DefaultContainerExecutor() {
try { try {
this.lfs = FileContext.getLocalFSFileContext(); this.lfs = FileContext.getLocalFSFileContext();
@ -100,8 +109,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
ConverterUtils.toString( ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId(). container.getContainerID().getApplicationAttemptId().
getApplicationId()); getApplicationId());
String[] sLocalDirs = String[] sLocalDirs = getConf().getStrings(
getConf().getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS); YarnConfiguration.NM_LOCAL_DIRS,
YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
for (String sLocalDir : sLocalDirs) { for (String sLocalDir : sLocalDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName); Path userdir = new Path(usersdir, userName);
@ -124,21 +134,47 @@ public class DefaultContainerExecutor extends ContainerExecutor {
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
lfs.util().copy(nmPrivateTokensPath, tokenDst); lfs.util().copy(nmPrivateTokensPath, tokenDst);
// Create new local launch wrapper script
Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
DataOutputStream wrapperScriptOutStream =
lfs.create(wrapperScriptDst,
EnumSet.of(CREATE, OVERWRITE));
Path pidFile = getPidFilePath(containerId);
if (pidFile != null) {
writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
.getPath().toString(), pidFile.toString());
} else {
LOG.info("Container " + containerIdStr
+ " was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
// create log dir under app // create log dir under app
// fork script // fork script
ShellCommandExecutor shExec = null; ShellCommandExecutor shExec = null;
try { try {
lfs.setPermission(launchDst, lfs.setPermission(launchDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
String[] command = lfs.setPermission(wrapperScriptDst,
new String[] { "bash", "-c", launchDst.toUri().getPath().toString() }; ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
// Setup command to run
String[] command = {"bash", "-c",
wrapperScriptDst.toUri().getPath().toString()};
LOG.info("launchContainer: " + Arrays.toString(command)); LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor( shExec = new ShellCommandExecutor(
command, command,
new File(containerWorkDir.toUri().getPath()), new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment()); // sanitized env container.getLaunchContext().getEnvironment()); // sanitized env
launchCommandObjs.put(containerId, shExec); if (isContainerActive(containerId)) {
shExec.execute(); shExec.execute();
}
else {
LOG.info("Container " + containerIdStr +
" was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
} catch (IOException e) { } catch (IOException e) {
if (null == shExec) { if (null == shExec) {
return -1; return -1;
@ -151,17 +187,44 @@ public class DefaultContainerExecutor extends ContainerExecutor {
message)); message));
return exitCode; return exitCode;
} finally { } finally {
launchCommandObjs.remove(containerId); ; //
} }
return 0; return 0;
} }
/**
 * Write the wrapper script that records its own pid and then execs the
 * real container launch script.
 *
 * @param out stream of the wrapper script file; always closed by this
 *          method when it is non-null
 * @param launchScriptDst path of the container launch script to exec
 * @param pidFilePath path of the file the wrapper writes its pid to
 * @throws IOException if the script could not be written or the stream
 *           could not be closed
 */
private void writeLocalWrapperScript(DataOutputStream out,
    String launchScriptDst, String pidFilePath) throws IOException {
  // We need to do a move as writing to a file is not atomic
  // Process reading a file being written to may get garbled data
  // hence write pid to tmp file first followed by a mv
  StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
  sb.append("echo $$ > ").append(pidFilePath).append(".tmp\n");
  sb.append("/bin/mv -f ").append(pidFilePath).append(".tmp ")
      .append(pidFilePath).append("\n");
  sb.append(ContainerExecutor.isSetsidAvailable ? "exec setsid" : "exec");
  sb.append(" /bin/bash ");
  sb.append("-c ");
  sb.append("\"");
  sb.append(launchScriptDst);
  sb.append("\"\n");
  try {
    PrintStream pout = new PrintStream(out);
    pout.append(sb);
    // PrintStream never throws IOException; it only records that one
    // happened. checkError() flushes and reports it, so a truncated
    // wrapper script is surfaced instead of being launched.
    if (pout.checkError()) {
      throw new IOException("Failed to write wrapper script for "
          + launchScriptDst);
    }
  } finally {
    // Close the underlying stream directly so a failure on close still
    // propagates as an IOException.
    if (out != null) {
      out.close();
    }
  }
}
@Override @Override
public boolean signalContainer(String user, String pid, Signal signal) public boolean signalContainer(String user, String pid, Signal signal)
throws IOException { throws IOException {
final String sigpid = ContainerExecutor.isSetsidAvailable final String sigpid = ContainerExecutor.isSetsidAvailable
? "-" + pid ? "-" + pid
: pid; : pid;
LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
+ " as user " + user);
try { try {
sendSignal(sigpid, Signal.NULL); sendSignal(sigpid, Signal.NULL);
} catch (ExitCodeException e) { } catch (ExitCodeException e) {
@ -189,8 +252,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
*/ */
protected void sendSignal(String pid, Signal signal) throws IOException { protected void sendSignal(String pid, Signal signal) throws IOException {
ShellCommandExecutor shexec = null; ShellCommandExecutor shexec = null;
String[] arg = { "kill", "-" + signal.getValue(), pid }; String[] arg = { "kill", "-" + signal.getValue(), pid };
shexec = new ShellCommandExecutor(arg); shexec = new ShellCommandExecutor(arg);
shexec.execute(); shexec.execute();
} }

View File

@ -155,36 +155,45 @@ public class LinuxContainerExecutor extends ContainerExecutor {
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe, ShellCommandExecutor shExec = null;
user,
Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
appId,
containerIdStr,
containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString()));
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec =
new ShellCommandExecutor(
commandArray,
null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env
launchCommandObjs.put(containerId, shExec);
// DEBUG
LOG.info("launchContainer: " + Arrays.toString(commandArray));
try { try {
shExec.execute(); Path pidFilePath = getPidFilePath(containerId);
if (LOG.isDebugEnabled()) { if (pidFilePath != null) {
logOutput(shExec.getOutput()); List<String> command = new ArrayList<String>(Arrays.asList(
containerExecutorExe, user, Integer
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString(), pidFilePath
.toString()));
String[] commandArray = command.toArray(new String[command.size()]);
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env
// DEBUG
LOG.info("launchContainer: " + Arrays.toString(commandArray));
shExec.execute();
if (LOG.isDebugEnabled()) {
logOutput(shExec.getOutput());
}
} else {
LOG.info("Container was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
} }
} catch (ExitCodeException e) { } catch (ExitCodeException e) {
if (null == shExec) {
return -1;
}
int exitCode = shExec.getExitCode(); int exitCode = shExec.getExitCode();
LOG.warn("Exit code from container is : " + exitCode); LOG.warn("Exit code from container is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
// terminated/killed forcefully. In all other cases, log the // terminated/killed forcefully. In all other cases, log the
// container-executor's output // container-executor's output
if (exitCode != 143 && exitCode != 137) { if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch : ", e); LOG.warn("Exception from container-launch : ", e);
logOutput(shExec.getOutput()); logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n" String diagnostics = "Exception from container-launch: \n"
@ -197,7 +206,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
return exitCode; return exitCode;
} finally { } finally {
launchCommandObjs.remove(containerId); ; //
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:"); LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");

View File

@ -168,8 +168,6 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
// TODO race: Can lead to a CONTAINER_LAUNCHED event at state KILLING,
// and a container which will never be killed by the NM.
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.KILL_CONTAINER, new KillTransition())
@ -239,6 +237,13 @@ public class ContainerImpl implements Container {
ContainerState.DONE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION) CONTAINER_DONE_TRANSITION)
// Handle a launched container during killing stage is a no-op
// as cleanup container is always handled after launch container event
// in the container launcher
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
ContainerEventType.CONTAINER_LAUNCHED,
new ContainerTransition())
// From CONTAINER_CLEANEDUP_AFTER_KILL State. // From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,

View File

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -44,11 +45,14 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@ -56,8 +60,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> { public class ContainerLaunch implements Callable<Integer> {
private static final Log LOG = LogFactory.getLog(ContainerLaunch.class); private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
@ -65,12 +71,22 @@ public class ContainerLaunch implements Callable<Integer> {
public static final String CONTAINER_SCRIPT = "launch_container.sh"; public static final String CONTAINER_SCRIPT = "launch_container.sh";
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
private static final String PID_FILE_NAME_FMT = "%s.pid";
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private final ContainerExecutor exec; private final ContainerExecutor exec;
private final Application app; private final Application app;
private final Container container; private final Container container;
private final Configuration conf; private final Configuration conf;
private final LocalDirAllocator logDirsSelector; private final LocalDirAllocator logDirsSelector;
private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
private volatile AtomicBoolean completed = new AtomicBoolean(false);
private long sleepDelayBeforeSigKill = 250;
private long maxKillWaitTime = 2000;
private Path pidFilePath = null;
public ContainerLaunch(Configuration configuration, Dispatcher dispatcher, public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
ContainerExecutor exec, Application app, Container container) { ContainerExecutor exec, Application app, Container container) {
@ -80,6 +96,12 @@ public class ContainerLaunch implements Callable<Integer> {
this.container = container; this.container = container;
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS); this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
this.sleepDelayBeforeSigKill =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
this.maxKillWaitTime =
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
} }
@Override @Override
@ -87,7 +109,8 @@ public class ContainerLaunch implements Callable<Integer> {
public Integer call() { public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext(); final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,String> localResources = container.getLocalizedResources(); final Map<Path,String> localResources = container.getLocalizedResources();
String containerIdStr = ConverterUtils.toString(container.getContainerID()); ContainerId containerID = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser(); final String user = launchContext.getUser();
final List<String> command = launchContext.getCommands(); final List<String> command = launchContext.getCommands();
int ret = -1; int ret = -1;
@ -145,6 +168,17 @@ public class ContainerLaunch implements Callable<Integer> {
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerIdStr, + Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, this.conf, false); LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
pidFilePath = lDirAllocator.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ pidFileSuffix,
this.conf);
try { try {
// /////////// Write out the container-script in the nmPrivate space. // /////////// Write out the container-script in the nmPrivate space.
String[] localDirs = String[] localDirs =
@ -189,21 +223,36 @@ public class ContainerLaunch implements Callable<Integer> {
// LaunchContainer is a blocking call. We are here almost means the // LaunchContainer is a blocking call. We are here almost means the
// container is launched, so send out the event. // container is launched, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent( dispatcher.getEventHandler().handle(new ContainerEvent(
container.getContainerID(), containerID,
ContainerEventType.CONTAINER_LAUNCHED)); ContainerEventType.CONTAINER_LAUNCHED));
ret = // Check if the container is signalled to be killed.
exec.launchContainer(container, nmPrivateContainerScriptPath, if (!shouldLaunchContainer.compareAndSet(false, true)) {
nmPrivateTokensPath, user, appIdStr, containerWorkDir); LOG.info("Container " + containerIdStr + " not launched as "
+ "cleanup already called");
ret = ExitCode.TERMINATED.getExitCode();
}
else {
exec.activateContainer(containerID, pidFilePath);
ret =
exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir);
}
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to launch container", e); LOG.warn("Failed to launch container", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent( dispatcher.getEventHandler().handle(new ContainerExitEvent(
launchContext.getContainerId(), launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret)); ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
return ret; return ret;
} finally {
completed.set(true);
exec.deactivateContainer(containerID);
} }
if (ret == ExitCode.KILLED.getExitCode()) { LOG.debug("Container " + containerIdStr + " completed with exit code "
+ ret);
if (ret == ExitCode.FORCE_KILLED.getExitCode()
|| ret == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and // If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method. // just break out of this method.
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
@ -226,6 +275,114 @@ public class ContainerLaunch implements Callable<Integer> {
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0; return 0;
} }
/**
 * Cleanup the container.
 * Cancels the launch if launch has not started yet or signals
 * the executor to not execute the process if not already done so.
 * If the process id can be read from the pid file, the process is
 * signalled: when a positive SIGKILL delay is configured, a SIGTERM is
 * sent first and a SIGKILL is scheduled after the delay; otherwise a
 * SIGKILL is sent immediately. The pid file is always removed afterwards.
 * Failures while locating or signalling the process are logged and
 * swallowed so that cleanup stays best-effort.
 * @throws IOException if the pid file cannot be deleted
 */
public void cleanupContainer() throws IOException {
  ContainerId containerId = container.getContainerID();
  String containerIdStr = ConverterUtils.toString(containerId);
  LOG.info("Cleaning up container " + containerIdStr);

  // compareAndSet succeeds only if call() has not flipped the flag yet;
  // in that case the flag is now set, so call() will refuse to launch.
  boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
  if (!alreadyLaunched) {
    LOG.info("Container " + containerIdStr + " not launched."
        + " No cleanup needed to be done");
    return;
  }

  LOG.debug("Marking container " + containerIdStr + " as inactive");
  // this should ensure that if the container process has not launched
  // by this time, it will never be launched
  exec.deactivateContainer(containerId);

  LOG.debug("Getting pid for container " + containerIdStr + " to kill"
      + " from pid file "
      + (pidFilePath != null ? pidFilePath.toString() : "null"));

  // however the container process may have already started
  try {
    // get process id from pid file if available
    String processId = null;
    if (pidFilePath != null) {
      processId = getContainerPid(pidFilePath);
    }

    // kill process
    if (processId != null) {
      String user = container.getLaunchContext().getUser();
      LOG.debug("Sending signal to pid " + processId
          + " as user " + user
          + " for container " + containerIdStr);

      // With no grace period configured there is no point in a SIGTERM:
      // go straight to SIGKILL so the process is never left running.
      final boolean gracefulShutdown = sleepDelayBeforeSigKill > 0;
      final Signal signal = gracefulShutdown ? Signal.TERM : Signal.KILL;

      boolean result = exec.signalContainer(user, processId, signal);
      LOG.debug("Sent signal to pid " + processId
          + " as user " + user
          + " for container " + containerIdStr
          + ", result=" + (result ? "success" : "failed"));

      if (gracefulShutdown) {
        // follow up with a SIGKILL once the grace period has elapsed
        new DelayedProcessKiller(user,
            processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
      }
    }
  } catch (Exception e) {
    // best-effort cleanup: log with the full stack trace and carry on
    LOG.warn("Got error when trying to cleanup container " + containerIdStr
        + ", error=" + e.getMessage(), e);
  } finally {
    // cleanup pid file if present
    if (pidFilePath != null) {
      FileContext lfs = FileContext.getLocalFSFileContext();
      lfs.delete(pidFilePath, false);
    }
  }
}
/**
 * Polls the given pid file until a process id can be read from it.
 * Polling stops as soon as a pid is found, when the launch is flagged
 * as completed, or once the accumulated wait exceeds the configured
 * maximum kill wait time.
 * @param pidFilePath File from which to read the process id
 * @return the process id, or {@code null} if none could be read
 * @throws Exception if reading the pid file fails or the wait is
 *         interrupted
 */
private String getContainerPid(Path pidFilePath) throws Exception {
  final String idForLogs =
      ConverterUtils.toString(container.getContainerID());
  LOG.debug("Accessing pid for container " + idForLogs
      + " from pid file " + pidFilePath);

  final int pollIntervalMs = 100;

  // Keep polling while the launch has not finished; every exit path
  // below that does not yield a pid reports "no pid" as null.
  for (int polls = 0; !completed.get(); ++polls) {
    final String pid = ProcessIdFileReader.getProcessId(pidFilePath);
    if (pid != null) {
      LOG.debug("Got pid " + pid + " for container "
          + idForLogs);
      return pid;
    }

    // give up once we have slept for longer than the allowed wait
    if ((polls * pollIntervalMs) > maxKillWaitTime) {
      LOG.info("Could not get pid for " + idForLogs
          + ". Waited for " + maxKillWaitTime + " ms.");
      return null;
    }

    Thread.sleep(pollIntervalMs);
  }

  // launch completed before any pid showed up in the file
  return null;
}
private String getContainerPrivateDir(String appIdStr, String containerIdStr) { private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
@ -287,7 +444,7 @@ public class ContainerLaunch implements Callable<Integer> {
public String toString() { public String toString() {
return sb.toString(); return sb.toString();
} }
} }
private static void putEnvIfNotNull( private static void putEnvIfNotNull(
@ -374,9 +531,9 @@ public class ContainerLaunch implements Callable<Integer> {
sb.symlink(link.getKey(), link.getValue()); sb.symlink(link.getKey(), link.getValue());
} }
} }
ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5); ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec "); cmd.add("exec /bin/bash ");
cmd.add("/bin/bash ");
cmd.add("-c "); cmd.add("-c ");
cmd.add("\""); cmd.add("\"");
for (String cs : command) { for (String cs : command) {

View File

@ -26,15 +26,17 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -52,6 +54,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ContainersLauncher extends AbstractService public class ContainersLauncher extends AbstractService
implements EventHandler<ContainersLauncherEvent> { implements EventHandler<ContainersLauncherEvent> {
private static final Log LOG = LogFactory.getLog(ContainersLauncher.class);
private final Context context; private final Context context;
private final ContainerExecutor exec; private final ContainerExecutor exec;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -64,13 +68,14 @@ public class ContainersLauncher extends AbstractService
Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>()); Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());
private static final class RunningContainer { private static final class RunningContainer {
public RunningContainer(String string, Future<Integer> submit) { public RunningContainer(Future<Integer> submit,
this.user = string; ContainerLaunch launcher) {
this.runningcontainer = submit; this.runningcontainer = submit;
this.launcher = launcher;
} }
String user;
Future<Integer> runningcontainer; Future<Integer> runningcontainer;
ContainerLaunch launcher;
} }
@ -104,7 +109,6 @@ public class ContainersLauncher extends AbstractService
// TODO: ContainersLauncher launches containers one by one!! // TODO: ContainersLauncher launches containers one by one!!
Container container = event.getContainer(); Container container = event.getContainer();
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainerID();
String userName = container.getUser();
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_CONTAINER: case LAUNCH_CONTAINER:
Application app = Application app =
@ -114,33 +118,26 @@ public class ContainersLauncher extends AbstractService
new ContainerLaunch(getConfig(), dispatcher, exec, app, new ContainerLaunch(getConfig(), dispatcher, exec, app,
event.getContainer()); event.getContainer());
running.put(containerId, running.put(containerId,
new RunningContainer(userName, new RunningContainer(containerLauncher.submit(launch),
containerLauncher.submit(launch))); launch));
break; break;
case CLEANUP_CONTAINER: case CLEANUP_CONTAINER:
RunningContainer rContainerDatum = running.remove(containerId); RunningContainer rContainerDatum = running.remove(containerId);
Future<Integer> rContainer = rContainerDatum.runningcontainer; Future<Integer> rContainer = rContainerDatum.runningcontainer;
if (rContainer != null) { if (rContainer != null
&& !rContainer.isDone()) {
if (rContainer.isDone()) { // Cancel the future so that it won't be launched
// The future is already done by this time. // if it isn't already.
break;
}
// Cancel the future so that it won't be launched if it isn't already.
rContainer.cancel(false); rContainer.cancel(false);
}
// Kill the container
String processId = exec.getProcessId(containerId); // Cleanup a container whether it is running/killed/completed, so that
if (processId != null) { // no sub-processes are alive.
try { try {
exec.signalContainer(rContainerDatum.user, rContainerDatum.launcher.cleanupContainer();
processId, Signal.KILL); } catch (IOException e) {
} catch (IOException e) { LOG.warn("Got exception while cleaning container " + containerId
// TODO Auto-generated catch block + ". Ignoring.");
e.printStackTrace();
}
}
} }
break; break;
} }

View File

@ -185,7 +185,9 @@ public class TestLinuxContainerExecutor {
Path scriptPath = new Path(script); Path scriptPath = new Path(script);
Path tokensPath = new Path("/dev/null"); Path tokensPath = new Path("/dev/null");
Path workDir = new Path(workSpace.getAbsolutePath()); Path workDir = new Path(workSpace.getAbsolutePath());
Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath, return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir); appSubmitter, appId, workDir);
} }

View File

@ -60,7 +60,8 @@ public class TestLinuxContainerExecutorWithMocks {
private List<String> readMockParams() throws IOException { private List<String> readMockParams() throws IOException {
LinkedList<String> ret = new LinkedList<String>(); LinkedList<String> ret = new LinkedList<String>();
LineNumberReader reader = new LineNumberReader(new FileReader(mockParamFile)); LineNumberReader reader = new LineNumberReader(new FileReader(
mockParamFile));
String line; String line;
while((line = reader.readLine()) != null) { while((line = reader.readLine()) != null) {
ret.add(line); ret.add(line);
@ -70,7 +71,7 @@ public class TestLinuxContainerExecutorWithMocks {
} }
@Before @Before
public void setup() throws IOException { public void setup() {
File f = new File("./src/test/resources/mock-container-executor"); File f = new File("./src/test/resources/mock-container-executor");
if(!f.canExecute()) { if(!f.canExecute()) {
f.setExecutable(true); f.setExecutable(true);
@ -83,7 +84,7 @@ public class TestLinuxContainerExecutorWithMocks {
} }
@After @After
public void tearDown() throws IOException { public void tearDown() {
deleteMockParamFile(); deleteMockParamFile();
} }
@ -109,11 +110,14 @@ public class TestLinuxContainerExecutorWithMocks {
Path scriptPath = new Path("file:///bin/echo"); Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null"); Path tokensPath = new Path("file:///dev/null");
Path workDir = new Path("/tmp"); Path workDir = new Path("/tmp");
int ret = mockExec.launchContainer(container, scriptPath, tokensPath, Path pidFile = new Path(workDir, "pid.txt");
mockExec.activateContainer(cId, pidFile);
int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir); appSubmitter, appId, workDir);
assertEquals(0, ret); assertEquals(0, ret);
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId, assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/echo", "/dev/null"), workDir.toString(), "/bin/echo", "/dev/null", pidFile),
readMockParams()); readMockParams());
} }
@ -141,4 +145,4 @@ public class TestLinuxContainerExecutorWithMocks {
assertEquals(Arrays.asList(appSubmitter, cmd, "/tmp/testdir"), assertEquals(Arrays.asList(appSubmitter, cmd, "/tmp/testdir"),
readMockParams()); readMockParams());
} }
} }

View File

@ -280,7 +280,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
gcsRequest.setContainerId(cId); gcsRequest.setContainerId(cId);
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerManager.getContainerStatus(gcsRequest).getStatus(); containerManager.getContainerStatus(gcsRequest).getStatus();
Assert.assertEquals(ExitCode.KILLED.getExitCode(), Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
containerStatus.getExitStatus()); containerStatus.getExitStatus());
// Assert that the process is not alive anymore // Assert that the process is not alive anymore

View File

@ -168,7 +168,7 @@ public class TestContainer {
wc.localizeResources(); wc.localizeResources();
wc.launchContainer(); wc.launchContainer();
reset(wc.localizerBus); reset(wc.localizerBus);
wc.containerFailed(ExitCode.KILLED.getExitCode()); wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
assertEquals(ContainerState.EXITED_WITH_FAILURE, assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState()); wc.c.getContainerState());
verifyCleanupCall(wc); verifyCleanupCall(wc);
@ -268,6 +268,26 @@ public class TestContainer {
} }
} }
/**
 * Drives a container through init, localization and a kill request,
 * then delivers a launch event and checks that the container remains
 * in the KILLING state before the kill is acknowledged and cleanup
 * is verified.
 */
@Test
public void testLaunchAfterKillRequest() throws Exception {
  // Construct outside the try: if construction fails there is nothing
  // to finish, which matches skipping finished() for a null wrapper.
  final WrappedContainer wrapped =
      new WrappedContainer(14, 314159265358979L, 4344, "yak");
  try {
    wrapped.initContainer();
    wrapped.localizeResources();

    wrapped.killContainer();
    assertEquals(ContainerState.KILLING, wrapped.c.getContainerState());

    // a launch arriving after the kill request must not change state
    wrapped.launchContainer();
    assertEquals(ContainerState.KILLING, wrapped.c.getContainerState());

    wrapped.containerKilledOnRequest();
    verifyCleanupCall(wrapped);
  } finally {
    wrapped.finished();
  }
}
private void verifyCleanupCall(WrappedContainer wc) throws Exception { private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq = ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@ -511,7 +531,7 @@ public class TestContainer {
public void containerKilledOnRequest() { public void containerKilledOnRequest() {
c.handle(new ContainerExitEvent(cId, c.handle(new ContainerExitEvent(cId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.FORCE_KILLED
.getExitCode())); .getExitCode()));
drainDispatcherEvents(); drainDispatcherEvents();
} }

View File

@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
@ -59,7 +58,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerM
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -76,22 +74,21 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
conf.setClass( conf.setClass(
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class); LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 1000);
super.setup(); super.setup();
} }
@Test @Test
public void testSpecialCharSymlinks() throws IOException { public void testSpecialCharSymlinks() throws IOException {
String rootDir = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
File shellFile = null; File shellFile = null;
File tempFile = null; File tempFile = null;
String badSymlink = "foo@zz%_#*&!-+= bar()"; String badSymlink = "foo@zz%_#*&!-+= bar()";
File symLinkFile = null; File symLinkFile = null;
try { try {
shellFile = new File(rootDir, "hello.sh"); shellFile = new File(tmpDir, "hello.sh");
tempFile = new File(rootDir, "temp.sh"); tempFile = new File(tmpDir, "temp.sh");
String timeoutCommand = "echo \"hello\""; String timeoutCommand = "echo \"hello\"";
PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile)); PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
shellFile.setExecutable(true); shellFile.setExecutable(true);
@ -113,15 +110,14 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
fos.close(); fos.close();
tempFile.setExecutable(true); tempFile.setExecutable(true);
File rootDirFile = new File(rootDir);
Shell.ShellCommandExecutor shexc Shell.ShellCommandExecutor shexc
= new Shell.ShellCommandExecutor(new String[]{tempFile.getAbsolutePath()}, rootDirFile); = new Shell.ShellCommandExecutor(new String[]{tempFile.getAbsolutePath()}, tmpDir);
shexc.execute(); shexc.execute();
assertEquals(shexc.getExitCode(), 0); assertEquals(shexc.getExitCode(), 0);
assert(shexc.getOutput().contains("hello")); assert(shexc.getOutput().contains("hello"));
symLinkFile = new File(rootDir, badSymlink); symLinkFile = new File(tmpDir, badSymlink);
} }
finally { finally {
// cleanup // cleanup
@ -141,6 +137,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
} }
// this is a dirty hack - but should be ok for a unittest. // this is a dirty hack - but should be ok for a unittest.
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void setNewEnvironmentHack(Map<String, String> newenv) throws Exception { public static void setNewEnvironmentHack(Map<String, String> newenv) throws Exception {
Class[] classes = Collections.class.getDeclaredClasses(); Class[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv(); Map<String, String> env = System.getenv();
@ -162,7 +159,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
*/ */
@Test @Test
public void testContainerEnvVariables() throws Exception { public void testContainerEnvVariables() throws Exception {
int exitCode = 0;
containerManager.start(); containerManager.start();
Map<String, String> envWithDummy = new HashMap<String, String>(); Map<String, String> envWithDummy = new HashMap<String, String>();
@ -217,7 +213,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
new HashMap<String, LocalResource>(); new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha); localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources); containerLaunchContext.setLocalResources(localResources);
// set up the rest of the container // set up the rest of the container
containerLaunchContext.setUser(containerLaunchContext.getUser()); containerLaunchContext.setUser(containerLaunchContext.getUser());
List<String> commands = new ArrayList<String>(); List<String> commands = new ArrayList<String>();
@ -226,11 +222,11 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
containerLaunchContext.setCommands(commands); containerLaunchContext.setCommands(commands);
containerLaunchContext.setResource(recordFactory containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class)); .newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(100 * 1024 * 1024); containerLaunchContext.getResource().setMemory(1024);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
containerManager.startContainer(startRequest); containerManager.startContainer(startRequest);
int timeoutSecs = 0; int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) { while (!processStartFile.exists() && timeoutSecs++ < 20) {
Thread.sleep(1000); Thread.sleep(1000);
@ -238,7 +234,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
} }
Assert.assertTrue("ProcessStartFile doesn't exist!", Assert.assertTrue("ProcessStartFile doesn't exist!",
processStartFile.exists()); processStartFile.exists());
// Now verify the contents of the file // Now verify the contents of the file
BufferedReader reader = BufferedReader reader =
new BufferedReader(new FileReader(processStartFile)); new BufferedReader(new FileReader(processStartFile));
@ -265,13 +261,13 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
BaseContainerManagerTest.waitForContainerState(containerManager, cId, BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE); ContainerState.COMPLETE);
GetContainerStatusRequest gcsRequest = GetContainerStatusRequest gcsRequest =
recordFactory.newRecordInstance(GetContainerStatusRequest.class); recordFactory.newRecordInstance(GetContainerStatusRequest.class);
gcsRequest.setContainerId(cId); gcsRequest.setContainerId(cId);
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerManager.getContainerStatus(gcsRequest).getStatus(); containerManager.getContainerStatus(gcsRequest).getStatus();
Assert.assertEquals(ExitCode.KILLED.getExitCode(), Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
containerStatus.getExitStatus()); containerStatus.getExitStatus());
// Assert that the process is not alive anymore // Assert that the process is not alive anymore
@ -279,4 +275,119 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
exec.signalContainer(user, exec.signalContainer(user,
pid, Signal.NULL)); pid, Signal.NULL));
} }
/**
 * Starts a container running a script that traps SIGTERM, stops the
 * container, and checks that the container reports the FORCE_KILLED
 * exit code and that the script recorded the SIGTERM it received.
 */
@Test
public void testDelayedKill() throws Exception {
  containerManager.start();

  File processStartFile =
      new File(tmpDir, "pid.txt").getAbsoluteFile();

  // setup a script that can handle sigterm gracefully
  File scriptFile = new File(tmpDir, "testscript.sh");
  PrintWriter writer = new PrintWriter(new FileOutputStream(scriptFile));
  try {
    writer.println("#!/bin/bash\n\n");
    writer.println("echo \"Running testscript for delayed kill\"");
    writer.println("hello=\"Got SIGTERM\"");
    writer.println("umask 0");
    writer.println("trap \"echo $hello >> " + processStartFile + "\" SIGTERM");
    writer.println("echo \"Writing pid to start file\"");
    writer.println("echo $$ >> " + processStartFile);
    writer.println("while true; do\nsleep 1s;\ndone");
  } finally {
    // release the file handle even if writing the script fails
    writer.close();
  }
  scriptFile.setExecutable(true);

  ContainerLaunchContext containerLaunchContext =
      recordFactory.newRecordInstance(ContainerLaunchContext.class);

  // ////// Construct the Container-id
  ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
  appId.setClusterTimestamp(1);
  appId.setId(1);
  ApplicationAttemptId appAttemptId =
      recordFactory.newRecordInstance(ApplicationAttemptId.class);
  appAttemptId.setApplicationId(appId);
  appAttemptId.setAttemptId(1);
  ContainerId cId =
      recordFactory.newRecordInstance(ContainerId.class);
  cId.setApplicationAttemptId(appAttemptId);
  containerLaunchContext.setContainerId(cId);

  containerLaunchContext.setUser(user);

  // upload the script file so that the container can run it
  URL resource_alpha =
      ConverterUtils.getYarnUrlFromPath(localFS
          .makeQualified(new Path(scriptFile.getAbsolutePath())));
  LocalResource rsrc_alpha =
      recordFactory.newRecordInstance(LocalResource.class);
  rsrc_alpha.setResource(resource_alpha);
  rsrc_alpha.setSize(-1);
  rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
  rsrc_alpha.setType(LocalResourceType.FILE);
  rsrc_alpha.setTimestamp(scriptFile.lastModified());
  String destinationFile = "dest_file.sh";
  Map<String, LocalResource> localResources =
      new HashMap<String, LocalResource>();
  localResources.put(destinationFile, rsrc_alpha);
  containerLaunchContext.setLocalResources(localResources);

  // set up the rest of the container
  containerLaunchContext.setUser(containerLaunchContext.getUser());
  List<String> commands = new ArrayList<String>();
  commands.add(scriptFile.getAbsolutePath());
  containerLaunchContext.setCommands(commands);
  containerLaunchContext.setResource(recordFactory
      .newRecordInstance(Resource.class));
  containerLaunchContext.getResource().setMemory(1024);
  StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
  startRequest.setContainerLaunchContext(containerLaunchContext);
  containerManager.startContainer(startRequest);

  // wait (up to ~20s) for the script to record its pid
  int timeoutSecs = 0;
  while (!processStartFile.exists() && timeoutSecs++ < 20) {
    Thread.sleep(1000);
    LOG.info("Waiting for process start-file to be created");
  }
  Assert.assertTrue("ProcessStartFile doesn't exist!",
      processStartFile.exists());

  // Now test the stop functionality.
  StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
  stopRequest.setContainerId(cId);
  containerManager.stopContainer(stopRequest);

  BaseContainerManagerTest.waitForContainerState(containerManager, cId,
      ContainerState.COMPLETE);

  // container stop sends a sigterm followed by a sigkill
  GetContainerStatusRequest gcsRequest =
      recordFactory.newRecordInstance(GetContainerStatusRequest.class);
  gcsRequest.setContainerId(cId);
  ContainerStatus containerStatus =
      containerManager.getContainerStatus(gcsRequest).getStatus();
  Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
      containerStatus.getExitStatus());

  // Now verify the contents of the file
  // Script generates a message when it receives a sigterm
  // so we look for that
  boolean foundSigTermMessage = false;
  BufferedReader reader =
      new BufferedReader(new FileReader(processStartFile));
  try {
    String line;
    while ((line = reader.readLine()) != null) {
      if (line.contains("SIGTERM")) {
        foundSigTermMessage = true;
        break;
      }
    }
  } finally {
    // close before asserting so a failed assertion cannot leak the reader
    reader.close();
  }
  Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
}
} }

View File

@ -262,7 +262,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
gcsRequest.setContainerId(cId); gcsRequest.setContainerId(cId);
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerManager.getContainerStatus(gcsRequest).getStatus(); containerManager.getContainerStatus(gcsRequest).getStatus();
Assert.assertEquals(ExitCode.KILLED.getExitCode(), Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
containerStatus.getExitStatus()); containerStatus.getExitStatus());
String expectedMsgPattern = String expectedMsgPattern =
"Container \\[pid=" + pid + ",containerID=" + cId "Container \\[pid=" + pid + ",containerID=" + cId

View File

@ -5,6 +5,6 @@ do
done > params.txt done > params.txt
if [[ "$2" == "1" ]]; if [[ "$2" == "1" ]];
then then
cd $5; cd $6;
exec $6; exec $7;
fi; fi;