YARN-3288. Document and fix indentation in the DockerContainerExecutor code
(cherry picked from commit e0ccea33c9
)
This commit is contained in:
parent
274db918c3
commit
b1b4951452
|
@ -33,6 +33,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-3397. yarn rmadmin should skip -failover. (J.Andreina via kasha)
|
YARN-3397. yarn rmadmin should skip -failover. (J.Andreina via kasha)
|
||||||
|
|
||||||
|
YARN-3288. Document and fix indentation in the DockerContainerExecutor code
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -210,8 +210,22 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException{
|
/**
|
||||||
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
|
* This method writes out the launch environment of a container. This can be
|
||||||
|
* overridden by extending ContainerExecutors to provide different behaviors
|
||||||
|
* @param out the output stream to which the environment is written (usually
|
||||||
|
* a script file which will be executed by the Launcher)
|
||||||
|
* @param environment The environment variables and their values
|
||||||
|
* @param resources The resources which have been localized for this container
|
||||||
|
* Symlinks will be created to these localized resources
|
||||||
|
* @param command The command that will be run.
|
||||||
|
* @throws IOException if any errors happened writing to the OutputStream,
|
||||||
|
* while creating symlinks
|
||||||
|
*/
|
||||||
|
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
|
||||||
|
Map<Path, List<String>> resources, List<String> command) throws IOException{
|
||||||
|
ContainerLaunch.ShellScriptBuilder sb =
|
||||||
|
ContainerLaunch.ShellScriptBuilder.create();
|
||||||
if (environment != null) {
|
if (environment != null) {
|
||||||
for (Map.Entry<String,String> env : environment.entrySet()) {
|
for (Map.Entry<String,String> env : environment.entrySet()) {
|
||||||
sb.env(env.getKey().toString(), env.getValue().toString());
|
sb.env(env.getKey().toString(), env.getValue().toString());
|
||||||
|
|
|
@ -18,10 +18,24 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import com.google.common.base.Joiner;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Strings;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.lang.math.RandomUtils;
|
import org.apache.commons.lang.math.RandomUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -45,38 +59,35 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.io.DataOutputStream;
|
import com.google.common.base.Joiner;
|
||||||
import java.io.File;
|
import com.google.common.base.Preconditions;
|
||||||
import java.io.IOException;
|
import com.google.common.base.Strings;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This executor will launch a docker container and run the task inside the container.
|
* This executor will launch and run tasks inside Docker containers. It
|
||||||
|
* currently only supports simple authentication mode. It shares a lot of code
|
||||||
|
* with the DefaultContainerExecutor (and it may make sense to pull out those
|
||||||
|
* common pieces later).
|
||||||
*/
|
*/
|
||||||
public class DockerContainerExecutor extends ContainerExecutor {
|
public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(DockerContainerExecutor.class);
|
.getLog(DockerContainerExecutor.class);
|
||||||
public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor";
|
//The name of the script file that will launch the Docker containers
|
||||||
public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session";
|
public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT =
|
||||||
|
"docker_container_executor";
|
||||||
// This validates that the image is a proper docker image and would not crash docker.
|
//The name of the session script that the DOCKER_CONTAINER_EXECUTOR_SCRIPT
|
||||||
public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$";
|
//launches in turn
|
||||||
|
public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT =
|
||||||
|
"docker_container_executor_session";
|
||||||
|
|
||||||
|
//This validates that the image is a proper docker image and would not crash
|
||||||
|
//docker. The image name is not allowed to contain spaces. e.g.
|
||||||
|
//registry.somecompany.com:9999/containername:0.1 or
|
||||||
|
//containername:0.1 or
|
||||||
|
//containername
|
||||||
|
public static final String DOCKER_IMAGE_PATTERN =
|
||||||
|
"^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$";
|
||||||
|
|
||||||
private final FileContext lfs;
|
private final FileContext lfs;
|
||||||
private final Pattern dockerImagePattern;
|
private final Pattern dockerImagePattern;
|
||||||
|
@ -96,23 +107,26 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
String auth = getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
|
String auth =
|
||||||
|
getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
|
||||||
if (auth != null && !auth.equals("simple")) {
|
if (auth != null && !auth.equals("simple")) {
|
||||||
throw new IllegalStateException("DockerContainerExecutor only works with simple authentication mode");
|
throw new IllegalStateException(
|
||||||
|
"DockerContainerExecutor only works with simple authentication mode");
|
||||||
}
|
}
|
||||||
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
String dockerExecutor = getConf().get(
|
||||||
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
||||||
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
|
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
|
||||||
if (!new File(dockerExecutor).exists()) {
|
if (!new File(dockerExecutor).exists()) {
|
||||||
throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor);
|
throw new IllegalStateException(
|
||||||
|
"Invalid docker exec path: " + dockerExecutor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
|
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
|
||||||
InetSocketAddress nmAddr, String user, String appId, String locId,
|
InetSocketAddress nmAddr, String user, String appId, String locId,
|
||||||
LocalDirsHandlerService dirsHandler)
|
LocalDirsHandlerService dirsHandler)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||||
List<String> logDirs = dirsHandler.getLogDirs();
|
List<String> logDirs = dirsHandler.getLogDirs();
|
||||||
|
|
||||||
|
@ -128,7 +142,8 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
// randomly choose the local directory
|
// randomly choose the local directory
|
||||||
Path appStorageDir = getWorkingDir(localDirs, user, appId);
|
Path appStorageDir = getWorkingDir(localDirs, user, appId);
|
||||||
|
|
||||||
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
|
String tokenFn =
|
||||||
|
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
|
||||||
Path tokenDst = new Path(appStorageDir, tokenFn);
|
Path tokenDst = new Path(appStorageDir, tokenFn);
|
||||||
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
|
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
|
||||||
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
|
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
|
||||||
|
@ -140,31 +155,34 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int launchContainer(Container container,
|
public int launchContainer(Container container, Path
|
||||||
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
|
nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName,
|
||||||
String userName, String appId, Path containerWorkDir,
|
String appId, Path containerWorkDir, List<String> localDirs, List<String>
|
||||||
List<String> localDirs, List<String> logDirs) throws IOException {
|
logDirs) throws IOException {
|
||||||
|
//Variables for the launch environment can be injected from the command-line
|
||||||
|
//while submitting the application
|
||||||
String containerImageName = container.getLaunchContext().getEnvironment()
|
String containerImageName = container.getLaunchContext().getEnvironment()
|
||||||
.get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
|
.get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("containerImageName from launchContext: " + containerImageName);
|
LOG.debug("containerImageName from launchContext: " + containerImageName);
|
||||||
}
|
}
|
||||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), "Container image must not be null");
|
Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName),
|
||||||
|
"Container image must not be null");
|
||||||
containerImageName = containerImageName.replaceAll("['\"]", "");
|
containerImageName = containerImageName.replaceAll("['\"]", "");
|
||||||
|
|
||||||
Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " + containerImageName + " is not a proper docker image");
|
Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: "
|
||||||
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
+ containerImageName + " is not a proper docker image");
|
||||||
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
|
String dockerExecutor = getConf().get(
|
||||||
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
||||||
|
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
|
||||||
|
|
||||||
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
|
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
|
||||||
ContainerId containerId = container.getContainerId();
|
ContainerId containerId = container.getContainerId();
|
||||||
|
|
||||||
// create container dirs on all disks
|
// create container dirs on all disks
|
||||||
String containerIdStr = ConverterUtils.toString(containerId);
|
String containerIdStr = ConverterUtils.toString(containerId);
|
||||||
String appIdStr =
|
String appIdStr = ConverterUtils.toString(
|
||||||
ConverterUtils.toString(
|
containerId.getApplicationAttemptId().getApplicationId());
|
||||||
containerId.getApplicationAttemptId().
|
|
||||||
getApplicationId());
|
|
||||||
for (String sLocalDir : localDirs) {
|
for (String sLocalDir : localDirs) {
|
||||||
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
|
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
|
||||||
Path userdir = new Path(usersdir, userName);
|
Path userdir = new Path(usersdir, userName);
|
||||||
|
@ -178,46 +196,57 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
|
createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
|
||||||
|
|
||||||
Path tmpDir = new Path(containerWorkDir,
|
Path tmpDir = new Path(containerWorkDir,
|
||||||
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
|
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
|
||||||
createDir(tmpDir, dirPerm, false, userName);
|
createDir(tmpDir, dirPerm, false, userName);
|
||||||
|
|
||||||
// copy launch script to work dir
|
// copy launch script to work dir
|
||||||
Path launchDst =
|
Path launchDst =
|
||||||
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
|
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
|
||||||
lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
|
lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
|
||||||
|
|
||||||
// copy container tokens to work dir
|
// copy container tokens to work dir
|
||||||
Path tokenDst =
|
Path tokenDst =
|
||||||
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
|
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
|
||||||
lfs.util().copy(nmPrivateTokensPath, tokenDst);
|
lfs.util().copy(nmPrivateTokensPath, tokenDst);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
String localDirMount = toMount(localDirs);
|
String localDirMount = toMount(localDirs);
|
||||||
String logDirMount = toMount(logDirs);
|
String logDirMount = toMount(logDirs);
|
||||||
String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath()));
|
String containerWorkDirMount = toMount(Collections.singletonList(
|
||||||
|
containerWorkDir.toUri().getPath()));
|
||||||
StringBuilder commands = new StringBuilder();
|
StringBuilder commands = new StringBuilder();
|
||||||
|
//Use docker run to launch the docker container. See man pages for
|
||||||
|
//docker-run
|
||||||
|
//--rm removes the container automatically once the container finishes
|
||||||
|
//--net=host allows the container to take on the host's network stack
|
||||||
|
//--name sets the Docker Container name to the YARN containerId string
|
||||||
|
//-v is used to bind mount volumes for local, log and work dirs.
|
||||||
String commandStr = commands.append(dockerExecutor)
|
String commandStr = commands.append(dockerExecutor)
|
||||||
.append(" ")
|
.append(" ")
|
||||||
.append("run")
|
.append("run")
|
||||||
.append(" ")
|
.append(" ")
|
||||||
.append("--rm --net=host")
|
.append("--rm --net=host")
|
||||||
.append(" ")
|
.append(" ")
|
||||||
.append(" --name " + containerIdStr)
|
.append(" --name " + containerIdStr)
|
||||||
.append(localDirMount)
|
.append(localDirMount)
|
||||||
.append(logDirMount)
|
.append(logDirMount)
|
||||||
.append(containerWorkDirMount)
|
.append(containerWorkDirMount)
|
||||||
.append(" ")
|
.append(" ")
|
||||||
.append(containerImageName)
|
.append(containerImageName)
|
||||||
.toString();
|
.toString();
|
||||||
String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`";
|
//Get the pid of the process which has been launched as a docker container
|
||||||
|
//using docker inspect
|
||||||
|
String dockerPidScript = "`" + dockerExecutor +
|
||||||
|
" inspect --format {{.State.Pid}} " + containerIdStr + "`";
|
||||||
|
|
||||||
// Create new local launch wrapper script
|
// Create new local launch wrapper script
|
||||||
LocalWrapperScriptBuilder sb =
|
LocalWrapperScriptBuilder sb = new UnixLocalWrapperScriptBuilder(
|
||||||
new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript);
|
containerWorkDir, commandStr, dockerPidScript);
|
||||||
Path pidFile = getPidFilePath(containerId);
|
Path pidFile = getPidFilePath(containerId);
|
||||||
if (pidFile != null) {
|
if (pidFile != null) {
|
||||||
sb.writeLocalWrapperScript(launchDst, pidFile);
|
sb.writeLocalWrapperScript(launchDst, pidFile);
|
||||||
} else {
|
} else {
|
||||||
|
//Although the container was activated by ContainerLaunch before exec()
|
||||||
|
//was called, since then deactivateContainer() has been called.
|
||||||
LOG.info("Container " + containerIdStr
|
LOG.info("Container " + containerIdStr
|
||||||
+ " was marked as inactive. Returning terminated error");
|
+ " was marked as inactive. Returning terminated error");
|
||||||
return ExitCode.TERMINATED.getExitCode();
|
return ExitCode.TERMINATED.getExitCode();
|
||||||
|
@ -234,12 +263,13 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
|
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
|
||||||
containerIdStr, userName, pidFile, this.getConf());
|
containerIdStr, userName, pidFile, this.getConf());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command));
|
LOG.debug("launchContainer: " + commandStr + " " +
|
||||||
|
Joiner.on(" ").join(command));
|
||||||
}
|
}
|
||||||
shExec = new ShellCommandExecutor(
|
shExec = new ShellCommandExecutor(
|
||||||
command,
|
command,
|
||||||
new File(containerWorkDir.toUri().getPath()),
|
new File(containerWorkDir.toUri().getPath()),
|
||||||
container.getLaunchContext().getEnvironment()); // sanitized env
|
container.getLaunchContext().getEnvironment()); // sanitized env
|
||||||
if (isContainerActive(containerId)) {
|
if (isContainerActive(containerId)) {
|
||||||
shExec.execute();
|
shExec.execute();
|
||||||
} else {
|
} else {
|
||||||
|
@ -279,9 +309,17 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException {
|
/**
|
||||||
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
|
* Filter the environment variables that may conflict with the ones set in
|
||||||
|
* the docker image and write them out to an OutputStream.
|
||||||
|
*/
|
||||||
|
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
|
||||||
|
Map<Path, List<String>> resources, List<String> command)
|
||||||
|
throws IOException {
|
||||||
|
ContainerLaunch.ShellScriptBuilder sb =
|
||||||
|
ContainerLaunch.ShellScriptBuilder.create();
|
||||||
|
|
||||||
|
//Remove environments that may conflict with the ones in Docker image.
|
||||||
Set<String> exclusionSet = new HashSet<String>();
|
Set<String> exclusionSet = new HashSet<String>();
|
||||||
exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
|
exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
|
||||||
exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
|
exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
|
||||||
|
@ -427,6 +465,9 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//This class facilitates (only) the creation of platform-specific scripts that
|
||||||
|
//will be used to launch the containers
|
||||||
|
//TODO: This should be re-used from the DefaultContainerExecutor.
|
||||||
private abstract class LocalWrapperScriptBuilder {
|
private abstract class LocalWrapperScriptBuilder {
|
||||||
|
|
||||||
private final Path wrapperScriptPath;
|
private final Path wrapperScriptPath;
|
||||||
|
@ -435,7 +476,8 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
return wrapperScriptPath;
|
return wrapperScriptPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
|
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
|
||||||
|
throws IOException {
|
||||||
DataOutputStream out = null;
|
DataOutputStream out = null;
|
||||||
PrintStream pout = null;
|
PrintStream pout = null;
|
||||||
|
|
||||||
|
@ -448,8 +490,8 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
|
protected abstract void writeLocalWrapperScript(Path launchDst,
|
||||||
PrintStream pout);
|
Path pidFile, PrintStream pout);
|
||||||
|
|
||||||
protected LocalWrapperScriptBuilder(Path containerWorkDir) {
|
protected LocalWrapperScriptBuilder(Path containerWorkDir) {
|
||||||
this.wrapperScriptPath = new Path(containerWorkDir,
|
this.wrapperScriptPath = new Path(containerWorkDir,
|
||||||
|
@ -457,13 +499,15 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO: This class too should be used from DefaultContainerExecutor.
|
||||||
private final class UnixLocalWrapperScriptBuilder
|
private final class UnixLocalWrapperScriptBuilder
|
||||||
extends LocalWrapperScriptBuilder {
|
extends LocalWrapperScriptBuilder {
|
||||||
private final Path sessionScriptPath;
|
private final Path sessionScriptPath;
|
||||||
private final String dockerCommand;
|
private final String dockerCommand;
|
||||||
private final String dockerPidScript;
|
private final String dockerPidScript;
|
||||||
|
|
||||||
public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) {
|
public UnixLocalWrapperScriptBuilder(Path containerWorkDir,
|
||||||
|
String dockerCommand, String dockerPidScript) {
|
||||||
super(containerWorkDir);
|
super(containerWorkDir);
|
||||||
this.dockerCommand = dockerCommand;
|
this.dockerCommand = dockerCommand;
|
||||||
this.dockerPidScript = dockerPidScript;
|
this.dockerPidScript = dockerPidScript;
|
||||||
|
@ -480,8 +524,7 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
|
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
|
||||||
PrintStream pout) {
|
PrintStream pout) {
|
||||||
|
|
||||||
String exitCodeFile = ContainerLaunch.getExitCodeFile(
|
String exitCodeFile = ContainerLaunch.getExitCodeFile(
|
||||||
pidFile.toString());
|
pidFile.toString());
|
||||||
String tmpFile = exitCodeFile + ".tmp";
|
String tmpFile = exitCodeFile + ".tmp";
|
||||||
|
@ -505,7 +548,8 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
// hence write pid to tmp file first followed by a mv
|
// hence write pid to tmp file first followed by a mv
|
||||||
pout.println("#!/usr/bin/env bash");
|
pout.println("#!/usr/bin/env bash");
|
||||||
pout.println();
|
pout.println();
|
||||||
pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp");
|
pout.println("echo "+ dockerPidScript +" > " + pidFile.toString()
|
||||||
|
+ ".tmp");
|
||||||
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
|
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
|
||||||
pout.println(dockerCommand + " bash \"" +
|
pout.println(dockerCommand + " bash \"" +
|
||||||
launchDst.toUri().getPath().toString() + "\"");
|
launchDst.toUri().getPath().toString() + "\"");
|
||||||
|
@ -518,7 +562,7 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void createDir(Path dirPath, FsPermission perms,
|
protected void createDir(Path dirPath, FsPermission perms,
|
||||||
boolean createParent, String user) throws IOException {
|
boolean createParent, String user) throws IOException {
|
||||||
lfs.mkdir(dirPath, perms, createParent);
|
lfs.mkdir(dirPath, perms, createParent);
|
||||||
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
|
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
|
||||||
lfs.setPermission(dirPath, perms);
|
lfs.setPermission(dirPath, perms);
|
||||||
|
@ -532,13 +576,14 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
void createUserLocalDirs(List<String> localDirs, String user)
|
void createUserLocalDirs(List<String> localDirs, String user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean userDirStatus = false;
|
boolean userDirStatus = false;
|
||||||
FsPermission userperms = new FsPermission(USER_PERM);
|
FsPermission userperms = new FsPermission(USER_PERM);
|
||||||
for (String localDir : localDirs) {
|
for (String localDir : localDirs) {
|
||||||
// create $local.dir/usercache/$user and its immediate parent
|
// create $local.dir/usercache/$user and its immediate parent
|
||||||
try {
|
try {
|
||||||
createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user);
|
createDir(getUserCacheDir(new Path(localDir), user), userperms, true,
|
||||||
|
user);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Unable to create the user directory : " + localDir, e);
|
LOG.warn("Unable to create the user directory : " + localDir, e);
|
||||||
continue;
|
continue;
|
||||||
|
@ -633,7 +678,7 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
* Create application log directories on all disks.
|
* Create application log directories on all disks.
|
||||||
*/
|
*/
|
||||||
void createContainerLogDirs(String appId, String containerId,
|
void createContainerLogDirs(String appId, String containerId,
|
||||||
List<String> logDirs, String user) throws IOException {
|
List<String> logDirs, String user) throws IOException {
|
||||||
|
|
||||||
boolean containerLogDirStatus = false;
|
boolean containerLogDirStatus = false;
|
||||||
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
|
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
|
||||||
|
@ -707,7 +752,7 @@ public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Path getWorkingDir(List<String> localDirs, String user,
|
protected Path getWorkingDir(List<String> localDirs, String user,
|
||||||
String appId) throws IOException {
|
String appId) throws IOException {
|
||||||
Path appStorageDir = null;
|
Path appStorageDir = null;
|
||||||
long totalAvailable = 0L;
|
long totalAvailable = 0L;
|
||||||
long[] availableOnDisk = new long[localDirs.size()];
|
long[] availableOnDisk = new long[localDirs.size()];
|
||||||
|
|
|
@ -269,8 +269,8 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
localResources, nmPrivateClasspathJarDir);
|
localResources, nmPrivateClasspathJarDir);
|
||||||
|
|
||||||
// Write out the environment
|
// Write out the environment
|
||||||
exec.writeLaunchEnv(containerScriptOutStream, environment, localResources,
|
exec.writeLaunchEnv(containerScriptOutStream, environment,
|
||||||
launchContext.getCommands());
|
localResources, launchContext.getCommands());
|
||||||
|
|
||||||
// /////////// End of writing out container-script
|
// /////////// End of writing out container-script
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -26,48 +37,29 @@ import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
import com.google.common.base.Strings;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.FileReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.LineNumberReader;
|
|
||||||
import java.io.PrintWriter;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assume.assumeTrue;
|
|
||||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is intended to test the DockerContainerExecutor code, but it requires docker
|
* This is intended to test the DockerContainerExecutor code, but it requires
|
||||||
* to be installed.
|
* docker to be installed.
|
||||||
* <br><ol>
|
* <br><ol>
|
||||||
* <li>Install docker, and Compile the code with docker-service-url set to the host and port
|
* <li>Install docker, and Compile the code with docker-service-url set to the
|
||||||
* where docker service is running.
|
* host and port where docker service is running.
|
||||||
* <br><pre><code>
|
* <br><pre><code>
|
||||||
* > mvn clean install -Ddocker-service-url=tcp://0.0.0.0:4243
|
* > mvn clean install -Ddocker-service-url=tcp://0.0.0.0:4243 -DskipTests
|
||||||
* -DskipTests
|
|
||||||
* </code></pre>
|
* </code></pre>
|
||||||
*/
|
*/
|
||||||
public class TestDockerContainerExecutor {
|
public class TestDockerContainerExecutor {
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(TestDockerContainerExecutor.class);
|
.getLog(TestDockerContainerExecutor.class);
|
||||||
private static File workSpace = null;
|
private static File workSpace = null;
|
||||||
private DockerContainerExecutor exec = null;
|
private DockerContainerExecutor exec = null;
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
@ -75,14 +67,10 @@ public class TestDockerContainerExecutor {
|
||||||
private FileContext lfs;
|
private FileContext lfs;
|
||||||
private String yarnImage;
|
private String yarnImage;
|
||||||
|
|
||||||
private int id = 0;
|
|
||||||
private String appSubmitter;
|
private String appSubmitter;
|
||||||
private String dockerUrl;
|
private String dockerUrl;
|
||||||
private String testImage = "centos:latest";
|
private String testImage = "centos:latest";
|
||||||
private String dockerExec;
|
private String dockerExec;
|
||||||
private String containerIdStr;
|
|
||||||
|
|
||||||
|
|
||||||
private ContainerId getNextContainerId() {
|
private ContainerId getNextContainerId() {
|
||||||
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
||||||
String id = "CONTAINER_" + System.currentTimeMillis();
|
String id = "CONTAINER_" + System.currentTimeMillis();
|
||||||
|
@ -91,6 +79,8 @@ public class TestDockerContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
//Initialize a new DockerContainerExecutor that will be used to launch mocked
|
||||||
|
//containers.
|
||||||
public void setup() {
|
public void setup() {
|
||||||
try {
|
try {
|
||||||
lfs = FileContext.getLocalFSFileContext();
|
lfs = FileContext.getLocalFSFileContext();
|
||||||
|
@ -113,8 +103,10 @@ public class TestDockerContainerExecutor {
|
||||||
}
|
}
|
||||||
dockerUrl = " -H " + dockerUrl;
|
dockerUrl = " -H " + dockerUrl;
|
||||||
dockerExec = "docker " + dockerUrl;
|
dockerExec = "docker " + dockerUrl;
|
||||||
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
|
conf.set(
|
||||||
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec);
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
|
||||||
|
conf.set(
|
||||||
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec);
|
||||||
exec = new DockerContainerExecutor();
|
exec = new DockerContainerExecutor();
|
||||||
dirsHandler = new LocalDirsHandlerService();
|
dirsHandler = new LocalDirsHandlerService();
|
||||||
dirsHandler.init(conf);
|
dirsHandler.init(conf);
|
||||||
|
@ -129,11 +121,10 @@ public class TestDockerContainerExecutor {
|
||||||
|
|
||||||
private Shell.ShellCommandExecutor shellExec(String command) {
|
private Shell.ShellCommandExecutor shellExec(String command) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor(
|
Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor(
|
||||||
command.split("\\s+"),
|
command.split("\\s+"),
|
||||||
new File(workDir.toUri().getPath()),
|
new File(workDir.toUri().getPath()),
|
||||||
System.getenv());
|
System.getenv());
|
||||||
shExec.execute();
|
shExec.execute();
|
||||||
return shExec;
|
return shExec;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -145,14 +136,24 @@ public class TestDockerContainerExecutor {
|
||||||
return exec != null;
|
return exec != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int runAndBlock(ContainerId cId, Map<String, String> launchCtxEnv, String... cmd) throws IOException {
|
/**
|
||||||
|
* Test that a docker container can be launched to run a command
|
||||||
|
* @param cId a fake ContainerID
|
||||||
|
* @param launchCtxEnv
|
||||||
|
* @param cmd the command to launch inside the docker container
|
||||||
|
* @return the exit code of the process used to launch the docker container
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private int runAndBlock(ContainerId cId, Map<String, String> launchCtxEnv,
|
||||||
|
String... cmd) throws IOException {
|
||||||
String appId = "APP_" + System.currentTimeMillis();
|
String appId = "APP_" + System.currentTimeMillis();
|
||||||
Container container = mock(Container.class);
|
Container container = mock(Container.class);
|
||||||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
|
|
||||||
when(container.getContainerId()).thenReturn(cId);
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
when(container.getLaunchContext()).thenReturn(context);
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
when(cId.getApplicationAttemptId().getApplicationId().toString())
|
||||||
|
.thenReturn(appId);
|
||||||
when(context.getEnvironment()).thenReturn(launchCtxEnv);
|
when(context.getEnvironment()).thenReturn(launchCtxEnv);
|
||||||
|
|
||||||
String script = writeScriptFile(launchCtxEnv, cmd);
|
String script = writeScriptFile(launchCtxEnv, cmd);
|
||||||
|
@ -164,11 +165,13 @@ public class TestDockerContainerExecutor {
|
||||||
|
|
||||||
exec.activateContainer(cId, pidFile);
|
exec.activateContainer(cId, pidFile);
|
||||||
return exec.launchContainer(container, scriptPath, tokensPath,
|
return exec.launchContainer(container, scriptPath, tokensPath,
|
||||||
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||||
dirsHandler.getLogDirs());
|
dirsHandler.getLogDirs());
|
||||||
}
|
}
|
||||||
|
|
||||||
private String writeScriptFile(Map<String, String> launchCtxEnv, String... cmd) throws IOException {
|
// Write the script used to launch the docker container in a temp file
|
||||||
|
private String writeScriptFile(Map<String, String> launchCtxEnv,
|
||||||
|
String... cmd) throws IOException {
|
||||||
File f = File.createTempFile("TestDockerContainerExecutor", ".sh");
|
File f = File.createTempFile("TestDockerContainerExecutor", ".sh");
|
||||||
f.deleteOnExit();
|
f.deleteOnExit();
|
||||||
PrintWriter p = new PrintWriter(new FileOutputStream(f));
|
PrintWriter p = new PrintWriter(new FileOutputStream(f));
|
||||||
|
@ -193,6 +196,10 @@ public class TestDockerContainerExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a touch command can be launched successfully in a docker
|
||||||
|
* container
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLaunchContainer() throws IOException {
|
public void testLaunchContainer() throws IOException {
|
||||||
if (!shouldRun()) {
|
if (!shouldRun()) {
|
||||||
|
@ -201,12 +208,13 @@ public class TestDockerContainerExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, String> env = new HashMap<String, String>();
|
Map<String, String> env = new HashMap<String, String>();
|
||||||
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME,
|
||||||
|
testImage);
|
||||||
String touchFileName = "touch-file-" + System.currentTimeMillis();
|
String touchFileName = "touch-file-" + System.currentTimeMillis();
|
||||||
File touchFile = new File(dirsHandler.getLocalDirs().get(0), touchFileName);
|
File touchFile = new File(dirsHandler.getLocalDirs().get(0), touchFileName);
|
||||||
ContainerId cId = getNextContainerId();
|
ContainerId cId = getNextContainerId();
|
||||||
int ret = runAndBlock(
|
int ret = runAndBlock(cId, env, "touch", touchFile.getAbsolutePath(), "&&",
|
||||||
cId, env, "touch", touchFile.getAbsolutePath(), "&&", "cp", touchFile.getAbsolutePath(), "/");
|
"cp", touchFile.getAbsolutePath(), "/");
|
||||||
|
|
||||||
assertEquals(0, ret);
|
assertEquals(0, ret);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,23 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.LineNumberReader;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -36,30 +52,13 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.LineNumberReader;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assume.assumeTrue;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mock tests for docker container executor
|
* Mock tests for docker container executor
|
||||||
*/
|
*/
|
||||||
public class TestDockerContainerExecutorWithMocks {
|
public class TestDockerContainerExecutorWithMocks {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(TestDockerContainerExecutorWithMocks.class);
|
.getLog(TestDockerContainerExecutorWithMocks.class);
|
||||||
public static final String DOCKER_LAUNCH_COMMAND = "/bin/true";
|
public static final String DOCKER_LAUNCH_COMMAND = "/bin/true";
|
||||||
private DockerContainerExecutor dockerContainerExecutor = null;
|
private DockerContainerExecutor dockerContainerExecutor = null;
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
@ -81,8 +80,10 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
|
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
|
||||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
|
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
|
||||||
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
|
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
|
||||||
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
|
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME,
|
||||||
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME , DOCKER_LAUNCH_COMMAND);
|
yarnImage);
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
||||||
|
DOCKER_LAUNCH_COMMAND);
|
||||||
dockerContainerExecutor = new DockerContainerExecutor();
|
dockerContainerExecutor = new DockerContainerExecutor();
|
||||||
dirsHandler = new LocalDirsHandlerService();
|
dirsHandler = new LocalDirsHandlerService();
|
||||||
dirsHandler.init(conf);
|
dirsHandler.init(conf);
|
||||||
|
@ -95,7 +96,6 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -110,13 +110,17 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
@Test(expected = IllegalStateException.class)
|
||||||
|
//Test that DockerContainerExecutor doesn't successfully init on a secure
|
||||||
|
//cluster
|
||||||
public void testContainerInitSecure() throws IOException {
|
public void testContainerInitSecure() throws IOException {
|
||||||
dockerContainerExecutor.getConf().set(
|
dockerContainerExecutor.getConf().set(
|
||||||
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
dockerContainerExecutor.init();
|
dockerContainerExecutor.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
//Test that when the image name is null, the container launch throws an
|
||||||
|
//IllegalArgumentException
|
||||||
public void testContainerLaunchNullImage() throws IOException {
|
public void testContainerLaunchNullImage() throws IOException {
|
||||||
String appSubmitter = "nobody";
|
String appSubmitter = "nobody";
|
||||||
String appId = "APP_ID";
|
String appId = "APP_ID";
|
||||||
|
@ -126,17 +130,19 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
|
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
|
||||||
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
||||||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
HashMap<String, String> env = new HashMap<String,String>();
|
|
||||||
|
|
||||||
|
HashMap<String, String> env = new HashMap<String,String>();
|
||||||
when(container.getContainerId()).thenReturn(cId);
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
when(container.getLaunchContext()).thenReturn(context);
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
when(cId.getApplicationAttemptId().getApplicationId().toString())
|
||||||
|
.thenReturn(appId);
|
||||||
when(cId.toString()).thenReturn(containerId);
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
when(context.getEnvironment()).thenReturn(env);
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
env.put(
|
||||||
dockerContainerExecutor.getConf()
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
dockerContainerExecutor.getConf().set(
|
||||||
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
Path scriptPath = new Path("file:///bin/echo");
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
Path tokensPath = new Path("file:///dev/null");
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
|
||||||
|
@ -149,6 +155,8 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
//Test that when the image name is invalid, the container launch throws an
|
||||||
|
//IllegalArgumentException
|
||||||
public void testContainerLaunchInvalidImage() throws IOException {
|
public void testContainerLaunchInvalidImage() throws IOException {
|
||||||
String appSubmitter = "nobody";
|
String appSubmitter = "nobody";
|
||||||
String appId = "APP_ID";
|
String appId = "APP_ID";
|
||||||
|
@ -162,13 +170,15 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
|
|
||||||
when(container.getContainerId()).thenReturn(cId);
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
when(container.getLaunchContext()).thenReturn(context);
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
when(cId.getApplicationAttemptId().getApplicationId().toString())
|
||||||
|
.thenReturn(appId);
|
||||||
when(cId.toString()).thenReturn(containerId);
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
when(context.getEnvironment()).thenReturn(env);
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
env.put(
|
||||||
dockerContainerExecutor.getConf()
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
dockerContainerExecutor.getConf().set(
|
||||||
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
Path scriptPath = new Path("file:///bin/echo");
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
Path tokensPath = new Path("file:///dev/null");
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
|
||||||
|
@ -181,6 +191,8 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
//Test that a container launch correctly wrote the session script with the
|
||||||
|
//commands we expected
|
||||||
public void testContainerLaunch() throws IOException {
|
public void testContainerLaunch() throws IOException {
|
||||||
String appSubmitter = "nobody";
|
String appSubmitter = "nobody";
|
||||||
String appId = "APP_ID";
|
String appId = "APP_ID";
|
||||||
|
@ -194,40 +206,48 @@ public class TestDockerContainerExecutorWithMocks {
|
||||||
|
|
||||||
when(container.getContainerId()).thenReturn(cId);
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
when(container.getLaunchContext()).thenReturn(context);
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
when(cId.getApplicationAttemptId().getApplicationId().toString())
|
||||||
|
.thenReturn(appId);
|
||||||
when(cId.toString()).thenReturn(containerId);
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
when(context.getEnvironment()).thenReturn(env);
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
env.put(
|
||||||
|
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
Path scriptPath = new Path("file:///bin/echo");
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
Path tokensPath = new Path("file:///dev/null");
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
|
||||||
Path pidFile = new Path(workDir, "pid");
|
Path pidFile = new Path(workDir, "pid");
|
||||||
|
|
||||||
dockerContainerExecutor.activateContainer(cId, pidFile);
|
dockerContainerExecutor.activateContainer(cId, pidFile);
|
||||||
int ret = dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
|
int ret = dockerContainerExecutor.launchContainer(container, scriptPath,
|
||||||
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||||
dirsHandler.getLogDirs());
|
dirsHandler.getLogDirs());
|
||||||
assertEquals(0, ret);
|
assertEquals(0, ret);
|
||||||
//get the script
|
//get the script
|
||||||
Path sessionScriptPath = new Path(workDir,
|
Path sessionScriptPath = new Path(workDir,
|
||||||
Shell.appendScriptExtension(
|
Shell.appendScriptExtension(
|
||||||
DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
|
DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
|
||||||
LineNumberReader lnr = new LineNumberReader(new FileReader(sessionScriptPath.toString()));
|
LineNumberReader lnr = new LineNumberReader(new FileReader(
|
||||||
|
sessionScriptPath.toString()));
|
||||||
boolean cmdFound = false;
|
boolean cmdFound = false;
|
||||||
List<String> localDirs = dirsToMount(dirsHandler.getLocalDirs());
|
List<String> localDirs = dirsToMount(dirsHandler.getLocalDirs());
|
||||||
List<String> logDirs = dirsToMount(dirsHandler.getLogDirs());
|
List<String> logDirs = dirsToMount(dirsHandler.getLogDirs());
|
||||||
List<String> workDirMount = dirsToMount(Collections.singletonList(workDir.toUri().getPath()));
|
List<String> workDirMount = dirsToMount(Collections.singletonList(
|
||||||
List<String> expectedCommands = new ArrayList<String>(
|
workDir.toUri().getPath()));
|
||||||
Arrays.asList(DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name", containerId));
|
List<String> expectedCommands = new ArrayList<String>(Arrays.asList(
|
||||||
|
DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name",
|
||||||
|
containerId));
|
||||||
expectedCommands.addAll(localDirs);
|
expectedCommands.addAll(localDirs);
|
||||||
expectedCommands.addAll(logDirs);
|
expectedCommands.addAll(logDirs);
|
||||||
expectedCommands.addAll(workDirMount);
|
expectedCommands.addAll(workDirMount);
|
||||||
String shellScript = workDir + "/launch_container.sh";
|
String shellScript = workDir + "/launch_container.sh";
|
||||||
|
|
||||||
expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""), "bash","\"" + shellScript + "\""));
|
expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""),
|
||||||
|
"bash","\"" + shellScript + "\""));
|
||||||
|
|
||||||
String expectedPidString = "echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+ pidFile.toString() + ".tmp";
|
String expectedPidString =
|
||||||
|
"echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+
|
||||||
|
pidFile.toString() + ".tmp";
|
||||||
boolean pidSetterFound = false;
|
boolean pidSetterFound = false;
|
||||||
while(lnr.ready()){
|
while(lnr.ready()){
|
||||||
String line = lnr.readLine();
|
String line = lnr.readLine();
|
||||||
|
|
Loading…
Reference in New Issue