YARN-5388. Deprecate and remove DockerContainerExecutor. (Daniel Templeton via kasha)

This commit is contained in:
Karthik Kambatla 2016-10-25 13:35:47 -07:00
parent 0f0c15f7a5
commit de6faae97c
8 changed files with 0 additions and 1618 deletions

View File

@ -131,7 +131,6 @@
<item name="Writing YARN Applications" href="hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"/>
<item name="YARN Application Security" href="hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html"/>
<item name="NodeManager" href="hadoop-yarn/hadoop-yarn-site/NodeManager.html"/>
<item name="DockerContainerExecutor" href="hadoop-yarn/hadoop-yarn-site/DockerContainerExecutor.html"/>
<item name="Using CGroups" href="hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html"/>
<item name="Secure Containers" href="hadoop-yarn/hadoop-yarn-site/SecureContainer.html"/>
<item name="Registry" href="hadoop-yarn/hadoop-yarn-site/registry/index.html"/>

View File

@ -1392,18 +1392,6 @@ public class YarnConfiguration extends Configuration {
public static final String NM_CONTAINER_LOCALIZER_JAVA_OPTS_DEFAULT =
"-Xmx256m";
/** The Docker image name(For DockerContainerExecutor).*/
public static final String NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME =
NM_PREFIX + "docker-container-executor.image-name";
/** The name of the docker executor (For DockerContainerExecutor).*/
public static final String NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
NM_PREFIX + "docker-container-executor.exec-name";
/** The default docker executor (For DockerContainerExecutor).*/
public static final String NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
"/usr/bin/docker";
/** Prefix for runtime configuration constants. */
public static final String LINUX_CONTAINER_RUNTIME_PREFIX = NM_PREFIX +
"runtime.linux.";

View File

@ -1798,24 +1798,6 @@
<value></value>
</property>
<!-- Docker Configuration -->
<property>
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
<value>/usr/bin/docker</value>
<description>
Name or path to the Docker client.
</description>
</property>
<property>
<description>
The Docker image name to use for DockerContainerExecutor
</description>
<name>yarn.nodemanager.docker-container-executor.image-name</name>
<value></value>
</property>
<!-- Map Reduce Configuration -->
<property>

View File

@ -320,7 +320,6 @@ public abstract class ContainerExecutor implements Configurable {
ContainerLaunch.ShellScriptBuilder.create();
Set<String> whitelist = new HashSet<>();
whitelist.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
whitelist.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
whitelist.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name());
whitelist.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name());

View File

