YARN-7595. Container launching code suppresses close exceptions after writes. Contributed by Jim Brennan
This commit is contained in:
parent
3ebe6a7819
commit
2abab1d7c5
|
@ -42,12 +42,10 @@ import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.Shell.CommandExecutor;
|
import org.apache.hadoop.util.Shell.CommandExecutor;
|
||||||
import org.apache.hadoop.util.Shell.ExitCodeException;
|
import org.apache.hadoop.util.Shell.ExitCodeException;
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -414,15 +412,11 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
*/
|
*/
|
||||||
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
|
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DataOutputStream out = null;
|
try (DataOutputStream out =
|
||||||
PrintStream pout = null;
|
lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
PrintStream pout =
|
||||||
try {
|
new PrintStream(out, false, "UTF-8")) {
|
||||||
out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
|
|
||||||
pout = new PrintStream(out, false, "UTF-8");
|
|
||||||
writeLocalWrapperScript(launchDst, pidFile, pout);
|
writeLocalWrapperScript(launchDst, pidFile, pout);
|
||||||
} finally {
|
|
||||||
IOUtils.cleanupWithLogger(LOG, pout, out);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -489,11 +483,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
private void writeSessionScript(Path launchDst, Path pidFile)
|
private void writeSessionScript(Path launchDst, Path pidFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DataOutputStream out = null;
|
try (DataOutputStream out =
|
||||||
PrintStream pout = null;
|
lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
|
||||||
try {
|
PrintStream pout =
|
||||||
out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
|
new PrintStream(out, false, "UTF-8")) {
|
||||||
pout = new PrintStream(out, false, "UTF-8");
|
|
||||||
// We need to do a move as writing to a file is not atomic
|
// We need to do a move as writing to a file is not atomic
|
||||||
// Process reading a file being written to may get garbled data
|
// Process reading a file being written to may get garbled data
|
||||||
// hence write pid to tmp file first followed by a mv
|
// hence write pid to tmp file first followed by a mv
|
||||||
|
@ -503,8 +496,6 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
|
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
|
||||||
String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
|
String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
|
||||||
pout.printf("%s /bin/bash \"%s\"", exec, launchDst.toUri().getPath());
|
pout.printf("%s /bin/bash \"%s\"", exec, launchDst.toUri().getPath());
|
||||||
} finally {
|
|
||||||
IOUtils.cleanupWithLogger(LOG, pout, out);
|
|
||||||
}
|
}
|
||||||
lfs.setPermission(sessionScriptPath,
|
lfs.setPermission(sessionScriptPath,
|
||||||
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
||||||
|
|
|
@ -220,15 +220,13 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
containerIdStr));
|
containerIdStr));
|
||||||
Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
|
Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
|
||||||
getContainerPrivateDir(appIdStr, containerIdStr));
|
getContainerPrivateDir(appIdStr, containerIdStr));
|
||||||
DataOutputStream containerScriptOutStream = null;
|
|
||||||
DataOutputStream tokensOutStream = null;
|
|
||||||
|
|
||||||
// Select the working directory for the container
|
// Select the working directory for the container
|
||||||
Path containerWorkDir = deriveContainerWorkDir();
|
Path containerWorkDir = deriveContainerWorkDir();
|
||||||
recordContainerWorkDir(containerID, containerWorkDir.toString());
|
recordContainerWorkDir(containerID, containerWorkDir.toString());
|
||||||
|
|
||||||
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
|
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
|
||||||
// pid file should be in nm private dir so that it is not
|
// pid file should be in nm private dir so that it is not
|
||||||
// accessible by users
|
// accessible by users
|
||||||
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
|
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
|
||||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||||
|
@ -243,24 +241,24 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
throw new IOException("Most of the disks failed. "
|
throw new IOException("Most of the disks failed. "
|
||||||
+ dirsHandler.getDisksHealthReport(false));
|
+ dirsHandler.getDisksHealthReport(false));
|
||||||
}
|
}
|
||||||
try {
|
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
|
||||||
// /////////// Write out the container-script in the nmPrivate space.
|
for (String localDir : localDirs) {
|
||||||
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
|
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||||
for (String localDir : localDirs) {
|
Path userdir = new Path(usersdir, user);
|
||||||
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||||
Path userdir = new Path(usersdir, user);
|
appDirs.add(new Path(appsdir, appIdStr));
|
||||||
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
}
|
||||||
appDirs.add(new Path(appsdir, appIdStr));
|
|
||||||
}
|
|
||||||
containerScriptOutStream =
|
|
||||||
lfs.create(nmPrivateContainerScriptPath,
|
|
||||||
EnumSet.of(CREATE, OVERWRITE));
|
|
||||||
|
|
||||||
// Set the token location too.
|
// Set the token location too.
|
||||||
environment.put(
|
environment.put(
|
||||||
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
|
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
|
||||||
new Path(containerWorkDir,
|
new Path(containerWorkDir,
|
||||||
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
|
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
|
||||||
|
|
||||||
|
// /////////// Write out the container-script in the nmPrivate space.
|
||||||
|
try (DataOutputStream containerScriptOutStream =
|
||||||
|
lfs.create(nmPrivateContainerScriptPath,
|
||||||
|
EnumSet.of(CREATE, OVERWRITE))) {
|
||||||
// Sanitize the container's environment
|
// Sanitize the container's environment
|
||||||
sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
|
sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
|
||||||
containerLogDirs, localResources, nmPrivateClasspathJarDir);
|
containerLogDirs, localResources, nmPrivateClasspathJarDir);
|
||||||
|
@ -271,18 +269,16 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
exec.writeLaunchEnv(containerScriptOutStream, environment,
|
exec.writeLaunchEnv(containerScriptOutStream, environment,
|
||||||
localResources, launchContext.getCommands(),
|
localResources, launchContext.getCommands(),
|
||||||
new Path(containerLogDirs.get(0)), user);
|
new Path(containerLogDirs.get(0)), user);
|
||||||
// /////////// End of writing out container-script
|
}
|
||||||
|
// /////////// End of writing out container-script
|
||||||
|
|
||||||
// /////////// Write out the container-tokens in the nmPrivate space.
|
// /////////// Write out the container-tokens in the nmPrivate space.
|
||||||
tokensOutStream =
|
try (DataOutputStream tokensOutStream =
|
||||||
lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE));
|
lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
|
||||||
Credentials creds = container.getCredentials();
|
Credentials creds = container.getCredentials();
|
||||||
creds.writeTokenStorageToStream(tokensOutStream);
|
creds.writeTokenStorageToStream(tokensOutStream);
|
||||||
// /////////// End of writing out container-tokens
|
|
||||||
} finally {
|
|
||||||
IOUtils.cleanupWithLogger(LOG, containerScriptOutStream,
|
|
||||||
tokensOutStream);
|
|
||||||
}
|
}
|
||||||
|
// /////////// End of writing out container-tokens
|
||||||
|
|
||||||
ret = launchContainer(new ContainerStartContext.Builder()
|
ret = launchContainer(new ContainerStartContext.Builder()
|
||||||
.setContainer(container)
|
.setContainer(container)
|
||||||
|
|
|
@ -231,7 +231,6 @@ public class JavaSandboxLinuxContainerRuntime
|
||||||
throw new ContainerExecutionException("hadoop.tmp.dir not set!");
|
throw new ContainerExecutionException("hadoop.tmp.dir not set!");
|
||||||
}
|
}
|
||||||
|
|
||||||
OutputStream policyOutputStream = null;
|
|
||||||
try {
|
try {
|
||||||
String containerID = ctx.getExecutionAttribute(CONTAINER_ID_STR);
|
String containerID = ctx.getExecutionAttribute(CONTAINER_ID_STR);
|
||||||
initializePolicyDir();
|
initializePolicyDir();
|
||||||
|
@ -242,19 +241,19 @@ public class JavaSandboxLinuxContainerRuntime
|
||||||
Paths.get(policyFileDir.toString(),
|
Paths.get(policyFileDir.toString(),
|
||||||
containerID + "-" + NMContainerPolicyUtils.POLICY_FILE),
|
containerID + "-" + NMContainerPolicyUtils.POLICY_FILE),
|
||||||
POLICY_ATTR);
|
POLICY_ATTR);
|
||||||
policyOutputStream = Files.newOutputStream(policyFilePath);
|
|
||||||
|
|
||||||
containerPolicies.put(containerID, policyFilePath);
|
try(OutputStream policyOutputStream =
|
||||||
|
Files.newOutputStream(policyFilePath)) {
|
||||||
|
|
||||||
NMContainerPolicyUtils.generatePolicyFile(policyOutputStream,
|
containerPolicies.put(containerID, policyFilePath);
|
||||||
localDirs, groupPolicyFiles, resources, configuration);
|
|
||||||
NMContainerPolicyUtils.appendSecurityFlags(
|
|
||||||
commands, env, policyFilePath, sandboxMode);
|
|
||||||
|
|
||||||
|
NMContainerPolicyUtils.generatePolicyFile(policyOutputStream,
|
||||||
|
localDirs, groupPolicyFiles, resources, configuration);
|
||||||
|
NMContainerPolicyUtils.appendSecurityFlags(
|
||||||
|
commands, env, policyFilePath, sandboxMode);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ContainerExecutionException(e);
|
throw new ContainerExecutionException(e);
|
||||||
} finally {
|
|
||||||
IOUtils.cleanupWithLogger(LOG, policyOutputStream);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue