YARN-5303. Clean up ContainerExecutor JavaDoc. Contributed by Daniel Templeton.

This commit is contained in:
Varun Vasudev 2016-07-14 19:28:11 +05:30
parent e5e558b0a3
commit 54bf14f80b
6 changed files with 316 additions and 160 deletions

View File

@ -60,20 +60,31 @@ import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/**
* This class is abstraction of the mechanism used to launch a container on the
* underlying OS. All executor implementations must extend ContainerExecutor.
*/
public abstract class ContainerExecutor implements Configurable { public abstract class ContainerExecutor implements Configurable {
private static final String WILDCARD = "*"; private static final String WILDCARD = "*";
private static final Log LOG = LogFactory.getLog(ContainerExecutor.class); private static final Log LOG = LogFactory.getLog(ContainerExecutor.class);
final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
FsPermission.createImmutable((short) 0700);
/**
* The permissions to use when creating the launch script.
*/
public static final FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
FsPermission.createImmutable((short)0700);
/**
* The relative path to which debug information will be written.
*
* @see ContainerLaunch.ShellScriptBuilder#listDebugInformation
*/
public static final String DIRECTORY_CONTENTS = "directory.info"; public static final String DIRECTORY_CONTENTS = "directory.info";
private Configuration conf; private Configuration conf;
private final ConcurrentMap<ContainerId, Path> pidFiles =
private ConcurrentMap<ContainerId, Path> pidFiles = new ConcurrentHashMap<>();
new ConcurrentHashMap<ContainerId, Path>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadLock readLock = lock.readLock(); private final ReadLock readLock = lock.readLock();
private final WriteLock writeLock = lock.writeLock(); private final WriteLock writeLock = lock.writeLock();
@ -88,30 +99,34 @@ public abstract class ContainerExecutor implements Configurable {
} }
/** /**
* Run the executor initialization steps. * Run the executor initialization steps.
* Verify that the necessary configs, permissions are in place. * Verify that the necessary configs and permissions are in place.
* @throws IOException *
* @throws IOException if initialization fails
*/ */
public abstract void init() throws IOException; public abstract void init() throws IOException;
/** /**
* On Windows the ContainerLaunch creates a temporary special jar manifest of * This function localizes the JAR file on-demand.
* other jars to workaround the CLASSPATH length. In a secure cluster this * On Windows the ContainerLaunch creates a temporary special JAR manifest of
* jar must be localized so that the container has access to it. * other JARs to workaround the CLASSPATH length. In a secure cluster this
* This function localizes on-demand the jar. * JAR must be localized so that the container has access to it.
* * The default implementation returns the classpath passed to it, which
* @param classPathJar * is expected to have been created in the node manager's <i>fprivate</i>
* @param owner * folder, which will not work with secure Windows clusters.
* @throws IOException *
* @param jarPath the path to the JAR to localize
* @param target the directory where the JAR file should be localized
* @param owner the name of the user who should own the localized file
* @return the path to the localized JAR file
* @throws IOException if localization fails
*/ */
public Path localizeClasspathJar(Path classPathJar, Path pwd, String owner) public Path localizeClasspathJar(Path jarPath, Path target, String owner)
throws IOException { throws IOException {
// Non-secure executor simply use the classpath created return jarPath;
// in the NM fprivate folder
return classPathJar;
} }
/** /**
* Prepare the environment for containers in this application to execute. * Prepare the environment for containers in this application to execute.
* <pre> * <pre>
@ -123,10 +138,11 @@ public abstract class ContainerExecutor implements Configurable {
* For $rsrc in job resources * For $rsrc in job resources
* Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef * Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef
* </pre> * </pre>
*
* @param ctx LocalizerStartContext that encapsulates necessary information * @param ctx LocalizerStartContext that encapsulates necessary information
* for starting a localizer. * for starting a localizer.
* @throws IOException For most application init failures * @throws IOException for most application init failures
* @throws InterruptedException If application init thread is halted by NM * @throws InterruptedException if application init thread is halted by NM
*/ */
public abstract void startLocalizer(LocalizerStartContext ctx) public abstract void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException; throws IOException, InterruptedException;
@ -137,25 +153,28 @@ public abstract class ContainerExecutor implements Configurable {
* when the container exits. * when the container exits.
* @param ctx Encapsulates information necessary for launching containers. * @param ctx Encapsulates information necessary for launching containers.
* @return the return status of the launch * @return the return status of the launch
* @throws IOException * @throws IOException if the container launch fails
*/ */
public abstract int launchContainer(ContainerStartContext ctx) throws public abstract int launchContainer(ContainerStartContext ctx) throws
IOException; IOException;
/** /**
* Signal container with the specified signal. * Signal container with the specified signal.
*
* @param ctx Encapsulates information necessary for signaling containers. * @param ctx Encapsulates information necessary for signaling containers.
* @return returns true if the operation succeeded * @return returns true if the operation succeeded
* @throws IOException * @throws IOException if signaling the container fails
*/ */
public abstract boolean signalContainer(ContainerSignalContext ctx) public abstract boolean signalContainer(ContainerSignalContext ctx)
throws IOException; throws IOException;
/** /**
* Delete specified directories as a given user. * Delete specified directories as a given user.
*
* @param ctx Encapsulates information necessary for deletion. * @param ctx Encapsulates information necessary for deletion.
* @throws IOException * @throws IOException if delete fails
* @throws InterruptedException * @throws InterruptedException if interrupted while waiting for the deletion
* operation to complete
*/ */
public abstract void deleteAsUser(DeletionAsUserContext ctx) public abstract void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException; throws IOException, InterruptedException;
@ -164,7 +183,8 @@ public abstract class ContainerExecutor implements Configurable {
* Check if a container is alive. * Check if a container is alive.
* @param ctx Encapsulates information necessary for container liveness check. * @param ctx Encapsulates information necessary for container liveness check.
* @return true if container is still alive * @return true if container is still alive
* @throws IOException * @throws IOException if there is a failure while checking the container
* status
*/ */
public abstract boolean isContainerAlive(ContainerLivenessContext ctx) public abstract boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException; throws IOException;
@ -173,56 +193,63 @@ public abstract class ContainerExecutor implements Configurable {
* Recover an already existing container. This is a blocking call and returns * Recover an already existing container. This is a blocking call and returns
* only when the container exits. Note that the container must have been * only when the container exits. Note that the container must have been
* activated prior to this call. * activated prior to this call.
*
* @param ctx encapsulates information necessary to reacquire container * @param ctx encapsulates information necessary to reacquire container
* @return The exit code of the pre-existing container * @return The exit code of the pre-existing container
* @throws IOException * @throws IOException if there is a failure while reacquiring the container
* @throws InterruptedException * @throws InterruptedException if interrupted while waiting to reacquire
* the container
*/ */
public int reacquireContainer(ContainerReacquisitionContext ctx) public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Container container = ctx.getContainer(); Container container = ctx.getContainer();
String user = ctx.getUser(); String user = ctx.getUser();
ContainerId containerId = ctx.getContainerId(); ContainerId containerId = ctx.getContainerId();
Path pidPath = getPidFilePath(containerId); Path pidPath = getPidFilePath(containerId);
if (pidPath == null) { if (pidPath == null) {
LOG.warn(containerId + " is not active, returning terminated error"); LOG.warn(containerId + " is not active, returning terminated error");
return ExitCode.TERMINATED.getExitCode(); return ExitCode.TERMINATED.getExitCode();
} }
String pid = null; String pid = ProcessIdFileReader.getProcessId(pidPath);
pid = ProcessIdFileReader.getProcessId(pidPath);
if (pid == null) { if (pid == null) {
throw new IOException("Unable to determine pid for " + containerId); throw new IOException("Unable to determine pid for " + containerId);
} }
LOG.info("Reacquiring " + containerId + " with pid " + pid); LOG.info("Reacquiring " + containerId + " with pid " + pid);
ContainerLivenessContext livenessContext = new ContainerLivenessContext ContainerLivenessContext livenessContext = new ContainerLivenessContext
.Builder() .Builder()
.setContainer(container) .setContainer(container)
.setUser(user) .setUser(user)
.setPid(pid) .setPid(pid)
.build(); .build();
while(isContainerAlive(livenessContext)) {
while (isContainerAlive(livenessContext)) {
Thread.sleep(1000); Thread.sleep(1000);
} }
// wait for exit code file to appear // wait for exit code file to appear
String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
File file = new File(exitCodeFile);
final int sleepMsec = 100; final int sleepMsec = 100;
int msecLeft = 2000; int msecLeft = 2000;
String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
File file = new File(exitCodeFile);
while (!file.exists() && msecLeft >= 0) { while (!file.exists() && msecLeft >= 0) {
if (!isContainerActive(containerId)) { if (!isContainerActive(containerId)) {
LOG.info(containerId + " was deactivated"); LOG.info(containerId + " was deactivated");
return ExitCode.TERMINATED.getExitCode(); return ExitCode.TERMINATED.getExitCode();
} }
Thread.sleep(sleepMsec); Thread.sleep(sleepMsec);
msecLeft -= sleepMsec; msecLeft -= sleepMsec;
} }
if (msecLeft < 0) { if (msecLeft < 0) {
throw new IOException("Timeout while waiting for exit code from " throw new IOException("Timeout while waiting for exit code from "
+ containerId); + containerId);
@ -236,15 +263,17 @@ public abstract class ContainerExecutor implements Configurable {
} }
/** /**
* This method writes out the launch environment of a container. This can be * This method writes out the launch environment of a container to the
* overridden by extending ContainerExecutors to provide different behaviors * default container launch script. For the default container script path see
* {@link ContainerLaunch#CONTAINER_SCRIPT}.
*
* @param out the output stream to which the environment is written (usually * @param out the output stream to which the environment is written (usually
* a script file which will be executed by the Launcher) * a script file which will be executed by the Launcher)
* @param environment The environment variables and their values * @param environment the environment variables and their values
* @param resources The resources which have been localized for this container * @param resources the resources which have been localized for this
* Symlinks will be created to these localized resources * container. Symlinks will be created to these localized resources
* @param command The command that will be run. * @param command the command that will be run.
* @param logDir The log dir to copy debugging information to * @param logDir the log dir to copy debugging information to
* @throws IOException if any errors happened writing to the OutputStream, * @throws IOException if any errors happened writing to the OutputStream,
* while creating symlinks * while creating symlinks
*/ */
@ -255,6 +284,21 @@ public abstract class ContainerExecutor implements Configurable {
ContainerLaunch.CONTAINER_SCRIPT); ContainerLaunch.CONTAINER_SCRIPT);
} }
/**
* This method writes out the launch environment of a container to a specified
* path.
*
* @param out the output stream to which the environment is written (usually
* a script file which will be executed by the Launcher)
* @param environment the environment variables and their values
* @param resources the resources which have been localized for this
* container. Symlinks will be created to these localized resources
* @param command the command that will be run.
* @param logDir the log dir to copy debugging information to
* @param outFilename the path to which to write the launch environment
* @throws IOException if any errors happened writing to the OutputStream,
* while creating symlinks
*/
@VisibleForTesting @VisibleForTesting
public void writeLaunchEnv(OutputStream out, public void writeLaunchEnv(OutputStream out,
Map<String, String> environment, Map<Path, List<String>> resources, Map<String, String> environment, Map<Path, List<String>> resources,
@ -262,52 +306,57 @@ public abstract class ContainerExecutor implements Configurable {
throws IOException { throws IOException {
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder sb =
ContainerLaunch.ShellScriptBuilder.create(); ContainerLaunch.ShellScriptBuilder.create();
Set<String> whitelist = new HashSet<String>(); Set<String> whitelist = new HashSet<>();
whitelist.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); whitelist.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
whitelist.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name()); whitelist.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
whitelist.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name()); whitelist.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name());
whitelist.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name()); whitelist.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name());
whitelist.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); whitelist.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
whitelist.add(ApplicationConstants.Environment.JAVA_HOME.name()); whitelist.add(ApplicationConstants.Environment.JAVA_HOME.name());
if (environment != null) { if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) { for (Map.Entry<String,String> env : environment.entrySet()) {
if (!whitelist.contains(env.getKey())) { if (!whitelist.contains(env.getKey())) {
sb.env(env.getKey().toString(), env.getValue().toString()); sb.env(env.getKey(), env.getValue());
} else { } else {
sb.whitelistedEnv(env.getKey().toString(), env.getValue().toString()); sb.whitelistedEnv(env.getKey(), env.getValue());
} }
} }
} }
if (resources != null) { if (resources != null) {
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) { for (Path path: resources.keySet()) {
for (String linkName : entry.getValue()) { for (String linkName: resources.get(path)) {
if (new Path(linkName).getName().equals(WILDCARD)) { if (new Path(linkName).getName().equals(WILDCARD)) {
// If this is a wildcarded path, link to everything in the // If this is a wildcarded path, link to everything in the
// directory from the working directory // directory from the working directory
File directory = new File(entry.getKey().toString()); File directory = new File(path.toString());
for (File wildLink : directory.listFiles()) { for (File wildLink : directory.listFiles()) {
sb.symlink(new Path(wildLink.toString()), sb.symlink(new Path(wildLink.toString()),
new Path(wildLink.getName())); new Path(wildLink.getName()));
} }
} else { } else {
sb.symlink(entry.getKey(), new Path(linkName)); sb.symlink(path, new Path(linkName));
} }
} }
} }
} }
// dump debugging information if configured // dump debugging information if configured
if (getConf() != null && getConf().getBoolean( if (getConf() != null &&
YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO, getConf().getBoolean(YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO,
YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO)) { YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO)) {
sb.copyDebugInformation(new Path(outFilename), new Path(logDir, outFilename)); sb.copyDebugInformation(new Path(outFilename),
new Path(logDir, outFilename));
sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS)); sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS));
} }
sb.command(command); sb.command(command);
PrintStream pout = null; PrintStream pout = null;
try { try {
pout = new PrintStream(out, false, "UTF-8"); pout = new PrintStream(out, false, "UTF-8");
sb.write(pout); sb.write(pout);
@ -318,17 +367,25 @@ public abstract class ContainerExecutor implements Configurable {
} }
} }
/**
* The container exit code.
*/
public enum ExitCode { public enum ExitCode {
SUCCESS(0), SUCCESS(0),
FORCE_KILLED(137), FORCE_KILLED(137),
TERMINATED(143), TERMINATED(143),
LOST(154); LOST(154);
private final int code; private final int code;
private ExitCode(int exitCode) { private ExitCode(int exitCode) {
this.code = exitCode; this.code = exitCode;
} }
/**
* Get the exit code as an int.
* @return the exit code as an int
*/
public int getExitCode() { public int getExitCode() {
return code; return code;
} }
@ -343,25 +400,41 @@ public abstract class ContainerExecutor implements Configurable {
* The constants for the signals. * The constants for the signals.
*/ */
public enum Signal { public enum Signal {
NULL(0, "NULL"), QUIT(3, "SIGQUIT"), NULL(0, "NULL"),
KILL(9, "SIGKILL"), TERM(15, "SIGTERM"); QUIT(3, "SIGQUIT"),
KILL(9, "SIGKILL"),
TERM(15, "SIGTERM");
private final int value; private final int value;
private final String str; private final String str;
private Signal(int value, String str) { private Signal(int value, String str) {
this.str = str; this.str = str;
this.value = value; this.value = value;
} }
/**
* Get the signal number.
* @return the signal number
*/
public int getValue() { public int getValue() {
return value; return value;
} }
@Override @Override
public String toString() { public String toString() {
return str; return str;
} }
} }
/**
* Log each line of the output string as INFO level log messages.
*
* @param output the output string to log
*/
protected void logOutput(String output) { protected void logOutput(String output) {
String shExecOutput = output; String shExecOutput = output;
if (shExecOutput != null) { if (shExecOutput != null) {
for (String str : shExecOutput.split("\n")) { for (String str : shExecOutput.split("\n")) {
LOG.info(str); LOG.info(str);
@ -371,7 +444,8 @@ public abstract class ContainerExecutor implements Configurable {
/** /**
* Get the pidFile of the container. * Get the pidFile of the container.
* @param containerId *
* @param containerId the container ID
* @return the path of the pid-file for the given containerId. * @return the path of the pid-file for the given containerId.
*/ */
protected Path getPidFilePath(ContainerId containerId) { protected Path getPidFilePath(ContainerId containerId) {
@ -383,81 +457,151 @@ public abstract class ContainerExecutor implements Configurable {
} }
} }
/**
* Return a command line to execute the given command in the OS shell.
* On Windows, the {code}groupId{code} parameter can be used to launch
* and associate the given GID with a process group. On
* non-Windows hosts, the {code}groupId{code} parameter is ignored.
*
* @param command the command to execute
* @param groupId the job owner's GID
* @param userName the job owner's username
* @param pidFile the path to the container's PID file
* @param conf the configuration
* @return the command line to execute
*/
protected String[] getRunCommand(String command, String groupId, protected String[] getRunCommand(String command, String groupId,
String userName, Path pidFile, Configuration conf) { String userName, Path pidFile, Configuration conf) {
return getRunCommand(command, groupId, userName, pidFile, conf, null); return getRunCommand(command, groupId, userName, pidFile, conf, null);
} }
/** /**
* Return a command to execute the given command in OS shell. * Return a command line to execute the given command in the OS shell.
* On Windows, the passed in groupId can be used to launch * On Windows, the {code}groupId{code} parameter can be used to launch
* and associate the given groupId in a process group. On * and associate the given GID with a process group. On
* non-Windows, groupId is ignored. * non-Windows hosts, the {code}groupId{code} parameter is ignored.
*
* @param command the command to execute
* @param groupId the job owner's GID for Windows. On other operating systems
* it is ignored.
* @param userName the job owner's username for Windows. On other operating
* systems it is ignored.
* @param pidFile the path to the container's PID file on Windows. On other
* operating systems it is ignored.
* @param conf the configuration
* @param resource on Windows this parameter controls memory and CPU limits.
* If null, no limits are set. On other operating systems it is ignored.
* @return the command line to execute
*/ */
protected String[] getRunCommand(String command, String groupId, protected String[] getRunCommand(String command, String groupId,
String userName, Path pidFile, Configuration conf, Resource resource) { String userName, Path pidFile, Configuration conf, Resource resource) {
boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
int cpuRate = -1; return getRunCommandForWindows(command, groupId, userName, pidFile,
int memory = -1; conf, resource);
if (resource != null) {
if (conf
.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
memory = (int) resource.getMemorySize();
}
if (conf.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
int containerVCores = resource.getVirtualCores();
int nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
int nodeCpuPercentage =
NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
float containerCpuPercentage =
(float) (nodeCpuPercentage * containerVCores) / nodeVCores;
// CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
// should be set as 20 * 100.
cpuRate = Math.min(10000, (int) (containerCpuPercentage * 100));
}
}
return new String[] { Shell.getWinUtilsPath(), "task", "create", "-m",
String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
"cmd /c " + command };
} else { } else {
List<String> retCommand = new ArrayList<String>(); return getRunCommandForOther(command, conf);
if (containerSchedPriorityIsSet) {
retCommand.addAll(Arrays.asList("nice", "-n",
Integer.toString(containerSchedPriorityAdjustment)));
}
retCommand.addAll(Arrays.asList("bash", command));
return retCommand.toArray(new String[retCommand.size()]);
} }
} }
/** /**
* Is the container still active? * Return a command line to execute the given command in the OS shell.
* @param containerId * The {code}groupId{code} parameter can be used to launch
* @return true if the container is active else false. * and associate the given GID with a process group.
*
* @param command the command to execute
* @param groupId the job owner's GID
* @param userName the job owner's username
* @param pidFile the path to the container's PID file
* @param conf the configuration
* @param resource this parameter controls memory and CPU limits.
* If null, no limits are set.
* @return the command line to execute
*/
protected String[] getRunCommandForWindows(String command, String groupId,
String userName, Path pidFile, Configuration conf, Resource resource) {
int cpuRate = -1;
int memory = -1;
if (resource != null) {
if (conf.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
memory = (int)resource.getMemorySize();
}
if (conf.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
int containerVCores = resource.getVirtualCores();
int nodeVCores = NodeManagerHardwareUtils.getVCores(conf);
int nodeCpuPercentage =
NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
float containerCpuPercentage =
(float)(nodeCpuPercentage * containerVCores) / nodeVCores;
// CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
// should be set as 20 * 100.
cpuRate = Math.min(10000, (int)(containerCpuPercentage * 100));
}
}
return new String[] {
Shell.getWinUtilsPath(),
"task",
"create",
"-m",
String.valueOf(memory),
"-c",
String.valueOf(cpuRate),
groupId,
"cmd /c " + command
};
}
/**
* Return a command line to execute the given command in the OS shell.
*
* @param command the command to execute
* @param conf the configuration
* @return the command line to execute
*/
protected String[] getRunCommandForOther(String command,
Configuration conf) {
List<String> retCommand = new ArrayList<>();
boolean containerSchedPriorityIsSet = false;
int containerSchedPriorityAdjustment =
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
null) {
containerSchedPriorityIsSet = true;
containerSchedPriorityAdjustment = conf
.getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
}
if (containerSchedPriorityIsSet) {
retCommand.addAll(Arrays.asList("nice", "-n",
Integer.toString(containerSchedPriorityAdjustment)));
}
retCommand.addAll(Arrays.asList("bash", command));
return retCommand.toArray(new String[retCommand.size()]);
}
/**
* Return whether the container is still active.
*
* @param containerId the target container's ID
* @return true if the container is active
*/ */
protected boolean isContainerActive(ContainerId containerId) { protected boolean isContainerActive(ContainerId containerId) {
try { try {
readLock.lock(); readLock.lock();
return (this.pidFiles.containsKey(containerId)); return (this.pidFiles.containsKey(containerId));
} finally { } finally {
readLock.unlock(); readLock.unlock();
@ -465,13 +609,11 @@ public abstract class ContainerExecutor implements Configurable {
} }
/** /**
* Mark the container as active * Mark the container as active.
* *
* @param containerId * @param containerId the container ID
* the ContainerId * @param pidFilePath the path where the executor should write the PID
* @param pidFilePath * of the launched process
* Path where the executor should write the pid of the launched
* process
*/ */
public void activateContainer(ContainerId containerId, Path pidFilePath) { public void activateContainer(ContainerId containerId, Path pidFilePath) {
try { try {
@ -483,9 +625,10 @@ public abstract class ContainerExecutor implements Configurable {
} }
/** /**
* Mark the container as inactive. * Mark the container as inactive. For inactive containers this
* Done iff the container is still active. Else treat it as * method has no effect.
* a no-op *
* @param containerId the container ID
*/ */
public void deactivateContainer(ContainerId containerId) { public void deactivateContainer(ContainerId containerId) {
try { try {
@ -497,46 +640,63 @@ public abstract class ContainerExecutor implements Configurable {
} }
/** /**
* Get the process-identifier for the container * Get the process-identifier for the container.
* *
* @param containerID * @param containerID the container ID
* @return the processid of the container if it has already launched, * @return the process ID of the container if it has already launched,
* otherwise return null * or null otherwise
*/ */
public String getProcessId(ContainerId containerID) { public String getProcessId(ContainerId containerID) {
String pid = null; String pid = null;
Path pidFile = pidFiles.get(containerID); Path pidFile = pidFiles.get(containerID);
if (pidFile == null) {
// This container isn't even launched yet. // If PID is null, this container hasn't launched yet.
return pid; if (pidFile != null) {
} try {
try { pid = ProcessIdFileReader.getProcessId(pidFile);
pid = ProcessIdFileReader.getProcessId(pidFile); } catch (IOException e) {
} catch (IOException e) { LOG.error("Got exception reading pid from pid-file " + pidFile, e);
LOG.error("Got exception reading pid from pid-file " + pidFile, e); }
} }
return pid; return pid;
} }
/**
* This class will signal a target container after a specified delay.
* @see #signalContainer
*/
public static class DelayedProcessKiller extends Thread { public static class DelayedProcessKiller extends Thread {
private Container container; private final Container container;
private final String user; private final String user;
private final String pid; private final String pid;
private final long delay; private final long delay;
private final Signal signal; private final Signal signal;
private final ContainerExecutor containerExecutor; private final ContainerExecutor containerExecutor;
/**
* Basic constructor.
*
* @param container the container to signal
* @param user the user as whow to send the signal
* @param pid the PID of the container process
* @param delayMS the period of time to wait in millis before signaling
* the container
* @param signal the signal to send
* @param containerExecutor the executor to use to send the signal
*/
public DelayedProcessKiller(Container container, String user, String pid, public DelayedProcessKiller(Container container, String user, String pid,
long delay, Signal signal, ContainerExecutor containerExecutor) { long delayMS, Signal signal, ContainerExecutor containerExecutor) {
this.container = container; this.container = container;
this.user = user; this.user = user;
this.pid = pid; this.pid = pid;
this.delay = delay; this.delay = delayMS;
this.signal = signal; this.signal = signal;
this.containerExecutor = containerExecutor; this.containerExecutor = containerExecutor;
setName("Task killer for " + pid); setName("Task killer for " + pid);
setDaemon(false); setDaemon(false);
} }
@Override @Override
public void run() { public void run() {
try { try {
@ -548,13 +708,13 @@ public abstract class ContainerExecutor implements Configurable {
.setSignal(signal) .setSignal(signal)
.build()); .build());
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; interrupt();
} catch (IOException e) { } catch (IOException e) {
String message = "Exception when user " + user + " killing task " + pid String message = "Exception when user " + user + " killing task " + pid
+ " in DelayedProcessKiller: " + StringUtils.stringifyException(e); + " in DelayedProcessKiller: " + StringUtils.stringifyException(e);
LOG.warn(message); LOG.warn(message);
container.handle(new ContainerDiagnosticsUpdateEvent(container container.handle(new ContainerDiagnosticsUpdateEvent(
.getContainerId(), message)); container.getContainerId(), message));
} }
} }
} }

View File

@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;

View File

@ -324,8 +324,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
resourcesHandler.preExecute(containerId, resourcesHandler.preExecute(containerId,
container.getResource()); container.getResource());
String resourcesOptions = resourcesHandler.getResourcesOption( String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
containerId);
String tcCommandFile = null; String tcCommandFile = null;
try { try {

View File

@ -628,16 +628,16 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
} }
@Override @Override
public Path localizeClasspathJar(Path classPathJar, Path pwd, String owner) public Path localizeClasspathJar(Path jarPath, Path target, String owner)
throws IOException { throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(String.format("localizeClasspathJar: %s %s o:%s", LOG.debug(String.format("localizeClasspathJar: %s %s o:%s",
classPathJar, pwd, owner)); jarPath, target, owner));
} }
createDir(pwd, new FsPermission(DIR_PERM), true, owner); createDir(target, new FsPermission(DIR_PERM), true, owner);
String fileName = classPathJar.getName(); String fileName = jarPath.getName();
Path dst = new Path(pwd, fileName); Path dst = new Path(target, fileName);
Native.Elevated.move(classPathJar, dst, true); Native.Elevated.move(jarPath, dst, true);
Native.Elevated.chown(dst, owner, nodeManagerGroup); Native.Elevated.chown(dst, owner, nodeManagerGroup);
return dst; return dst;
} }

View File

@ -82,7 +82,6 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -835,7 +834,7 @@ public class ContainerLaunch implements Callable<Integer> {
throws IOException; throws IOException;
/** /**
* Method to dump debug information to the a target file. This method will * Method to dump debug information to a target file. This method will
* be called by ContainerExecutor when setting up the container launch * be called by ContainerExecutor when setting up the container launch
* script. * script.
* @param output the file to which debug information is to be written * @param output the file to which debug information is to be written