@ -1,883 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
/**
* This executor will launch and run tasks inside Docker containers. It
* currently only supports simple authentication mode. It shares a lot of code
* with the DefaultContainerExecutor (and it may make sense to pull out those
* common pieces later).
*/
public class DockerContainerExecutor extends ContainerExecutor {
private static final Log LOG = LogFactory
.getLog(DockerContainerExecutor.class);
//The name of the script file that will launch the Docker containers
public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT =
"docker_container_executor";
//The name of the session script that the DOCKER_CONTAINER_EXECUTOR_SCRIPT
//launches in turn
public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT =
"docker_container_executor_session";
//This validates that the image is a proper docker image and would not crash
//docker. The image name is not allowed to contain spaces. e.g.
//registry.somecompany.com:9999/containername:0.1 or
//containername:0.1 or
//containername
public static final String DOCKER_IMAGE_PATTERN =
"^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$";
private final FileContext lfs;
private final Pattern dockerImagePattern;
public DockerContainerExecutor() {
try {
this.lfs = FileContext.getLocalFSFileContext();
this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
}
protected void copyFile(Path src, Path dst, String owner) throws IOException {
lfs.util().copy(src, dst);
}
@Override
public void init() throws IOException {
String auth =
getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
if (auth != null && !auth.equals("simple")) {
throw new IllegalStateException(
"DockerContainerExecutor only works with simple authentication mode");
}
String dockerExecutor = getConf().get(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
if (!new File(dockerExecutor).exists()) {
throw new IllegalStateException(
"Invalid docker exec path: " + dockerExecutor);
}
}
@Override
public synchronized void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
ContainerLocalizer localizer =
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs, user);
// randomly choose the local directory
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn =
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
@Override
public int launchContainer(ContainerStartContext ctx) throws IOException {
Container container = ctx.getContainer();
Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String userName = ctx.getUser();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
//Variables for the launch environment can be injected from the command-line
//while submitting the application
String containerImageName = container.getLaunchContext().getEnvironment()
.get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("containerImageName from launchContext: " + containerImageName);
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName),
"Container image must not be null");
containerImageName = containerImageName.replaceAll("['\"]", "");
Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: "
+ containerImageName + " is not a proper docker image");
String dockerExecutor = getConf().get(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerId();
// create container dirs on all disks
String containerIdStr = containerId.toString();
String appIdStr =
containerId.getApplicationAttemptId().getApplicationId().toString();
for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(appCacheDir, appIdStr);
Path containerDir = new Path(appDir, containerIdStr);
createDir(containerDir, dirPerm, true, userName);
}
// Create the container log-dirs on all disks
createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
createDir(tmpDir, dirPerm, false, userName);
// copy launch script to work dir
Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
// copy container tokens to work dir
Path tokenDst =
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
lfs.util().copy(nmPrivateTokensPath, tokenDst);
String localDirMount = toMount(localDirs);
String logDirMount = toMount(logDirs);
String containerWorkDirMount = toMount(Collections.singletonList(
containerWorkDir.toUri().getPath()));
StringBuilder commands = new StringBuilder();
//Use docker run to launch the docker container. See man pages for
//docker-run
//--rm removes the container automatically once the container finishes
//--net=host allows the container to take on the host's network stack
//--name sets the Docker Container name to the YARN containerId string
//-v is used to bind mount volumes for local, log and work dirs.
String commandStr = commands.append(dockerExecutor)
.append(" ")
.append("run")
.append(" ")
.append("--rm --net=host")
.append(" ")
.append(" --name " + containerIdStr)
.append(localDirMount)
.append(logDirMount)
.append(containerWorkDirMount)
.append(" ")
.append(containerImageName)
.toString();
//Get the pid of the process which has been launched as a docker container
//using docker inspect
String dockerPidScript = "`" + dockerExecutor +
" inspect --format {{.State.Pid}} " + containerIdStr + "`";
// Create new local launch wrapper script
LocalWrapperScriptBuilder sb = new UnixLocalWrapperScriptBuilder(
containerWorkDir, commandStr, dockerPidScript);
Path pidFile = getPidFilePath(containerId);
if (pidFile != null) {
sb.writeLocalWrapperScript(launchDst, pidFile);
} else {
//Although the container was activated by ContainerLaunch before exec()
//was called, since then deactivateContainer() has been called.
LOG.info("Container " + containerIdStr
+ " was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
ShellCommandExecutor shExec = null;
try {
lfs.setPermission(launchDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
lfs.setPermission(sb.getWrapperScriptPath(),
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
// Setup command to run
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
containerIdStr, userName, pidFile, this.getConf());
if (LOG.isDebugEnabled()) {
LOG.debug("launchContainer: " + commandStr + " " +
Joiner.on(" ").join(command));
}
shExec = new ShellCommandExecutor(
command,
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment(), // sanitized env
0L,
false);
if (isContainerActive(containerId)) {
shExec.execute();
} else {
LOG.info("Container " + containerIdStr +
" was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
} catch (IOException e) {
if (null == shExec) {
return -1;
}
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
// terminated/killed forcefully. In all other cases, log the
// container-executor's output
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode, e);
logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n"
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
"Container killed on request. Exit code is " + exitCode));
}
return exitCode;
} finally {
if (shExec != null) {
shExec.close();
}
}
return 0;
}
@Override
/**
* Filter the environment variables that may conflict with the ones set in
* the docker image and write them out to an OutputStream.
*/
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
Map<Path, List<String>> resources, List<String> command, Path logDir,
String user) throws IOException {
ContainerLaunch.ShellScriptBuilder sb =
ContainerLaunch.ShellScriptBuilder.create();
//Remove environments that may conflict with the ones in Docker image.
Set<String> exclusionSet = new HashSet<String>();
exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name());
exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name());
exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name());
if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) {
if (!exclusionSet.contains(env.getKey())) {
sb.env(env.getKey().toString(), env.getValue().toString());
}
}
}
if (resources != null) {
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
for (String linkName : entry.getValue()) {
sb.symlink(entry.getKey(), new Path(linkName));
}
}
}
// dump debugging information if configured
if (getConf() != null && getConf().getBoolean(
YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO,
YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO)) {
sb.copyDebugInformation(new Path(ContainerLaunch.CONTAINER_SCRIPT),
new Path(logDir, ContainerLaunch.CONTAINER_SCRIPT));
sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS));
}
sb.command(command);
PrintStream pout = null;
PrintStream ps = null;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
pout = new PrintStream(out, false, "UTF-8");
if (LOG.isDebugEnabled()) {
ps = new PrintStream(baos, false, "UTF-8");
sb.write(ps);
}
sb.write(pout);
} finally {
if (out != null) {
out.close();
}
if (ps != null) {
ps.close();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Script: " + baos.toString("UTF-8"));
}
}
private boolean saneDockerImage(String containerImageName) {
return dockerImagePattern.matcher(containerImageName).matches();
}
@Override
public boolean signalContainer(ContainerSignalContext ctx)
throws IOException {
String user = ctx.getUser();
String pid = ctx.getPid();
Signal signal = ctx.getSignal();
if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ " as user " + user);
}
if (!containerIsAlive(pid)) {
return false;
}
try {
killContainer(pid, signal);
} catch (IOException e) {
if (!containerIsAlive(pid)) {
return false;
}
throw e;
}
return true;
}
@Override
public boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException {
String pid = ctx.getPid();
return containerIsAlive(pid);
}
/**
* Returns true if the process with the specified pid is alive.
*
* @param pid String pid
* @return boolean true if the process is alive
*/
@VisibleForTesting
public static boolean containerIsAlive(String pid) throws IOException {
try {
new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
.execute();
// successful execution means process is alive
return true;
}
catch (Shell.ExitCodeException e) {
// failure (non-zero exit code) means process is not alive
return false;
}
}
/**
* Send a specified signal to the specified pid
*
* @param pid the pid of the process [group] to signal.
* @param signal signal to send
* (for logging).
*/
protected void killContainer(String pid, Signal signal) throws IOException {
new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
.execute();
}
@Override
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
Path subDir = ctx.getSubDir();
List<Path> baseDirs = ctx.getBasedirs();
if (baseDirs == null || baseDirs.size() == 0) {
LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) {
//Maybe retry
LOG.warn("delete returned false for path: [" + subDir + "]");
}
return;
}
for (Path baseDir : baseDirs) {
Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
LOG.info("Deleting path : " + del);
try {
if (!lfs.delete(del, true)) {
LOG.warn("delete returned false for path: [" + del + "]");
}
} catch (FileNotFoundException e) {
continue;
}
}
}
@Override
public void symLink(String target, String symlink)
throws IOException {
}
/**
* Converts a directory list to a docker mount string
* @param dirs
* @return a string of mounts for docker
*/
private String toMount(List<String> dirs) {
StringBuilder builder = new StringBuilder();
for (String dir : dirs) {
builder.append(" -v " + dir + ":" + dir);
}
return builder.toString();
}
//This class facilitates (only) the creation of platform-specific scripts that
//will be used to launch the containers
//TODO: This should be re-used from the DefaultContainerExecutor.
private abstract class LocalWrapperScriptBuilder {
private final Path wrapperScriptPath;
public Path getWrapperScriptPath() {
return wrapperScriptPath;
}
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
throws IOException {
DataOutputStream out = null;
PrintStream pout = null;
try {
out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
pout = new PrintStream(out, false, "UTF-8");
writeLocalWrapperScript(launchDst, pidFile, pout);
} finally {
IOUtils.cleanup(LOG, pout, out);
}
}
protected abstract void writeLocalWrapperScript(Path launchDst,
Path pidFile, PrintStream pout);
protected LocalWrapperScriptBuilder(Path containerWorkDir) {
this.wrapperScriptPath = new Path(containerWorkDir,
Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT));
}
}
//TODO: This class too should be used from DefaultContainerExecutor.
private final class UnixLocalWrapperScriptBuilder
extends LocalWrapperScriptBuilder {
private final Path sessionScriptPath;
private final String dockerCommand;
private final String dockerPidScript;
public UnixLocalWrapperScriptBuilder(Path containerWorkDir,
String dockerCommand, String dockerPidScript) {
super(containerWorkDir);
this.dockerCommand = dockerCommand;
this.dockerPidScript = dockerPidScript;
this.sessionScriptPath = new Path(containerWorkDir,
Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
}
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
throws IOException {
writeSessionScript(launchDst, pidFile);
super.writeLocalWrapperScript(launchDst, pidFile);
}
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout) {
String exitCodeFile = ContainerLaunch.getExitCodeFile(
pidFile.toString());
String tmpFile = exitCodeFile + ".tmp";
pout.println("#!/usr/bin/env bash");
pout.println("bash \"" + sessionScriptPath.toString() + "\"");
pout.println("rc=$?");
pout.println("echo $rc > \"" + tmpFile + "\"");
pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
pout.println("exit $rc");
}
private void writeSessionScript(Path launchDst, Path pidFile)
throws IOException {
DataOutputStream out = null;
PrintStream pout = null;
try {
out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
pout = new PrintStream(out, false, "UTF-8");
// We need to do a move as writing to a file is not atomic
// Process reading a file being written to may get garbled data
// hence write pid to tmp file first followed by a mv
pout.println("#!/usr/bin/env bash");
pout.println();
pout.println("echo "+ dockerPidScript +" > " + pidFile.toString()
+ ".tmp");
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
pout.println(dockerCommand + " bash \"" +
launchDst.toUri().getPath().toString() + "\"");
} finally {
IOUtils.cleanup(LOG, pout, out);
}
lfs.setPermission(sessionScriptPath,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
}
}
protected void createDir(Path dirPath, FsPermission perms,
boolean createParent, String user) throws IOException {
lfs.mkdir(dirPath, perms, createParent);
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
lfs.setPermission(dirPath, perms);
}
}
/**
* Initialize the local directories for a particular user.
* <ul>.mkdir
* <li>$local.dir/usercache/$user</li>
* </ul>
*/
void createUserLocalDirs(List<String> localDirs, String user)
throws IOException {
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent
try {
createDir(getUserCacheDir(new Path(localDir), user), userperms, true,
user);
} catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e);
continue;
}
userDirStatus = true;
}
if (!userDirStatus) {
throw new IOException("Not able to initialize user directories "
+ "in any of the configured local directories for user " + user);
}
}
/**
* Initialize the local cache directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user</li>
* <li>$local.dir/usercache/$user/appcache</li>
* <li>$local.dir/usercache/$user/filecache</li>
* </ul>
*/
void createUserCacheDirs(List<String> localDirs, String user)
throws IOException {
LOG.info("Initializing user " + user);
boolean appcacheDirStatus = false;
boolean distributedCacheDirStatus = false;
FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
FsPermission fileperms = new FsPermission(FILECACHE_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user/appcache
Path localDirPath = new Path(localDir);
final Path appDir = getAppcacheDir(localDirPath, user);
try {
createDir(appDir, appCachePerms, true, user);
appcacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app cache directory : " + appDir, e);
}
// create $local.dir/usercache/$user/filecache
final Path distDir = getFileCacheDir(localDirPath, user);
try {
createDir(distDir, fileperms, true, user);
distributedCacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create file cache directory : " + distDir, e);
}
}
if (!appcacheDirStatus) {
throw new IOException("Not able to initialize app-cache directories "
+ "in any of the configured local directories for user " + user);
}
if (!distributedCacheDirStatus) {
throw new IOException(
"Not able to initialize distributed-cache directories "
+ "in any of the configured local directories for user "
+ user);
}
}
/**
* Initialize the local directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user/appcache/$appid</li>
* </ul>
* @param localDirs
*/
void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException {
boolean initAppDirStatus = false;
FsPermission appperms = new FsPermission(APPDIR_PERM);
for (String localDir : localDirs) {
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
// create $local.dir/usercache/$user/appcache/$appId
try {
createDir(fullAppDir, appperms, true, user);
initAppDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
}
}
if (!initAppDirStatus) {
throw new IOException("Not able to initialize app directories "
+ "in any of the configured local directories for app "
+ appId.toString());
}
}
/**
* Create application log directories on all disks.
*/
void createContainerLogDirs(String appId, String containerId,
List<String> logDirs, String user) throws IOException {
boolean containerLogDirStatus = false;
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : logDirs) {
// create $log.dir/$appid/$containerid
Path appLogDir = new Path(rootLogDir, appId);
Path containerLogDir = new Path(appLogDir, containerId);
try {
createDir(containerLogDir, containerLogDirPerms, true, user);
} catch (IOException e) {
LOG.warn("Unable to create the container-log directory : "
+ appLogDir, e);
continue;
}
containerLogDirStatus = true;
}
if (!containerLogDirStatus) {
throw new IOException(
"Not able to initialize container-log directories "
+ "in any of the configured local directories for container "
+ containerId);
}
}
/**
* Permissions for user dir.
* $local.dir/usercache/$user
*/
static final short USER_PERM = (short) 0750;
/**
* Permissions for user appcache dir.
* $local.dir/usercache/$user/appcache
*/
static final short APPCACHE_PERM = (short) 0710;
/**
* Permissions for user filecache dir.
* $local.dir/usercache/$user/filecache
*/
static final short FILECACHE_PERM = (short) 0710;
/**
* Permissions for user app dir.
* $local.dir/usercache/$user/appcache/$appId
*/
static final short APPDIR_PERM = (short) 0710;
/**
* Permissions for user log dir.
* $logdir/$user/$appId
*/
static final short LOGDIR_PERM = (short) 0710;
private long getDiskFreeSpace(Path base) throws IOException {
return lfs.getFsStatus(base).getRemaining();
}
private Path getApplicationDir(Path base, String user, String appId) {
return new Path(getAppcacheDir(base, user), appId);
}
private Path getUserCacheDir(Path base, String user) {
return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
}
private Path getAppcacheDir(Path base, String user) {
return new Path(getUserCacheDir(base, user),
ContainerLocalizer.APPCACHE);
}
private Path getFileCacheDir(Path base, String user) {
return new Path(getUserCacheDir(base, user),
ContainerLocalizer.FILECACHE);
}
protected Path getWorkingDir(List<String> localDirs, String user,
String appId) throws IOException {
Path appStorageDir = null;
long totalAvailable = 0L;
long[] availableOnDisk = new long[localDirs.size()];
int i = 0;
// randomly choose the app directory
// the chance of picking a directory is proportional to
// the available space on the directory.
// firstly calculate the sum of all available space on these directories
for (String localDir : localDirs) {
Path curBase = getApplicationDir(new Path(localDir),
user, appId);
long space = 0L;
try {
space = getDiskFreeSpace(curBase);
} catch (IOException e) {
LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
}
availableOnDisk[i++] = space;
totalAvailable += space;
}
// throw an IOException if totalAvailable is 0.
if (totalAvailable <= 0L) {
throw new IOException("Not able to find a working directory for "
+ user);
}
// make probability to pick a directory proportional to
// the available space on the directory.
long randomPosition = RandomUtils.nextLong() % totalAvailable;
int dir = 0;
// skip zero available space directory,
// because totalAvailable is greater than 0 and randomPosition
// is less than totalAvailable, we can find a valid directory
// with nonzero available space.
while (availableOnDisk[dir] == 0L) {
dir++;
}
while (randomPosition > availableOnDisk[dir]) {
randomPosition -= availableOnDisk[dir++];
}
appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
user, appId);
return appStorageDir;
}
/**
* Create application log directories on all disks.
*/
void createAppLogDirs(String appId, List<String> logDirs, String user)
throws IOException {
boolean appLogDirStatus = false;
FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : logDirs) {
// create $log.dir/$appid
Path appLogDir = new Path(rootLogDir, appId);
try {
createDir(appLogDir, appLogDirPerms, true, user);
} catch (IOException e) {
LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
continue;
}
appLogDirStatus = true;
}
if (!appLogDirStatus) {
throw new IOException("Not able to initialize app-log directories "
+ "in any of the configured local directories for app " + appId);
}
}
/**
* @return the list of paths of given local directories
*/
private static List<Path> getPaths(List<String> dirs) {
List<Path> paths = new ArrayList<Path>(dirs.size());
for (int i = 0; i < dirs.size(); i++) {
paths.add(new Path(dirs.get(i)));
}
return paths;
}
}

