YARN-5466. DefaultContainerExecutor needs JavaDocs (templedf via rkanter)

This commit is contained in:
Robert Kanter 2016-10-17 14:29:09 -07:00
parent 8fd4c37c45
commit f5d9235914
2 changed files with 230 additions and 42 deletions

View File

@ -65,6 +65,11 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;
/**
* The {@code DefaultContainerExecuter} class offers generic container
* execution services. Process execution is handled in a platform-independent
* way via {@link ProcessBuilder}.
*/
public class DefaultContainerExecutor extends ContainerExecutor { public class DefaultContainerExecutor extends ContainerExecutor {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
@ -72,10 +77,17 @@ public class DefaultContainerExecutor extends ContainerExecutor {
private static final int WIN_MAX_PATH = 260; private static final int WIN_MAX_PATH = 260;
/**
* A {@link FileContext} for the local file system.
*/
protected final FileContext lfs; protected final FileContext lfs;
private String logDirPermissions = null; private String logDirPermissions = null;
/**
* Default constructor for use in testing.
*/
@VisibleForTesting
public DefaultContainerExecutor() { public DefaultContainerExecutor() {
try { try {
this.lfs = FileContext.getLocalFSFileContext(); this.lfs = FileContext.getLocalFSFileContext();
@ -84,15 +96,40 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
} }
/**
* Create an instance with a given {@link FileContext}.
*
* @param lfs the given {@link FileContext}
*/
DefaultContainerExecutor(FileContext lfs) { DefaultContainerExecutor(FileContext lfs) {
this.lfs = lfs; this.lfs = lfs;
} }
/**
* Copy a file using the {@link #lfs} {@link FileContext}.
*
* @param src the file to copy
* @param dst where to copy the file
* @param owner the owner of the new copy. Used only in secure Windows
* clusters
* @throws IOException when the copy fails
* @see WindowsSecureContainerExecutor
*/
protected void copyFile(Path src, Path dst, String owner) throws IOException { protected void copyFile(Path src, Path dst, String owner) throws IOException {
lfs.util().copy(src, dst, false, true); lfs.util().copy(src, dst, false, true);
} }
protected void setScriptExecutable(Path script, String owner) throws IOException { /**
* Make a file executable using the {@link #lfs} {@link FileContext}.
*
* @param script the path to make executable
* @param owner the new owner for the file. Used only in secure Windows
* clusters
* @throws IOException when the change mode operation fails
* @see WindowsSecureContainerExecutor
*/
protected void setScriptExecutable(Path script, String owner)
throws IOException {
lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
} }
@ -122,14 +159,16 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// randomly choose the local directory // randomly choose the local directory
Path appStorageDir = getWorkingDir(localDirs, user, appId); Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); String tokenFn =
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn); Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user); copyFile(nmPrivateContainerTokensPath, tokenDst, user);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); LOG.info("Copying from " + nmPrivateContainerTokensPath
+ " to " + tokenDst);
FileContext localizerFc = FileContext.getFileContext( FileContext localizerFc =
lfs.getDefaultFileSystem(), getConf()); FileContext.getFileContext(lfs.getDefaultFileSystem(), getConf());
localizerFc.setUMask(lfs.getUMask()); localizerFc.setUMask(lfs.getUMask());
localizerFc.setWorkingDirectory(appStorageDir); localizerFc.setWorkingDirectory(appStorageDir);
LOG.info("Localizer CWD set to " + appStorageDir + " = " LOG.info("Localizer CWD set to " + appStorageDir + " = "
@ -141,6 +180,22 @@ public class DefaultContainerExecutor extends ContainerExecutor {
localizer.runLocalization(nmAddr); localizer.runLocalization(nmAddr);
} }
/**
* Create a new {@link ContainerLocalizer} instance.
*
* @param user the user who owns the job for which the localization is being
* run
* @param appId the ID of the application for which the localization is being
* run
* @param locId the ID of the container for which the localization is being
* run
* @param localDirs a list of directories to use as destinations for the
* localization
* @param localizerFc the {@link FileContext} to use when localizing files
* @return the new {@link ContainerLocalizer} instance
* @throws IOException if {@code user} or {@code locId} is {@code null} or if
* the container localizer has an initialization failure
*/
@Private @Private
@VisibleForTesting @VisibleForTesting
protected ContainerLocalizer createContainerLocalizer(String user, protected ContainerLocalizer createContainerLocalizer(String user,
@ -258,15 +313,17 @@ public class DefaultContainerExecutor extends ContainerExecutor {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("Exception from container-launch.\n"); builder.append("Exception from container-launch.\n");
builder.append("Container id: " + containerId + "\n"); builder.append("Container id: ").append(containerId).append("\n");
builder.append("Exit code: " + exitCode + "\n"); builder.append("Exit code: ").append(exitCode).append("\n");
if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
builder.append("Exception message: " + e.getMessage() + "\n"); builder.append("Exception message: ");
builder.append(e.getMessage()).append("\n");
} }
builder.append("Stack trace: " builder.append("Stack trace: ");
+ StringUtils.stringifyException(e) + "\n"); builder.append(StringUtils.stringifyException(e)).append("\n");
if (!shExec.getOutput().isEmpty()) { if (!shExec.getOutput().isEmpty()) {
builder.append("Shell output: " + shExec.getOutput() + "\n"); builder.append("Shell output: ");
builder.append(shExec.getOutput()).append("\n");
} }
String diagnostics = builder.toString(); String diagnostics = builder.toString();
logOutput(diagnostics); logOutput(diagnostics);
@ -283,10 +340,24 @@ public class DefaultContainerExecutor extends ContainerExecutor {
return 0; return 0;
} }
/**
* Create a new {@link ShellCommandExecutor} using the parameters.
*
* @param wrapperScriptPath the path to the script to execute
* @param containerIdStr the container ID
* @param user the application owner's username
* @param pidFile the path to the container's PID file
* @param resource this parameter controls memory and CPU limits.
* @param workDir If not-null, specifies the directory which should be set
* as the current working directory for the command. If null,
* the current working directory is not modified.
* @param environment the container environment
* @return the new {@link ShellCommandExecutor}
* @see ShellCommandExecutor
*/
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
String containerIdStr, String user, Path pidFile, Resource resource, String containerIdStr, String user, Path pidFile, Resource resource,
File wordDir, Map<String, String> environment) File workDir, Map<String, String> environment) {
throws IOException {
String[] command = getRunCommand(wrapperScriptPath, String[] command = getRunCommand(wrapperScriptPath,
containerIdStr, user, pidFile, this.getConf(), resource); containerIdStr, user, pidFile, this.getConf(), resource);
@ -294,12 +365,20 @@ public class DefaultContainerExecutor extends ContainerExecutor {
LOG.info("launchContainer: " + Arrays.toString(command)); LOG.info("launchContainer: " + Arrays.toString(command));
return new ShellCommandExecutor( return new ShellCommandExecutor(
command, command,
wordDir, workDir,
environment, environment,
0L, 0L,
false); false);
} }
/**
* Create a {@link LocalWrapperScriptBuilder} for the given container ID
* and path that is appropriate to the current platform.
*
* @param containerIdStr the container ID
* @param containerWorkDir the container's working directory
* @return a new {@link LocalWrapperScriptBuilder}
*/
protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder(
String containerIdStr, Path containerWorkDir) { String containerIdStr, Path containerWorkDir) {
return Shell.WINDOWS ? return Shell.WINDOWS ?
@ -307,15 +386,34 @@ public class DefaultContainerExecutor extends ContainerExecutor {
new UnixLocalWrapperScriptBuilder(containerWorkDir); new UnixLocalWrapperScriptBuilder(containerWorkDir);
} }
/**
* This class is a utility to create a wrapper script that is platform
* appropriate.
*/
protected abstract class LocalWrapperScriptBuilder { protected abstract class LocalWrapperScriptBuilder {
private final Path wrapperScriptPath; private final Path wrapperScriptPath;
/**
* Return the path for the wrapper script.
*
* @return the path for the wrapper script
*/
public Path getWrapperScriptPath() { public Path getWrapperScriptPath() {
return wrapperScriptPath; return wrapperScriptPath;
} }
public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException { /**
* Write out the wrapper script for the container launch script. This method
* will create the script at the configured wrapper script path.
*
* @param launchDst the script to launch
* @param pidFile the file that will hold the PID
* @throws IOException if the wrapper script cannot be created
* @see #getWrapperScriptPath
*/
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
throws IOException {
DataOutputStream out = null; DataOutputStream out = null;
PrintStream pout = null; PrintStream pout = null;
@ -328,19 +426,40 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
} }
protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile, /**
PrintStream pout); * Write out the wrapper script for the container launch script.
*
* @param launchDst the script to launch
* @param pidFile the file that will hold the PID
* @param pout the stream to use to write out the wrapper script
*/
protected abstract void writeLocalWrapperScript(Path launchDst,
Path pidFile, PrintStream pout);
/**
* Create an instance for the given container working directory.
*
* @param containerWorkDir the working directory for the container
*/
protected LocalWrapperScriptBuilder(Path containerWorkDir) { protected LocalWrapperScriptBuilder(Path containerWorkDir) {
this.wrapperScriptPath = new Path(containerWorkDir, this.wrapperScriptPath = new Path(containerWorkDir,
Shell.appendScriptExtension("default_container_executor")); Shell.appendScriptExtension("default_container_executor"));
} }
} }
/**
* This class is an instance of {@link LocalWrapperScriptBuilder} for
* non-Windows hosts.
*/
private final class UnixLocalWrapperScriptBuilder private final class UnixLocalWrapperScriptBuilder
extends LocalWrapperScriptBuilder { extends LocalWrapperScriptBuilder {
private final Path sessionScriptPath; private final Path sessionScriptPath;
/**
* Create an instance for the given container path.
*
* @param containerWorkDir the container's working directory
*/
public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
super(containerWorkDir); super(containerWorkDir);
this.sessionScriptPath = new Path(containerWorkDir, this.sessionScriptPath = new Path(containerWorkDir,
@ -383,8 +502,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
pout.println("echo $$ > " + pidFile.toString() + ".tmp"); pout.println("echo $$ > " + pidFile.toString() + ".tmp");
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
pout.println(exec + " /bin/bash \"" + pout.printf("%s /bin/bash \"%s\"", exec, launchDst.toUri().getPath());
launchDst.toUri().getPath().toString() + "\"");
} finally { } finally {
IOUtils.cleanup(LOG, pout, out); IOUtils.cleanup(LOG, pout, out);
} }
@ -393,11 +511,21 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
} }
/**
* This class is an instance of {@link LocalWrapperScriptBuilder} for
* Windows hosts.
*/
private final class WindowsLocalWrapperScriptBuilder private final class WindowsLocalWrapperScriptBuilder
extends LocalWrapperScriptBuilder { extends LocalWrapperScriptBuilder {
private final String containerIdStr; private final String containerIdStr;
/**
* Create an instance for the given container and working directory.
*
* @param containerIdStr the container ID
* @param containerWorkDir the container's working directory
*/
public WindowsLocalWrapperScriptBuilder(String containerIdStr, public WindowsLocalWrapperScriptBuilder(String containerIdStr,
Path containerWorkDir) { Path containerWorkDir) {
@ -458,6 +586,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* *
* @param pid String pid * @param pid String pid
* @return boolean true if the process is alive * @return boolean true if the process is alive
* @throws IOException if the command to test process liveliness fails
*/ */
@VisibleForTesting @VisibleForTesting
public static boolean containerIsAlive(String pid) throws IOException { public static boolean containerIsAlive(String pid) throws IOException {
@ -478,7 +607,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* *
* @param pid the pid of the process [group] to signal. * @param pid the pid of the process [group] to signal.
* @param signal signal to send * @param signal signal to send
* (for logging). * @throws IOException if the command to kill the process fails
*/ */
protected void killContainer(String pid, Signal signal) throws IOException { protected void killContainer(String pid, Signal signal) throws IOException {
new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
@ -517,17 +646,25 @@ public class DefaultContainerExecutor extends ContainerExecutor {
FileUtil.symLink(target, symlink); FileUtil.symLink(target, symlink);
} }
/** Permissions for user dir. /**
* $local.dir/usercache/$user */ * Permissions for user dir.
* $local.dir/usercache/$user
*/
static final short USER_PERM = (short)0750; static final short USER_PERM = (short)0750;
/** Permissions for user appcache dir. /**
* $local.dir/usercache/$user/appcache */ * Permissions for user appcache dir.
* $local.dir/usercache/$user/appcache
*/
static final short APPCACHE_PERM = (short)0710; static final short APPCACHE_PERM = (short)0710;
/** Permissions for user filecache dir. /**
* $local.dir/usercache/$user/filecache */ * Permissions for user filecache dir.
* $local.dir/usercache/$user/filecache
*/
static final short FILECACHE_PERM = (short)0710; static final short FILECACHE_PERM = (short)0710;
/** Permissions for user app dir. /**
* $local.dir/usercache/$user/appcache/$appId */ * Permissions for user app dir.
* $local.dir/usercache/$user/appcache/$appId
*/
static final short APPDIR_PERM = (short)0710; static final short APPDIR_PERM = (short)0710;
private long getDiskFreeSpace(Path base) throws IOException { private long getDiskFreeSpace(Path base) throws IOException {
@ -552,9 +689,20 @@ public class DefaultContainerExecutor extends ContainerExecutor {
ContainerLocalizer.FILECACHE); ContainerLocalizer.FILECACHE);
} }
/**
* Return a randomly chosen application directory from a list of local storage
* directories. The probability of selecting a directory is proportional to
* its size.
*
* @param localDirs the target directories from which to select
* @param user the user who owns the application
* @param appId the application ID
* @return the selected directory
* @throws IOException if no application directories for the user can be
* found
*/
protected Path getWorkingDir(List<String> localDirs, String user, protected Path getWorkingDir(List<String> localDirs, String user,
String appId) throws IOException { String appId) throws IOException {
Path appStorageDir = null;
long totalAvailable = 0L; long totalAvailable = 0L;
long[] availableOnDisk = new long[localDirs.size()]; long[] availableOnDisk = new long[localDirs.size()];
int i = 0; int i = 0;
@ -563,8 +711,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// the available space on the directory. // the available space on the directory.
// firstly calculate the sum of all available space on these directories // firstly calculate the sum of all available space on these directories
for (String localDir : localDirs) { for (String localDir : localDirs) {
Path curBase = getApplicationDir(new Path(localDir), Path curBase = getApplicationDir(new Path(localDir), user, appId);
user, appId);
long space = 0L; long space = 0L;
try { try {
space = getDiskFreeSpace(curBase); space = getDiskFreeSpace(curBase);
@ -577,8 +724,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
// throw an IOException if totalAvailable is 0. // throw an IOException if totalAvailable is 0.
if (totalAvailable <= 0L) { if (totalAvailable <= 0L) {
throw new IOException("Not able to find a working directory for " throw new IOException("Not able to find a working directory for " + user);
+ user);
} }
// make probability to pick a directory proportional to // make probability to pick a directory proportional to
@ -595,12 +741,21 @@ public class DefaultContainerExecutor extends ContainerExecutor {
while (randomPosition > availableOnDisk[dir]) { while (randomPosition > availableOnDisk[dir]) {
randomPosition -= availableOnDisk[dir++]; randomPosition -= availableOnDisk[dir++];
} }
appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
user, appId);
return appStorageDir; return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
} }
/**
* Use the {@link #lfs} {@link FileContext} to create the target directory.
*
* @param dirPath the target directory
* @param perms the target permissions for the target directory
* @param createParent whether the parent directories should also be created
* @param user the user as whom the target directory should be created.
* Used only on secure Windows hosts.
* @throws IOException if there's a failure performing a file operation
* @see WindowsSecureContainerExecutor
*/
protected void createDir(Path dirPath, FsPermission perms, protected void createDir(Path dirPath, FsPermission perms,
boolean createParent, String user) throws IOException { boolean createParent, String user) throws IOException {
lfs.mkdir(dirPath, perms, createParent); lfs.mkdir(dirPath, perms, createParent);
@ -614,6 +769,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* <ul>.mkdir * <ul>.mkdir
* <li>$local.dir/usercache/$user</li> * <li>$local.dir/usercache/$user</li>
* </ul> * </ul>
*
* @param localDirs the target directories to create
* @param user the user whose local cache directories should be initialized
* @throws IOException if there's an issue initializing the user local
* directories
*/ */
void createUserLocalDirs(List<String> localDirs, String user) void createUserLocalDirs(List<String> localDirs, String user)
throws IOException { throws IOException {
@ -622,7 +782,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
for (String localDir : localDirs) { for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent // create $local.dir/usercache/$user and its immediate parent
try { try {
createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user); createDir(getUserCacheDir(new Path(localDir), user), userperms, true,
user);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e); LOG.warn("Unable to create the user directory : " + localDir, e);
continue; continue;
@ -643,6 +804,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* <li>$local.dir/usercache/$user/appcache</li> * <li>$local.dir/usercache/$user/appcache</li>
* <li>$local.dir/usercache/$user/filecache</li> * <li>$local.dir/usercache/$user/filecache</li>
* </ul> * </ul>
*
* @param localDirs the target directories to create
* @param user the user whose local cache directories should be initialized
* @throws IOException if there's an issue initializing the cache
* directories
*/ */
void createUserCacheDirs(List<String> localDirs, String user) void createUserCacheDirs(List<String> localDirs, String user)
throws IOException { throws IOException {
@ -689,7 +855,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
* <ul> * <ul>
* <li>$local.dir/usercache/$user/appcache/$appid</li> * <li>$local.dir/usercache/$user/appcache/$appid</li>
* </ul> * </ul>
* @param localDirs *
* @param localDirs the target directories to create
* @param user the user whose local cache directories should be initialized
* @param appId the application ID
* @throws IOException if there's an issue initializing the application
* directories
*/ */
void createAppDirs(List<String> localDirs, String user, String appId) void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException { throws IOException {
@ -714,6 +885,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
/** /**
* Create application log directories on all disks. * Create application log directories on all disks.
*
* @param appId the application ID
* @param logDirs the target directories to create
* @param user the user whose local cache directories should be initialized
* @throws IOException if there's an issue initializing the application log
* directories
*/ */
void createAppLogDirs(String appId, List<String> logDirs, String user) void createAppLogDirs(String appId, List<String> logDirs, String user)
throws IOException { throws IOException {
@ -740,10 +917,17 @@ public class DefaultContainerExecutor extends ContainerExecutor {
/** /**
* Create application log directories on all disks. * Create application log directories on all disks.
*
* @param appId the application ID
* @param containerId the container ID
* @param logDirs the target directories to create
* @param user the user as whom the directories should be created.
* Used only on secure Windows hosts.
* @throws IOException if there's an issue initializing the container log
* directories
*/ */
void createContainerLogDirs(String appId, String containerId, void createContainerLogDirs(String appId, String containerId,
List<String> logDirs, String user) throws IOException { List<String> logDirs, String user) throws IOException {
boolean containerLogDirStatus = false; boolean containerLogDirStatus = false;
FsPermission containerLogDirPerms = new FsPermission containerLogDirPerms = new
FsPermission(getLogDirPermissions()); FsPermission(getLogDirPermissions());
@ -769,7 +953,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
/** /**
* Return default container log directory permissions. * Return the default container log directory permissions.
*
* @return the default container log directory permissions
*/ */
@VisibleForTesting @VisibleForTesting
public String getLogDirPermissions() { public String getLogDirPermissions() {
@ -790,10 +976,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
} }
/** /**
* Return the list of paths of given local directories.
*
* @return the list of paths of given local directories * @return the list of paths of given local directories
*/ */
private static List<Path> getPaths(List<String> dirs) { private static List<Path> getPaths(List<String> dirs) {
List<Path> paths = new ArrayList<Path>(dirs.size()); List<Path> paths = new ArrayList<>(dirs.size());
for (int i = 0; i < dirs.size(); i++) { for (int i = 0; i < dirs.size(); i++) {
paths.add(new Path(dirs.get(i))); paths.add(new Path(dirs.get(i)));
} }

View File

@ -732,7 +732,7 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
@Override @Override
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
String containerIdStr, String userName, Path pidFile, Resource resource, String containerIdStr, String userName, Path pidFile, Resource resource,
File wordDir, Map<String, String> environment) throws IOException { File wordDir, Map<String, String> environment) {
return new WintuilsProcessStubExecutor( return new WintuilsProcessStubExecutor(
wordDir.toString(), wordDir.toString(),
containerIdStr, userName, pidFile.toString(), containerIdStr, userName, pidFile.toString(),