YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN

This commit is contained in:
Ravi Prakash 2014-11-11 21:28:11 -08:00
parent 3cb426be45
commit d863f54f57
11 changed files with 1533 additions and 40 deletions

View File

@ -125,6 +125,7 @@
<item name="YARN Commands" href="hadoop-yarn/hadoop-yarn-site/YarnCommands.html"/>
<item name="Scheduler Load Simulator" href="hadoop-sls/SchedulerLoadSimulator.html"/>
<item name="NodeManager Restart" href="hadoop-yarn/hadoop-yarn-site/NodeManagerRestart.html"/>
<item name="DockerContainerExecutor" href="hadoop-yarn/hadoop-yarn-site/DockerContainerExecutor.html"/>
</menu>
<menu name="YARN REST APIs" inherit="top">

View File

@ -62,6 +62,9 @@ Release 2.6.0 - 2014-11-15
NEW FEATURES
YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN. (Abin
Shahab via raviprak)
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving

View File

@ -892,6 +892,18 @@ private static void addDeprecatedKeys() {
public static final String NM_HEALTH_CHECK_SCRIPT_OPTS =
NM_PREFIX + "health-checker.script.opts";
/** The Docker image name(For DockerContainerExecutor).*/
public static final String NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME =
NM_PREFIX + "docker-container-executor.image-name";
/** The name of the docker executor (For DockerContainerExecutor).*/
public static final String NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
NM_PREFIX + "docker-container-executor.exec-name";
/** The default docker executor (For DockerContainerExecutor).*/
public static final String NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
"/usr/bin/docker";
/** The path to the Linux container executor.*/
public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
NM_PREFIX + "linux-container-executor.path";

View File

@ -1145,6 +1145,16 @@
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
</property>
<!--Docker configuration-->
<property>
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
<value>/usr/bin/docker</value>
<description>
Name or path to the Docker client.
</description>
</property>
<!--Map Reduce configuration-->
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>

View File

@ -20,10 +20,13 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -211,6 +214,34 @@ public int reacquireContainer(String user, ContainerId containerId)
}
}
public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException{
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) {
sb.env(env.getKey().toString(), env.getValue().toString());
}
}
if (resources != null) {
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
for (String linkName : entry.getValue()) {
sb.symlink(entry.getKey(), new Path(linkName));
}
}
}
sb.command(command);
PrintStream pout = null;
try {
pout = new PrintStream(out);
sb.write(pout);
} finally {
if (out != null) {
out.close();
}
}
}
public enum ExitCode {
FORCE_KILLED(137),
TERMINATED(143),

View File

@ -0,0 +1,794 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.ConverterUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.regex.Pattern;
import java.net.InetSocketAddress;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
/**
* This executor will launch a docker container and run the task inside the container.
*/
public class DockerContainerExecutor extends ContainerExecutor {
private static final Log LOG = LogFactory
.getLog(DockerContainerExecutor.class);
public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor";
public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session";
// This validates that the image is a proper docker image and would not crash docker.
public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$";
private final FileContext lfs;
private final Pattern dockerImagePattern;
public DockerContainerExecutor() {
try {
this.lfs = FileContext.getLocalFSFileContext();
this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
}
protected void copyFile(Path src, Path dst, String owner) throws IOException {
lfs.util().copy(src, dst);
}
@Override
public void init() throws IOException {
String auth = getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
if (auth != null && !auth.equals("simple")) {
throw new IllegalStateException("DockerContainerExecutor only works with simple authentication mode");
}
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
if (!new File(dockerExecutor).exists()) {
throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor);
}
}
@Override
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
LocalDirsHandlerService dirsHandler)
throws IOException, InterruptedException {
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
ContainerLocalizer localizer =
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs, user);
// randomly choose the local directory
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
@Override
public int launchContainer(Container container,
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
String userName, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException {
String containerImageName = container.getLaunchContext().getEnvironment()
.get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("containerImageName from launchContext: " + containerImageName);
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), "Container image must not be null");
containerImageName = containerImageName.replaceAll("['\"]", "");
Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " + containerImageName + " is not a proper docker image");
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerId();
// create container dirs on all disks
String containerIdStr = ConverterUtils.toString(containerId);
String appIdStr =
ConverterUtils.toString(
containerId.getApplicationAttemptId().
getApplicationId());
for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(appCacheDir, appIdStr);
Path containerDir = new Path(appDir, containerIdStr);
createDir(containerDir, dirPerm, true, userName);
}
// Create the container log-dirs on all disks
createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
createDir(tmpDir, dirPerm, false, userName);
// copy launch script to work dir
Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
// copy container tokens to work dir
Path tokenDst =
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
lfs.util().copy(nmPrivateTokensPath, tokenDst);
String localDirMount = toMount(localDirs);
String logDirMount = toMount(logDirs);
String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath()));
StringBuilder commands = new StringBuilder();
String commandStr = commands.append(dockerExecutor)
.append(" ")
.append("run")
.append(" ")
.append("--rm --net=host")
.append(" ")
.append(" --name " + containerIdStr)
.append(localDirMount)
.append(logDirMount)
.append(containerWorkDirMount)
.append(" ")
.append(containerImageName)
.toString();
String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`";
// Create new local launch wrapper script
LocalWrapperScriptBuilder sb =
new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript);
Path pidFile = getPidFilePath(containerId);
if (pidFile != null) {
sb.writeLocalWrapperScript(launchDst, pidFile);
} else {
LOG.info("Container " + containerIdStr
+ " was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
ShellCommandExecutor shExec = null;
try {
lfs.setPermission(launchDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
lfs.setPermission(sb.getWrapperScriptPath(),
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
// Setup command to run
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
containerIdStr, userName, pidFile, this.getConf());
if (LOG.isDebugEnabled()) {
LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command));
}
shExec = new ShellCommandExecutor(
command,
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment()); // sanitized env
if (isContainerActive(containerId)) {
shExec.execute();
} else {
LOG.info("Container " + containerIdStr +
" was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
} catch (IOException e) {
if (null == shExec) {
return -1;
}
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
// terminated/killed forcefully. In all other cases, log the
// container-executor's output
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode, e);
logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n"
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
"Container killed on request. Exit code is " + exitCode));
}
return exitCode;
} finally {
if (shExec != null) {
shExec.close();
}
}
return 0;
}
@Override
public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException {
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
Set<String> exclusionSet = new HashSet<String>();
exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name());
exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name());
exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name());
if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) {
if (!exclusionSet.contains(env.getKey())) {
sb.env(env.getKey().toString(), env.getValue().toString());
}
}
}
if (resources != null) {
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
for (String linkName : entry.getValue()) {
sb.symlink(entry.getKey(), new Path(linkName));
}
}
}
sb.command(command);
PrintStream pout = null;
PrintStream ps = null;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
pout = new PrintStream(out);
if (LOG.isDebugEnabled()) {
ps = new PrintStream(baos);
sb.write(ps);
}
sb.write(pout);
} finally {
if (out != null) {
out.close();
}
if (ps != null) {
ps.close();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Script: " + baos.toString());
}
}
private boolean saneDockerImage(String containerImageName) {
return dockerImagePattern.matcher(containerImageName).matches();
}
@Override
public boolean signalContainer(String user, String pid, Signal signal)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
+ " as user " + user);
}
if (!containerIsAlive(pid)) {
return false;
}
try {
killContainer(pid, signal);
} catch (IOException e) {
if (!containerIsAlive(pid)) {
return false;
}
throw e;
}
return true;
}
@Override
public boolean isContainerProcessAlive(String user, String pid)
throws IOException {
return containerIsAlive(pid);
}
/**
* Returns true if the process with the specified pid is alive.
*
* @param pid String pid
* @return boolean true if the process is alive
*/
@VisibleForTesting
public static boolean containerIsAlive(String pid) throws IOException {
try {
new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
.execute();
// successful execution means process is alive
return true;
}
catch (Shell.ExitCodeException e) {
// failure (non-zero exit code) means process is not alive
return false;
}
}
/**
* Send a specified signal to the specified pid
*
* @param pid the pid of the process [group] to signal.
* @param signal signal to send
* (for logging).
*/
protected void killContainer(String pid, Signal signal) throws IOException {
new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
.execute();
}
@Override
public void deleteAsUser(String user, Path subDir, Path... baseDirs)
throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) {
LOG.info("Deleting absolute path : " + subDir);
if (!lfs.delete(subDir, true)) {
//Maybe retry
LOG.warn("delete returned false for path: [" + subDir + "]");
}
return;
}
for (Path baseDir : baseDirs) {
Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
LOG.info("Deleting path : " + del);
if (!lfs.delete(del, true)) {
LOG.warn("delete returned false for path: [" + del + "]");
}
}
}
/**
* Converts a directory list to a docker mount string
* @param dirs
* @return a string of mounts for docker
*/
private String toMount(List<String> dirs) {
StringBuilder builder = new StringBuilder();
for (String dir : dirs) {
builder.append(" -v " + dir + ":" + dir);
}
return builder.toString();
}
private abstract class LocalWrapperScriptBuilder {
private final Path wrapperScriptPath;
public Path getWrapperScriptPath() {
return wrapperScriptPath;
}
public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
DataOutputStream out = null;
PrintStream pout = null;
try {
out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
pout = new PrintStream(out);
writeLocalWrapperScript(launchDst, pidFile, pout);
} finally {
IOUtils.cleanup(LOG, pout, out);
}
}
protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout);
protected LocalWrapperScriptBuilder(Path containerWorkDir) {
this.wrapperScriptPath = new Path(containerWorkDir,
Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT));
}
}
private final class UnixLocalWrapperScriptBuilder
extends LocalWrapperScriptBuilder {
private final Path sessionScriptPath;
private final String dockerCommand;
private final String dockerPidScript;
public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) {
super(containerWorkDir);
this.dockerCommand = dockerCommand;
this.dockerPidScript = dockerPidScript;
this.sessionScriptPath = new Path(containerWorkDir,
Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
}
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
throws IOException {
writeSessionScript(launchDst, pidFile);
super.writeLocalWrapperScript(launchDst, pidFile);
}
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout) {
String exitCodeFile = ContainerLaunch.getExitCodeFile(
pidFile.toString());
String tmpFile = exitCodeFile + ".tmp";
pout.println("#!/usr/bin/env bash");
pout.println("bash \"" + sessionScriptPath.toString() + "\"");
pout.println("rc=$?");
pout.println("echo $rc > \"" + tmpFile + "\"");
pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
pout.println("exit $rc");
}
private void writeSessionScript(Path launchDst, Path pidFile)
throws IOException {
DataOutputStream out = null;
PrintStream pout = null;
try {
out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
pout = new PrintStream(out);
// We need to do a move as writing to a file is not atomic
// Process reading a file being written to may get garbled data
// hence write pid to tmp file first followed by a mv
pout.println("#!/usr/bin/env bash");
pout.println();
pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp");
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
pout.println(dockerCommand + " bash \"" +
launchDst.toUri().getPath().toString() + "\"");
} finally {
IOUtils.cleanup(LOG, pout, out);
}
lfs.setPermission(sessionScriptPath,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
}
}
protected void createDir(Path dirPath, FsPermission perms,
boolean createParent, String user) throws IOException {
lfs.mkdir(dirPath, perms, createParent);
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
lfs.setPermission(dirPath, perms);
}
}
/**
* Initialize the local directories for a particular user.
* <ul>.mkdir
* <li>$local.dir/usercache/$user</li>
* </ul>
*/
void createUserLocalDirs(List<String> localDirs, String user)
throws IOException {
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user and its immediate parent
try {
createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user);
} catch (IOException e) {
LOG.warn("Unable to create the user directory : " + localDir, e);
continue;
}
userDirStatus = true;
}
if (!userDirStatus) {
throw new IOException("Not able to initialize user directories "
+ "in any of the configured local directories for user " + user);
}
}
/**
* Initialize the local cache directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user</li>
* <li>$local.dir/usercache/$user/appcache</li>
* <li>$local.dir/usercache/$user/filecache</li>
* </ul>
*/
void createUserCacheDirs(List<String> localDirs, String user)
throws IOException {
LOG.info("Initializing user " + user);
boolean appcacheDirStatus = false;
boolean distributedCacheDirStatus = false;
FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
FsPermission fileperms = new FsPermission(FILECACHE_PERM);
for (String localDir : localDirs) {
// create $local.dir/usercache/$user/appcache
Path localDirPath = new Path(localDir);
final Path appDir = getAppcacheDir(localDirPath, user);
try {
createDir(appDir, appCachePerms, true, user);
appcacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app cache directory : " + appDir, e);
}
// create $local.dir/usercache/$user/filecache
final Path distDir = getFileCacheDir(localDirPath, user);
try {
createDir(distDir, fileperms, true, user);
distributedCacheDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create file cache directory : " + distDir, e);
}
}
if (!appcacheDirStatus) {
throw new IOException("Not able to initialize app-cache directories "
+ "in any of the configured local directories for user " + user);
}
if (!distributedCacheDirStatus) {
throw new IOException(
"Not able to initialize distributed-cache directories "
+ "in any of the configured local directories for user "
+ user);
}
}
/**
* Initialize the local directories for a particular user.
* <ul>
* <li>$local.dir/usercache/$user/appcache/$appid</li>
* </ul>
* @param localDirs
*/
void createAppDirs(List<String> localDirs, String user, String appId)
throws IOException {
boolean initAppDirStatus = false;
FsPermission appperms = new FsPermission(APPDIR_PERM);
for (String localDir : localDirs) {
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
// create $local.dir/usercache/$user/appcache/$appId
try {
createDir(fullAppDir, appperms, true, user);
initAppDirStatus = true;
} catch (IOException e) {
LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
}
}
if (!initAppDirStatus) {
throw new IOException("Not able to initialize app directories "
+ "in any of the configured local directories for app "
+ appId.toString());
}
}
/**
* Create application log directories on all disks.
*/
void createContainerLogDirs(String appId, String containerId,
List<String> logDirs, String user) throws IOException {
boolean containerLogDirStatus = false;
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : logDirs) {
// create $log.dir/$appid/$containerid
Path appLogDir = new Path(rootLogDir, appId);
Path containerLogDir = new Path(appLogDir, containerId);
try {
createDir(containerLogDir, containerLogDirPerms, true, user);
} catch (IOException e) {
LOG.warn("Unable to create the container-log directory : "
+ appLogDir, e);
continue;
}
containerLogDirStatus = true;
}
if (!containerLogDirStatus) {
throw new IOException(
"Not able to initialize container-log directories "
+ "in any of the configured local directories for container "
+ containerId);
}
}
/**
* Permissions for user dir.
* $local.dir/usercache/$user
*/
static final short USER_PERM = (short) 0750;
/**
* Permissions for user appcache dir.
* $local.dir/usercache/$user/appcache
*/
static final short APPCACHE_PERM = (short) 0710;
/**
* Permissions for user filecache dir.
* $local.dir/usercache/$user/filecache
*/
static final short FILECACHE_PERM = (short) 0710;
/**
* Permissions for user app dir.
* $local.dir/usercache/$user/appcache/$appId
*/
static final short APPDIR_PERM = (short) 0710;
/**
* Permissions for user log dir.
* $logdir/$user/$appId
*/
static final short LOGDIR_PERM = (short) 0710;
private long getDiskFreeSpace(Path base) throws IOException {
return lfs.getFsStatus(base).getRemaining();
}
private Path getApplicationDir(Path base, String user, String appId) {
return new Path(getAppcacheDir(base, user), appId);
}
private Path getUserCacheDir(Path base, String user) {
return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
}
private Path getAppcacheDir(Path base, String user) {
return new Path(getUserCacheDir(base, user),
ContainerLocalizer.APPCACHE);
}
private Path getFileCacheDir(Path base, String user) {
return new Path(getUserCacheDir(base, user),
ContainerLocalizer.FILECACHE);
}
protected Path getWorkingDir(List<String> localDirs, String user,
String appId) throws IOException {
Path appStorageDir = null;
long totalAvailable = 0L;
long[] availableOnDisk = new long[localDirs.size()];
int i = 0;
// randomly choose the app directory
// the chance of picking a directory is proportional to
// the available space on the directory.
// firstly calculate the sum of all available space on these directories
for (String localDir : localDirs) {
Path curBase = getApplicationDir(new Path(localDir),
user, appId);
long space = 0L;
try {
space = getDiskFreeSpace(curBase);
} catch (IOException e) {
LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
}
availableOnDisk[i++] = space;
totalAvailable += space;
}
// throw an IOException if totalAvailable is 0.
if (totalAvailable <= 0L) {
throw new IOException("Not able to find a working directory for "
+ user);
}
// make probability to pick a directory proportional to
// the available space on the directory.
Random r = new Random();
long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
int dir = 0;
// skip zero available space directory,
// because totalAvailable is greater than 0 and randomPosition
// is less than totalAvailable, we can find a valid directory
// with nonzero available space.
while (availableOnDisk[dir] == 0L) {
dir++;
}
while (randomPosition > availableOnDisk[dir]) {
randomPosition -= availableOnDisk[dir++];
}
appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
user, appId);
return appStorageDir;
}
/**
* Create application log directories on all disks.
*/
void createAppLogDirs(String appId, List<String> logDirs, String user)
throws IOException {
boolean appLogDirStatus = false;
FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
for (String rootLogDir : logDirs) {
// create $log.dir/$appid
Path appLogDir = new Path(rootLogDir, appId);
try {
createDir(appLogDir, appLogDirPerms, true, user);
} catch (IOException e) {
LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
continue;
}
appLogDirStatus = true;
}
if (!appLogDirStatus) {
throw new IOException("Not able to initialize app-log directories "
+ "in any of the configured local directories for app " + appId);
}
}
/**
* @return the list of paths of given local directories
*/
private static List<Path> getPaths(List<String> dirs) {
List<Path> paths = new ArrayList<Path>(dirs.size());
for (int i = 0; i < dirs.size(); i++) {
paths.add(new Path(dirs.get(i)));
}
return paths;
}
}

View File

@ -269,7 +269,7 @@ public Integer call() {
localResources, nmPrivateClasspathJarDir);
// Write out the environment
writeLaunchEnv(containerScriptOutStream, environment, localResources,
exec.writeLaunchEnv(containerScriptOutStream, environment, localResources,
launchContext.getCommands());
// /////////// End of writing out container-script
@ -502,8 +502,7 @@ Context getContext() {
return context;
}
@VisibleForTesting
static abstract class ShellScriptBuilder {
public static abstract class ShellScriptBuilder {
public static ShellScriptBuilder create() {
return Shell.WINDOWS ? new WindowsShellScriptBuilder() :
new UnixShellScriptBuilder();
@ -794,37 +793,6 @@ public void sanitizeEnv(Map<String, String> environment, Path pwd,
}
}
static void writeLaunchEnv(OutputStream out,
Map<String,String> environment, Map<Path,List<String>> resources,
List<String> command)
throws IOException {
ShellScriptBuilder sb = ShellScriptBuilder.create();
if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) {
sb.env(env.getKey().toString(), env.getValue().toString());
}
}
if (resources != null) {
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
for (String linkName : entry.getValue()) {
sb.symlink(entry.getKey(), new Path(linkName));
}
}
}
sb.command(command);
PrintStream pout = null;
try {
pout = new PrintStream(out);
sb.write(pout);
} finally {
if (out != null) {
out.close();
}
}
}
public static String getExitCodeFile(String pidFile) {
return pidFile + EXIT_CODE_FILE_SUFFIX;
}

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* This is intended to test the DockerContainerExecutor code, but it requires docker
* to be installed.
* <br><ol>
* <li>Install docker, and Compile the code with docker-service-url set to the host and port
* where docker service is running.
* <br><pre><code>
* > mvn clean install -Ddocker-service-url=tcp://0.0.0.0:4243
* -DskipTests
* </code></pre>
*/
public class TestDockerContainerExecutor {
private static final Log LOG = LogFactory
.getLog(TestDockerContainerExecutor.class);
private static File workSpace = null;
private DockerContainerExecutor exec = null;
private LocalDirsHandlerService dirsHandler;
private Path workDir;
private FileContext lfs;
private String yarnImage;
private int id = 0;
private String appSubmitter;
private String dockerUrl;
private String testImage = "centos";
private String dockerExec;
private String containerIdStr;
private ContainerId getNextContainerId() {
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
String id = "CONTAINER_" + System.currentTimeMillis();
when(cId.toString()).thenReturn(id);
return cId;
}
@Before
public void setup() {
try {
lfs = FileContext.getLocalFSFileContext();
workDir = new Path("/tmp/temp-" + System.currentTimeMillis());
workSpace = new File(workDir.toUri().getPath());
lfs.mkdir(workDir, FsPermission.getDirDefault(), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
Configuration conf = new Configuration();
yarnImage = "yarnImage";
long time = System.currentTimeMillis();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
dockerUrl = System.getProperty("docker-service-url");
LOG.info("dockerUrl: " + dockerUrl);
if (Strings.isNullOrEmpty(dockerUrl)) {
return;
}
dockerUrl = " -H " + dockerUrl;
dockerExec = "docker " + dockerUrl;
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec);
exec = new DockerContainerExecutor();
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
exec.setConf(conf);
appSubmitter = System.getProperty("application.submitter");
if (appSubmitter == null || appSubmitter.isEmpty()) {
appSubmitter = "nobody";
}
shellExec(dockerExec + " pull " + testImage);
}
private Shell.ShellCommandExecutor shellExec(String command) {
try {
Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor(
command.split("\\s+"),
new File(workDir.toUri().getPath()),
System.getenv());
shExec.execute();
return shExec;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private boolean shouldRun() {
return exec != null;
}
private int runAndBlock(ContainerId cId, Map<String, String> launchCtxEnv, String... cmd) throws IOException {
String appId = "APP_" + System.currentTimeMillis();
Container container = mock(Container.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
when(context.getEnvironment()).thenReturn(launchCtxEnv);
String script = writeScriptFile(launchCtxEnv, cmd);
Path scriptPath = new Path(script);
Path tokensPath = new Path("/dev/null");
Path workDir = new Path(workSpace.getAbsolutePath());
Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
}
private String writeScriptFile(Map<String, String> launchCtxEnv, String... cmd) throws IOException {
File f = File.createTempFile("TestDockerContainerExecutor", ".sh");
f.deleteOnExit();
PrintWriter p = new PrintWriter(new FileOutputStream(f));
for(Map.Entry<String, String> entry: launchCtxEnv.entrySet()) {
p.println("export " + entry.getKey() + "=\"" + entry.getValue() + "\"");
}
for (String part : cmd) {
p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
p.print(" ");
}
p.println();
p.close();
return f.getAbsolutePath();
}
@After
public void tearDown() {
try {
lfs.delete(workDir, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void testLaunchContainer() throws IOException {
if (!shouldRun()) {
LOG.warn("Docker not installed, aborting test.");
return;
}
Map<String, String> env = new HashMap<String, String>();
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
String touchFileName = "touch-file-" + System.currentTimeMillis();
File touchFile = new File(dirsHandler.getLocalDirs().get(0), touchFileName);
ContainerId cId = getNextContainerId();
int ret = runAndBlock(
cId, env, "touch", touchFile.getAbsolutePath(), "&&", "cp", touchFile.getAbsolutePath(), "/");
assertEquals(0, ret);
}
}

View File

@ -0,0 +1,259 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
/**
* Mock tests for docker container executor
*/
public class TestDockerContainerExecutorWithMocks {
private static final Log LOG = LogFactory
.getLog(TestDockerContainerExecutorWithMocks.class);
public static final String DOCKER_LAUNCH_COMMAND = "/bin/true";
private DockerContainerExecutor dockerContainerExecutor = null;
private LocalDirsHandlerService dirsHandler;
private Path workDir;
private FileContext lfs;
private String yarnImage;
@Before
public void setup() {
assumeTrue(!Path.WINDOWS);
File f = new File("./src/test/resources/mock-container-executor");
if(!FileUtil.canExecute(f)) {
FileUtil.setExecutable(f, true);
}
String executorPath = f.getAbsolutePath();
Configuration conf = new Configuration();
yarnImage = "yarnImage";
long time = System.currentTimeMillis();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME , DOCKER_LAUNCH_COMMAND);
dockerContainerExecutor = new DockerContainerExecutor();
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
dockerContainerExecutor.setConf(conf);
lfs = null;
try {
lfs = FileContext.getLocalFSFileContext();
workDir = new Path("/tmp/temp-"+ System.currentTimeMillis());
lfs.mkdir(workDir, FsPermission.getDirDefault(), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@After
public void tearDown() {
try {
lfs.delete(workDir, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test(expected = IllegalStateException.class)
public void testContainerInitSecure() throws IOException {
dockerContainerExecutor.getConf().set(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
dockerContainerExecutor.init();
}
@Test(expected = IllegalArgumentException.class)
public void testContainerLaunchNullImage() throws IOException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
String testImage = "";
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
dockerContainerExecutor.getConf()
.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
}
@Test(expected = IllegalArgumentException.class)
public void testContainerLaunchInvalidImage() throws IOException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
String testImage = "testrepo.com/test-image rm -rf $HADOOP_PREFIX/*";
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
dockerContainerExecutor.getConf()
.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path pidFile = new Path(workDir, "pid.txt");
dockerContainerExecutor.activateContainer(cId, pidFile);
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
}
@Test
public void testContainerLaunch() throws IOException {
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
String testImage = "\"sequenceiq/hadoop-docker:2.4.1\"";
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path pidFile = new Path(workDir, "pid");
dockerContainerExecutor.activateContainer(cId, pidFile);
int ret = dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
assertEquals(0, ret);
//get the script
Path sessionScriptPath = new Path(workDir,
Shell.appendScriptExtension(
DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
LineNumberReader lnr = new LineNumberReader(new FileReader(sessionScriptPath.toString()));
boolean cmdFound = false;
List<String> localDirs = dirsToMount(dirsHandler.getLocalDirs());
List<String> logDirs = dirsToMount(dirsHandler.getLogDirs());
List<String> workDirMount = dirsToMount(Collections.singletonList(workDir.toUri().getPath()));
List<String> expectedCommands = new ArrayList<String>(
Arrays.asList(DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name", containerId));
expectedCommands.addAll(localDirs);
expectedCommands.addAll(logDirs);
expectedCommands.addAll(workDirMount);
String shellScript = workDir + "/launch_container.sh";
expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""), "bash","\"" + shellScript + "\""));
String expectedPidString = "echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+ pidFile.toString() + ".tmp";
boolean pidSetterFound = false;
while(lnr.ready()){
String line = lnr.readLine();
LOG.debug("line: " + line);
if (line.startsWith(DOCKER_LAUNCH_COMMAND)){
List<String> command = new ArrayList<String>();
for( String s :line.split("\\s+")){
command.add(s.trim());
}
assertEquals(expectedCommands, command);
cmdFound = true;
} else if (line.startsWith("echo")) {
assertEquals(expectedPidString, line);
pidSetterFound = true;
}
}
assertTrue(cmdFound);
assertTrue(pidSetterFound);
}
private List<String> dirsToMount(List<String> dirs) {
List<String> localDirs = new ArrayList<String>();
for(String dir: dirs){
localDirs.add("-v");
localDirs.add(dir + ":" + dir);
}
return localDirs;
}
}

View File

@ -75,6 +75,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@ -144,7 +145,7 @@ public void testSpecialCharSymlinks() throws IOException {
commands.add("/bin/sh ./\\\"" + badSymlink + "\\\"");
}
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands);
fos.flush();
fos.close();
FileUtil.setExecutable(tempFile, true);
@ -211,7 +212,7 @@ public void testInvalidSymlinkDiagnostics() throws IOException {
} else {
commands.add("/bin/sh ./\\\"" + symLink + "\\\"");
}
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands);
fos.flush();
fos.close();
FileUtil.setExecutable(tempFile, true);
@ -264,7 +265,7 @@ public void testInvalidEnvSyntaxDiagnostics() throws IOException {
"\"workflowName\":\"\n\ninsert table " +
"\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " );
List<String> commands = new ArrayList<String>();
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands);
fos.flush();
fos.close();
@ -341,7 +342,8 @@ public void testContainerLaunchStdoutAndStderrDiagnostics() throws IOException {
Map<String, String> env = new HashMap<String, String>();
List<String> commands = new ArrayList<String>();
commands.add(command);
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
ContainerExecutor exec = new DefaultContainerExecutor();
exec.writeLaunchEnv(fos, env, resources, commands);
fos.flush();
fos.close();

View File

@ -0,0 +1,200 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
Hadoop Map Reduce Next Generation-${project.version} - Docker Container Executor
---
---
${maven.build.timestamp}
Docker Container Executor
%{toc|section=1|fromDepth=0}
* {Overview}
Docker (https://www.docker.io/) combines an easy-to-use interface to
Linux containers with easy-to-construct image files for those
containers. In short, Docker launches very light weight virtual
machines.
The Docker Container Executor (DCE) allows the YARN NodeManager to
launch YARN containers into Docker containers. Users can specify the
Docker images they want for their YARN containers. These containers
provide a custom software environment in which the user's code runs,
isolated from the software environment of the NodeManager. These
containers can include special libraries needed by the application,
and they can have different versions of Perl, Python, and even Java
than what is installed on the NodeManager. Indeed, these containers
can run a different flavor of Linux than what is running on the
NodeManager -- although the YARN container must define all the environments
and libraries needed to run the job, nothing will be shared with the NodeManager.
Docker for YARN provides both consistency (all YARN containers will
have the same software environment) and isolation (no interference
with whatever is installed on the physical machine).
* {Cluster Configuration}
Docker Container Executor runs in non-secure mode of HDFS and
YARN. It will not run in secure mode, and will exit if it detects
secure mode.
The DockerContainerExecutor requires Docker daemon to be running on
the NodeManagers, and the Docker client installed and able to start Docker
containers. To prevent timeouts while starting jobs, the Docker
images to be used by a job should already be downloaded in the
NodeManagers. Here's an example of how this can be done:
----
sudo docker pull sequenceiq/hadoop-docker:2.4.1
----
This should be done as part of the NodeManager startup.
The following properties must be set in yarn-site.xml:
----
<property>
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
<value>/usr/bin/docker</value>
<description>
Name or path to the Docker client. This is a required parameter. If this is empty,
user must pass an image name as part of the job invocation(see below).
</description>
</property>
<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
<description>
This is the container executor setting that ensures that all
jobs are started with the DockerContainerExecutor.
</description>
</property>
----
Administrators should be aware that DCE doesn't currently provide
user name-space isolation. This means, in particular, that software
running as root in the YARN container will have root privileges in the
underlying NodeManager. Put differently, DCE currently provides no
better security guarantees than YARN's Default Container Executor. In
fact, DockerContainerExecutor will exit if it detects secure yarn.
* {Tips for connecting to a secure docker repository}
By default, docker images are pulled from the docker public repository. The
format of a docker image url is: <username>/<image_name>. For example,
sequenceiq/hadoop-docker:2.4.1 is an image in docker public repository that contains java and
hadoop.
If you want your own private repository, you provide the repository url instead of
your username. Therefore, the image url becomes: <private_repo_url>/<image_name>.
For example, if your repository is on localhost:8080, your images would be like:
localhost:8080/hadoop-docker
To connect to a secure docker repository, you can use the following invocation:
----
docker login [OPTIONS] [SERVER]
Register or log in to a Docker registry server, if no server is specified
"https://index.docker.io/v1/" is the default.
-e, --email="" Email
-p, --password="" Password
-u, --username="" Username
----
If you want to login to a self-hosted registry you can specify this by adding
the server name.
----
docker login <private_repo_url>
----
This needs to be run as part of the NodeManager startup, or as a cron job if
the login session expires periodically. You can login to multiple docker repositories
from the same NodeManager, but all your users will have access to all your repositories,
as at present the DockerContainerExecutor does not support per-job docker login.
* {Job Configuration}
Currently you cannot configure any of the Docker settings with the job configuration.
You can provide Mapper, Reducer, and ApplicationMaster environment overrides for the
docker images, using the following 3 JVM properties respectively(only for MR jobs):
* mapreduce.map.env: You can override the mapper's image by passing
yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
to this JVM property.
* mapreduce.reduce.env: You can override the reducer's image by passing
yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
to this JVM property.
* yarn.app.mapreduce.am.env: You can override the ApplicationMaster's image
by passing yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
to this JVM property.
* {Docker Image requirements}
The Docker Images used for YARN containers must meet the following
requirements:
The distro and version of Linux in your Docker Image can be quite different
from that of your NodeManager. (Docker does have a few limitations in this
regard, but you're not likely to hit them.) However, if you're using the
MapReduce framework, then your image will need to be configured for running
Hadoop. Java must be installed in the container, and the following environment variables
must be defined in the image: JAVA_HOME, HADOOP_COMMON_PATH, HADOOP_HDFS_HOME,
HADOOP_MAPRED_HOME, HADOOP_YARN_HOME, and HADOOP_CONF_DIR
* {Working example of yarn launched docker containers.}
The following example shows how to run teragen using DockerContainerExecutor.
* First ensure that YARN is properly configured with DockerContainerExecutor(see above).
----
<property>
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
<value>docker -H=tcp://0.0.0.0:4243</value>
<description>
Name or path to the Docker client. The tcp socket must be
where docker daemon is listening.
</description>
</property>
<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
<description>
This is the container executor setting that ensures that all
jobs are started with the DockerContainerExecutor.
</description>
</property>
----
* Pick a custom Docker image if you want. In this example, we'll use sequenceiq/hadoop-docker:2.4.1 from the
docker hub repository. It has jdk, hadoop, and all the previously mentioned environment variables configured.
* Run:
----
hadoop jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
teragen \
-Dmapreduce.map.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
-Dyarn.app.mapreduce.am.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
1000 \
teragen_out_dir
----
Once it succeeds, you can check the yarn debug logs to verify that docker indeed has launched containers.