View File

@ -1,244 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Strings;
/**
* This is intended to test the DockerContainerExecutor code, but it requires
* docker to be installed.
* <br><ol>
* <li>To run the tests, set the docker-service-url to the host and port where
* docker service is running (If docker-service-url is not specified then the
* local daemon will be used).
* <br><pre><code>
* mvn test -Ddocker-service-url=tcp://0.0.0.0:4243 -Dtest=TestDockerContainerExecutor
* </code></pre>
*/
public class TestDockerContainerExecutor {
private static final Log LOG = LogFactory
.getLog(TestDockerContainerExecutor.class);
private static File workSpace = null;
private DockerContainerExecutor exec = null;
private LocalDirsHandlerService dirsHandler;
private Path workDir;
private FileContext lfs;
private String yarnImage;
private String appSubmitter;
private String dockerUrl;
private String testImage = "centos:latest";
private String dockerExec;
private ContainerId getNextContainerId() {
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
String id = "CONTAINER_" + System.currentTimeMillis();
when(cId.toString()).thenReturn(id);
return cId;
}
@Before
//Initialize a new DockerContainerExecutor that will be used to launch mocked
//containers.
public void setup() {
try {
lfs = FileContext.getLocalFSFileContext();
workDir = new Path("/tmp/temp-" + System.currentTimeMillis());
workSpace = new File(workDir.toUri().getPath());
lfs.mkdir(workDir, FsPermission.getDirDefault(), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
Configuration conf = new Configuration();
yarnImage = "yarnImage";
long time = System.currentTimeMillis();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
dockerUrl = System.getProperty("docker-service-url");
LOG.info("dockerUrl: " + dockerUrl);
if (!Strings.isNullOrEmpty(dockerUrl)) {
dockerUrl = " -H " + dockerUrl;
} else if(isDockerDaemonRunningLocally()) {
dockerUrl = "";
} else {
return;
}
dockerExec = "docker " + dockerUrl;
conf.set(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
conf.set(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec);
exec = new DockerContainerExecutor();
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
exec.setConf(conf);
appSubmitter = System.getProperty("application.submitter");
if (appSubmitter == null || appSubmitter.isEmpty()) {
appSubmitter = "nobody";
}
shellExec(dockerExec + " pull " + testImage);
}
private Shell.ShellCommandExecutor shellExec(String command) {
try {
Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor(
command.split("\\s+"),
new File(workDir.toUri().getPath()),
System.getenv());
shExec.execute();
return shExec;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private boolean shouldRun() {
return exec != null;
}
private boolean isDockerDaemonRunningLocally() {
boolean dockerDaemonRunningLocally = true;
try {
shellExec("docker info");
} catch (Exception e) {
LOG.info("docker daemon is not running on local machine.");
dockerDaemonRunningLocally = false;
}
return dockerDaemonRunningLocally;
}
/**
* Test that a docker container can be launched to run a command
* @param cId a fake ContainerID
* @param launchCtxEnv
* @param cmd the command to launch inside the docker container
* @return the exit code of the process used to launch the docker container
* @throws IOException
*/
private int runAndBlock(ContainerId cId, Map<String, String> launchCtxEnv,
String... cmd) throws IOException {
String appId = "APP_" + System.currentTimeMillis();
Container container = mock(Container.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString())
.thenReturn(appId);
when(context.getEnvironment()).thenReturn(launchCtxEnv);
String script = writeScriptFile(launchCtxEnv, cmd);
Path scriptPath = new Path(script);
Path tokensPath = new Path("/dev/null");
Path workDir = new Path(workSpace.getAbsolutePath());
Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile);
return exec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
// Write the script used to launch the docker container in a temp file
private String writeScriptFile(Map<String, String> launchCtxEnv,
String... cmd) throws IOException {
File f = File.createTempFile("TestDockerContainerExecutor", ".sh");
f.deleteOnExit();
PrintWriter p = new PrintWriter(new FileOutputStream(f));
for(Map.Entry<String, String> entry: launchCtxEnv.entrySet()) {
p.println("export " + entry.getKey() + "=\"" + entry.getValue() + "\"");
}
for (String part : cmd) {
p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
p.print(" ");
}
p.println();
p.close();
return f.getAbsolutePath();
}
@After
public void tearDown() {
try {
lfs.delete(workDir, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Test that a touch command can be launched successfully in a docker
* container
*/
@Test(timeout=1000000)
public void testLaunchContainer() throws IOException {
if (!shouldRun()) {
LOG.warn("Docker not installed, aborting test.");
return;
}
Map<String, String> env = new HashMap<String, String>();
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME,
testImage);
String touchFileName = "touch-file-" + System.currentTimeMillis();
File touchFile = new File(dirsHandler.getLocalDirs().get(0), touchFileName);
ContainerId cId = getNextContainerId();
int ret = runAndBlock(cId, env, "touch", touchFile.getAbsolutePath(), "&&",
"cp", touchFile.getAbsolutePath(), "/");
assertEquals(0, ret);
}
}

View File

@ -1,305 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Mock tests for docker container executor
*/
public class TestDockerContainerExecutorWithMocks {
private static final Log LOG = LogFactory
.getLog(TestDockerContainerExecutorWithMocks.class);
public static final String DOCKER_LAUNCH_COMMAND = "/bin/true";
private DockerContainerExecutor dockerContainerExecutor = null;
private LocalDirsHandlerService dirsHandler;
private Path workDir;
private FileContext lfs;
private String yarnImage;
@Before
public void setup() {
assumeTrue(Shell.LINUX);
File f = new File("./src/test/resources/mock-container-executor");
if(!FileUtil.canExecute(f)) {
FileUtil.setExecutable(f, true);
}
String executorPath = f.getAbsolutePath();
Configuration conf = new Configuration();
yarnImage = "yarnImage";
long time = System.currentTimeMillis();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME,
yarnImage);
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
DOCKER_LAUNCH_COMMAND);
dockerContainerExecutor = new DockerContainerExecutor();
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
dockerContainerExecutor.setConf(conf);
lfs = null;
try {
lfs = FileContext.getLocalFSFileContext();
workDir = new Path("/tmp/temp-"+ System.currentTimeMillis());
lfs.mkdir(workDir, FsPermission.getDirDefault(), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@After
public void tearDown() {
try {
if (lfs != null) {
lfs.delete(workDir, true);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test(expected = IllegalStateException.class)
//Test that DockerContainerExecutor doesn't successfully init on a secure
//cluster
public void testContainerInitSecure() throws IOException {
dockerContainerExecutor.getConf().set(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
dockerContainerExecutor.init();
}
@Test(expected = IllegalArgumentException.class)
//Test that when the image name is null, the container launch throws an
//IllegalArgumentException
public void testContainerLaunchNullImage() throws IOException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
String testImage = "";
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString())
.thenReturn(appId);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
env.put(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
dockerContainerExecutor.getConf().set(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
@Test(expected = IllegalArgumentException.class)
//Test that when the image name is invalid, the container launch throws an
//IllegalArgumentException
public void testContainerLaunchInvalidImage() throws IOException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
String testImage = "testrepo.com/test-image rm -rf $HADOOP_HOME/*";
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString())
.thenReturn(appId);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
env.put(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
dockerContainerExecutor.getConf().set(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(
new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
}
@Test
//Test that a container launch correctly wrote the session script with the
//commands we expected
public void testContainerLaunch() throws IOException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
String testImage = "\"sequenceiq/hadoop-docker:2.4.1\"";
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString())
.thenReturn(appId);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
env.put(
YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path pidFile = new Path(workDir, "pid");
dockerContainerExecutor.activateContainer(cId, pidFile);
int ret = dockerContainerExecutor.launchContainer(
new ContainerStartContext.Builder()
.setContainer(container)
.setNmPrivateContainerScriptPath(scriptPath)
.setNmPrivateTokensPath(tokensPath)
.setUser(appSubmitter)
.setAppId(appId)
.setContainerWorkDir(workDir)
.setLocalDirs(dirsHandler.getLocalDirs())
.setLogDirs(dirsHandler.getLogDirs())
.build());
assertEquals(0, ret);
//get the script
Path sessionScriptPath = new Path(workDir,
Shell.appendScriptExtension(
DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
LineNumberReader lnr = new LineNumberReader(new FileReader(
sessionScriptPath.toString()));
boolean cmdFound = false;
List<String> localDirs = dirsToMount(dirsHandler.getLocalDirs());
List<String> logDirs = dirsToMount(dirsHandler.getLogDirs());
List<String> workDirMount = dirsToMount(Collections.singletonList(
workDir.toUri().getPath()));
List<String> expectedCommands = new ArrayList<String>(Arrays.asList(
DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name",
containerId));
expectedCommands.addAll(localDirs);
expectedCommands.addAll(logDirs);
expectedCommands.addAll(workDirMount);
String shellScript = workDir + "/launch_container.sh";
expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""),
"bash","\"" + shellScript + "\""));
String expectedPidString =
"echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+
pidFile.toString() + ".tmp";
boolean pidSetterFound = false;
while(lnr.ready()){
String line = lnr.readLine();
LOG.debug("line: " + line);
if (line.startsWith(DOCKER_LAUNCH_COMMAND)){
List<String> command = new ArrayList<String>();
for( String s :line.split("\\s+")){
command.add(s.trim());
}
assertEquals(expectedCommands, command);
cmdFound = true;
} else if (line.startsWith("echo")) {
assertEquals(expectedPidString, line);
pidSetterFound = true;
}
}
assertTrue(cmdFound);
assertTrue(pidSetterFound);
}
private List<String> dirsToMount(List<String> dirs) {
List<String> localDirs = new ArrayList<String>();
for(String dir: dirs){
localDirs.add("-v");
localDirs.add(dir + ":" + dir);
}
return localDirs;
}
}

View File

@ -1,154 +0,0 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
Docker Container Executor
=========================
* [Overview](#Overview)
* [Cluster Configuration](#Cluster_Configuration)
* [Tips for connecting to a secure docker repository](#Tips_for_connecting_to_a_secure_docker_repository)
* [Job Configuration](#Job_Configuration)
* [Docker Image Requirements](#Docker_Image_Requirements)
* [Working example of yarn launched docker containers](#Working_example_of_yarn_launched_docker_containers)
Overview
--------
[Docker](https://www.docker.io/) combines an easy-to-use interface to Linux containers with easy-to-construct image files for those containers. In short, Docker launches very light weight virtual machines.
The Docker Container Executor (DCE) allows the YARN NodeManager to launch YARN containers into Docker containers. Users can specify the Docker images they want for their YARN containers. These containers provide a custom software environment in which the user's code runs, isolated from the software environment of the NodeManager. These containers can include special libraries needed by the application, and they can have different versions of Perl, Python, and even Java than what is installed on the NodeManager. Indeed, these containers can run a different flavor of Linux than what is running on the NodeManager -- although the YARN container must define all the environments and libraries needed to run the job, nothing will be shared with the NodeManager.
Docker for YARN provides both consistency (all YARN containers will have the same software environment) and isolation (no interference with whatever is installed on the physical machine).
Cluster Configuration
---------------------
Docker Container Executor runs in non-secure mode of HDFS and YARN. It will not run in secure mode, and will exit if it detects secure mode.
The DockerContainerExecutor requires Docker daemon to be running on the NodeManagers, and the Docker client installed and able to start Docker containers. To prevent timeouts while starting jobs, the Docker images to be used by a job should already be downloaded in the NodeManagers. Here's an example of how this can be done:
sudo docker pull sequenceiq/hadoop-docker:2.4.1
This should be done as part of the NodeManager startup.
The following properties must be set in yarn-site.xml:
```xml
<property>
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
<value>/usr/bin/docker</value>
<description>
Name or path to the Docker client. This is a required parameter. If this is empty,
user must pass an image name as part of the job invocation(see below).
</description>
</property>
<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
<description>
This is the container executor setting that ensures that all
jobs are started with the DockerContainerExecutor.
</description>
</property>
```
Administrators should be aware that DCE doesn't currently provide user name-space isolation. This means, in particular, that software running as root in the YARN container will have root privileges in the underlying NodeManager. Put differently, DCE currently provides no better security guarantees than YARN's Default Container Executor. In fact, DockerContainerExecutor will exit if it detects secure yarn.
Tips for connecting to a secure docker repository
-------------------------------------------------
By default, docker images are pulled from the docker public repository. The format of a docker image url is: *username*/*image\_name*. For example, sequenceiq/hadoop-docker:2.4.1 is an image in docker public repository that contains java and hadoop.
If you want your own private repository, you provide the repository url instead of your username. Therefore, the image url becomes: *private\_repo\_url*/*image\_name*. For example, if your repository is on localhost:8080, your images would be like: localhost:8080/hadoop-docker
To connect to a secure docker repository, you can use the following invocation:
```
docker login [OPTIONS] [SERVER]
Register or log in to a Docker registry server, if no server is specified
"https://index.docker.io/v1/" is the default.
-e, --email="" Email
-p, --password="" Password
-u, --username="" Username
```
If you want to login to a self-hosted registry you can specify this by adding the server name.
docker login <private_repo_url>
This needs to be run as part of the NodeManager startup, or as a cron job if the login session expires periodically. You can login to multiple docker repositories from the same NodeManager, but all your users will have access to all your repositories, as at present the DockerContainerExecutor does not support per-job docker login.
Job Configuration
-----------------
Currently you cannot configure any of the Docker settings with the job configuration. You can provide Mapper, Reducer, and ApplicationMaster environment overrides for the docker images, using the following 3 JVM properties respectively(only for MR jobs):
* `mapreduce.map.env`: You can override the mapper's image by passing `yarn.nodemanager.docker-container-executor.image-name`=*your_image_name* to this JVM property.
* `mapreduce.reduce.env`: You can override the reducer's image by passing `yarn.nodemanager.docker-container-executor.image-name`=*your_image_name* to this JVM property.
* `yarn.app.mapreduce.am.env`: You can override the ApplicationMaster's image by passing `yarn.nodemanager.docker-container-executor.image-name`=*your_image_name* to this JVM property.
Docker Image Requirements
-------------------------
The Docker Images used for YARN containers must meet the following requirements:
The distro and version of Linux in your Docker Image can be quite different from that of your NodeManager. (Docker does have a few limitations in this regard, but you're not likely to hit them.) However, if you're using the MapReduce framework, then your image will need to be configured for running Hadoop. Java must be installed in the container, and the following environment variables must be defined in the image: JAVA_HOME, HADOOP_COMMON_PATH, HADOOP_HDFS_HOME, HADOOP_MAPRED_HOME, HADOOP_YARN_HOME, and HADOOP_CONF_DIR
Working example of yarn launched docker containers
--------------------------------------------------
The following example shows how to run teragen using DockerContainerExecutor.
Step 1. First ensure that YARN is properly configured with DockerContainerExecutor(see above).
```xml
<property>
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
<value>docker -H=tcp://0.0.0.0:4243</value>
<description>
Name or path to the Docker client. The tcp socket must be
where docker daemon is listening.
</description>
</property>
<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
<description>
This is the container executor setting that ensures that all
jobs are started with the DockerContainerExecutor.
</description>
</property>
```
Step 2. Pick a custom Docker image if you want. In this example, we'll use sequenceiq/hadoop-docker:2.4.1 from the docker hub repository. It has jdk, hadoop, and all the previously mentioned environment variables configured.
Step 3. Run.
```bash
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-${project.version}.jar \
teragen \
-Dmapreduce.map.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
-Dyarn.app.mapreduce.am.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
1000 \
teragen_out_dir
```
Once it succeeds, you can check the yarn debug logs to verify that docker indeed has launched containers.