From d863f54f57b606a4eb7d04c2bf44f7a5daadee26 Mon Sep 17 00:00:00 2001 From: Ravi Prakash Date: Tue, 11 Nov 2014 21:28:11 -0800 Subject: [PATCH] YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN --- hadoop-project/src/site/site.xml | 1 + hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 14 +- .../src/main/resources/yarn-default.xml | 12 +- .../server/nodemanager/ContainerExecutor.java | 31 + .../nodemanager/DockerContainerExecutor.java | 794 ++++++++++++++++++ .../launcher/ContainerLaunch.java | 36 +- .../TestDockerContainerExecutor.java | 213 +++++ .../TestDockerContainerExecutorWithMocks.java | 259 ++++++ .../launcher/TestContainerLaunch.java | 10 +- .../site/apt/DockerContainerExecutor.apt.vm | 200 +++++ 11 files changed, 1533 insertions(+), 40 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/DockerContainerExecutor.apt.vm diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml index 4a2c221b753..1b649fcbd21 100644 --- a/hadoop-project/src/site/site.xml +++ b/hadoop-project/src/site/site.xml @@ -125,6 +125,7 @@ + diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2ca9db2452b..51413ca2fa4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -62,6 +62,9 @@ Release 2.6.0 - 2014-11-15 NEW FEATURES + YARN-1964. 
Create Docker analog of the LinuxContainerExecutor in YARN. (Abin + Shahab via raviprak) + YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha) YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 301631b6394..7168068d0da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -891,7 +891,19 @@ private static void addDeprecatedKeys() { /** The arguments to pass to the health check script.*/ public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = NM_PREFIX + "health-checker.script.opts"; - + + /** The Docker image name(For DockerContainerExecutor).*/ + public static final String NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME = + NM_PREFIX + "docker-container-executor.image-name"; + + /** The name of the docker executor (For DockerContainerExecutor).*/ + public static final String NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME = + NM_PREFIX + "docker-container-executor.exec-name"; + + /** The default docker executor (For DockerContainerExecutor).*/ + public static final String NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME = + "/usr/bin/docker"; + /** The path to the Linux container executor.*/ public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH = NM_PREFIX + "linux-container-executor.path"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 651d6b0944d..bae77ed8d18 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -23,7 +23,7 @@ - + Factory to create client IPC classes. @@ -1145,6 +1145,16 @@ ${hadoop.tmp.dir}/yarn-nm-recovery + + + + yarn.nodemanager.docker-container-executor.exec-name + /usr/bin/docker + + Name or path to the Docker client. + + + yarn.nodemanager.aux-services.mapreduce_shuffle.class diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 4ce1a750764..8133413f252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -20,10 +20,13 @@ import java.io.File; import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -211,6 +214,34 @@ public int reacquireContainer(String user, ContainerId containerId) } } + public void writeLaunchEnv(OutputStream out, Map environment, Map> resources, List command) throws IOException{ + ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create(); + if (environment != null) { + for (Map.Entry env : environment.entrySet()) { + 
sb.env(env.getKey().toString(), env.getValue().toString()); + } + } + if (resources != null) { + for (Map.Entry> entry : resources.entrySet()) { + for (String linkName : entry.getValue()) { + sb.symlink(entry.getKey(), new Path(linkName)); + } + } + } + + sb.command(command); + + PrintStream pout = null; + try { + pout = new PrintStream(out); + sb.write(pout); + } finally { + if (out != null) { + out.close(); + } + } + } + public enum ExitCode { FORCE_KILLED(137), TERMINATED(143), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java new file mode 100644 index 00000000000..d8dd8907117 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java @@ -0,0 +1,794 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.regex.Pattern; +import java.net.InetSocketAddress; + +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static 
org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +/** + * This executor will launch a docker container and run the task inside the container. + */ +public class DockerContainerExecutor extends ContainerExecutor { + + private static final Log LOG = LogFactory + .getLog(DockerContainerExecutor.class); + public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor"; + public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session"; + + // This validates that the image is a proper docker image and would not crash docker. + public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$"; + + + private final FileContext lfs; + private final Pattern dockerImagePattern; + + public DockerContainerExecutor() { + try { + this.lfs = FileContext.getLocalFSFileContext(); + this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); + } + } + + protected void copyFile(Path src, Path dst, String owner) throws IOException { + lfs.util().copy(src, dst); + } + + @Override + public void init() throws IOException { + String auth = getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION); + if (auth != null && !auth.equals("simple")) { + throw new IllegalStateException("DockerContainerExecutor only works with simple authentication mode"); + } + String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, + YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); + if (!new File(dockerExecutor).exists()) { + throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor); + } + } + + @Override + public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, + InetSocketAddress nmAddr, String user, String appId, String locId, + LocalDirsHandlerService dirsHandler) + throws IOException, InterruptedException { + + List localDirs = 
dirsHandler.getLocalDirs(); + List logDirs = dirsHandler.getLogDirs(); + + ContainerLocalizer localizer = + new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs), + RecordFactoryProvider.getRecordFactory(getConf())); + + createUserLocalDirs(localDirs, user); + createUserCacheDirs(localDirs, user); + createAppDirs(localDirs, user, appId); + createAppLogDirs(appId, logDirs, user); + + // randomly choose the local directory + Path appStorageDir = getWorkingDir(localDirs, user, appId); + + String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); + Path tokenDst = new Path(appStorageDir, tokenFn); + copyFile(nmPrivateContainerTokensPath, tokenDst, user); + LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); + lfs.setWorkingDirectory(appStorageDir); + LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory()); + // TODO: DO it over RPC for maintaining similarity? + localizer.runLocalization(nmAddr); + } + + + @Override + public int launchContainer(Container container, + Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, + String userName, String appId, Path containerWorkDir, + List localDirs, List logDirs) throws IOException { + String containerImageName = container.getLaunchContext().getEnvironment() + .get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); + if (LOG.isDebugEnabled()) { + LOG.debug("containerImageName from launchContext: " + containerImageName); + } + Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), "Container image must not be null"); + containerImageName = containerImageName.replaceAll("['\"]", ""); + + Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " + containerImageName + " is not a proper docker image"); + String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, + YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); + + FsPermission dirPerm = new 
FsPermission(APPDIR_PERM); + ContainerId containerId = container.getContainerId(); + + // create container dirs on all disks + String containerIdStr = ConverterUtils.toString(containerId); + String appIdStr = + ConverterUtils.toString( + containerId.getApplicationAttemptId(). + getApplicationId()); + for (String sLocalDir : localDirs) { + Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); + Path userdir = new Path(usersdir, userName); + Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE); + Path appDir = new Path(appCacheDir, appIdStr); + Path containerDir = new Path(appDir, containerIdStr); + createDir(containerDir, dirPerm, true, userName); + } + + // Create the container log-dirs on all disks + createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName); + + Path tmpDir = new Path(containerWorkDir, + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); + createDir(tmpDir, dirPerm, false, userName); + + // copy launch script to work dir + Path launchDst = + new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); + lfs.util().copy(nmPrivateContainerScriptPath, launchDst); + + // copy container tokens to work dir + Path tokenDst = + new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); + lfs.util().copy(nmPrivateTokensPath, tokenDst); + + + + String localDirMount = toMount(localDirs); + String logDirMount = toMount(logDirs); + String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath())); + StringBuilder commands = new StringBuilder(); + String commandStr = commands.append(dockerExecutor) + .append(" ") + .append("run") + .append(" ") + .append("--rm --net=host") + .append(" ") + .append(" --name " + containerIdStr) + .append(localDirMount) + .append(logDirMount) + .append(containerWorkDirMount) + .append(" ") + .append(containerImageName) + .toString(); + String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`"; + // 
Create new local launch wrapper script + LocalWrapperScriptBuilder sb = + new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript); + Path pidFile = getPidFilePath(containerId); + if (pidFile != null) { + sb.writeLocalWrapperScript(launchDst, pidFile); + } else { + LOG.info("Container " + containerIdStr + + " was marked as inactive. Returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } + + ShellCommandExecutor shExec = null; + try { + lfs.setPermission(launchDst, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + lfs.setPermission(sb.getWrapperScriptPath(), + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + + // Setup command to run + String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), + containerIdStr, userName, pidFile, this.getConf()); + if (LOG.isDebugEnabled()) { + LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command)); + } + shExec = new ShellCommandExecutor( + command, + new File(containerWorkDir.toUri().getPath()), + container.getLaunchContext().getEnvironment()); // sanitized env + if (isContainerActive(containerId)) { + shExec.execute(); + } else { + LOG.info("Container " + containerIdStr + + " was marked as inactive. Returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } + } catch (IOException e) { + if (null == shExec) { + return -1; + } + int exitCode = shExec.getExitCode(); + LOG.warn("Exit code from container " + containerId + " is : " + exitCode); + // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was + // terminated/killed forcefully. 
In all other cases, log the + // container-executor's output + if (exitCode != ExitCode.FORCE_KILLED.getExitCode() + && exitCode != ExitCode.TERMINATED.getExitCode()) { + LOG.warn("Exception from container-launch with container ID: " + + containerId + " and exit code: " + exitCode, e); + logOutput(shExec.getOutput()); + String diagnostics = "Exception from container-launch: \n" + + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); + container.handle(new ContainerDiagnosticsUpdateEvent(containerId, + diagnostics)); + } else { + container.handle(new ContainerDiagnosticsUpdateEvent(containerId, + "Container killed on request. Exit code is " + exitCode)); + } + return exitCode; + } finally { + if (shExec != null) { + shExec.close(); + } + } + return 0; + } + + @Override + public void writeLaunchEnv(OutputStream out, Map environment, Map> resources, List command) throws IOException { + ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create(); + + Set exclusionSet = new HashSet(); + exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name()); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name()); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name()); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); + exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name()); + + if (environment != null) { + for (Map.Entry env : environment.entrySet()) { + if (!exclusionSet.contains(env.getKey())) { + sb.env(env.getKey().toString(), env.getValue().toString()); + } + } + } + if (resources != null) { + for (Map.Entry> entry : resources.entrySet()) { + for (String linkName : entry.getValue()) { + sb.symlink(entry.getKey(), new Path(linkName)); + } + } + } + + sb.command(command); + + PrintStream pout = null; + PrintStream ps = null; + ByteArrayOutputStream baos = new 
ByteArrayOutputStream(); + try { + pout = new PrintStream(out); + if (LOG.isDebugEnabled()) { + ps = new PrintStream(baos); + sb.write(ps); + } + sb.write(pout); + + } finally { + if (out != null) { + out.close(); + } + if (ps != null) { + ps.close(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Script: " + baos.toString()); + } + } + + private boolean saneDockerImage(String containerImageName) { + return dockerImagePattern.matcher(containerImageName).matches(); + } + + @Override + public boolean signalContainer(String user, String pid, Signal signal) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid + + " as user " + user); + } + if (!containerIsAlive(pid)) { + return false; + } + try { + killContainer(pid, signal); + } catch (IOException e) { + if (!containerIsAlive(pid)) { + return false; + } + throw e; + } + return true; + } + + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + return containerIsAlive(pid); + } + + /** + * Returns true if the process with the specified pid is alive. + * + * @param pid String pid + * @return boolean true if the process is alive + */ + @VisibleForTesting + public static boolean containerIsAlive(String pid) throws IOException { + try { + new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid)) + .execute(); + // successful execution means process is alive + return true; + } + catch (Shell.ExitCodeException e) { + // failure (non-zero exit code) means process is not alive + return false; + } + } + + /** + * Send a specified signal to the specified pid + * + * @param pid the pid of the process [group] to signal. + * @param signal signal to send + * (for logging). 
+ */ + protected void killContainer(String pid, Signal signal) throws IOException { + new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) + .execute(); + } + + @Override + public void deleteAsUser(String user, Path subDir, Path... baseDirs) + throws IOException, InterruptedException { + if (baseDirs == null || baseDirs.length == 0) { + LOG.info("Deleting absolute path : " + subDir); + if (!lfs.delete(subDir, true)) { + //Maybe retry + LOG.warn("delete returned false for path: [" + subDir + "]"); + } + return; + } + for (Path baseDir : baseDirs) { + Path del = subDir == null ? baseDir : new Path(baseDir, subDir); + LOG.info("Deleting path : " + del); + if (!lfs.delete(del, true)) { + LOG.warn("delete returned false for path: [" + del + "]"); + } + } + } + + /** + * Converts a directory list to a docker mount string + * @param dirs + * @return a string of mounts for docker + */ + private String toMount(List dirs) { + StringBuilder builder = new StringBuilder(); + for (String dir : dirs) { + builder.append(" -v " + dir + ":" + dir); + } + return builder.toString(); + } + + private abstract class LocalWrapperScriptBuilder { + + private final Path wrapperScriptPath; + + public Path getWrapperScriptPath() { + return wrapperScriptPath; + } + + public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + + try { + out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + writeLocalWrapperScript(launchDst, pidFile, pout); + } finally { + IOUtils.cleanup(LOG, pout, out); + } + } + + protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile, + PrintStream pout); + + protected LocalWrapperScriptBuilder(Path containerWorkDir) { + this.wrapperScriptPath = new Path(containerWorkDir, + Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT)); + } + } + + private final class UnixLocalWrapperScriptBuilder + 
extends LocalWrapperScriptBuilder { + private final Path sessionScriptPath; + private final String dockerCommand; + private final String dockerPidScript; + + public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) { + super(containerWorkDir); + this.dockerCommand = dockerCommand; + this.dockerPidScript = dockerPidScript; + this.sessionScriptPath = new Path(containerWorkDir, + Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT)); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile) + throws IOException { + writeSessionScript(launchDst, pidFile); + super.writeLocalWrapperScript(launchDst, pidFile); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile, + PrintStream pout) { + + String exitCodeFile = ContainerLaunch.getExitCodeFile( + pidFile.toString()); + String tmpFile = exitCodeFile + ".tmp"; + pout.println("#!/usr/bin/env bash"); + pout.println("bash \"" + sessionScriptPath.toString() + "\""); + pout.println("rc=$?"); + pout.println("echo $rc > \"" + tmpFile + "\""); + pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); + pout.println("exit $rc"); + } + + private void writeSessionScript(Path launchDst, Path pidFile) + throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + try { + out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + pout.println("#!/usr/bin/env bash"); + pout.println(); + pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp"); + pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); + pout.println(dockerCommand + " bash \"" + + launchDst.toUri().getPath().toString() + "\""); + } finally { + IOUtils.cleanup(LOG, 
pout, out); + } + lfs.setPermission(sessionScriptPath, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + } + } + + protected void createDir(Path dirPath, FsPermission perms, + boolean createParent, String user) throws IOException { + lfs.mkdir(dirPath, perms, createParent); + if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { + lfs.setPermission(dirPath, perms); + } + } + + /** + * Initialize the local directories for a particular user. + *
    <ul>.mkdir
+   * <li>$local.dir/usercache/$user</li>
+   * </ul>
+ */ + void createUserLocalDirs(List localDirs, String user) + throws IOException { + boolean userDirStatus = false; + FsPermission userperms = new FsPermission(USER_PERM); + for (String localDir : localDirs) { + // create $local.dir/usercache/$user and its immediate parent + try { + createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user); + } catch (IOException e) { + LOG.warn("Unable to create the user directory : " + localDir, e); + continue; + } + userDirStatus = true; + } + if (!userDirStatus) { + throw new IOException("Not able to initialize user directories " + + "in any of the configured local directories for user " + user); + } + } + + + /** + * Initialize the local cache directories for a particular user. + *
    <ul>
+   * <li>$local.dir/usercache/$user</li>
+   * <li>$local.dir/usercache/$user/appcache</li>
+   * <li>$local.dir/usercache/$user/filecache</li>
+   * </ul>
+ */ + void createUserCacheDirs(List localDirs, String user) + throws IOException { + LOG.info("Initializing user " + user); + + boolean appcacheDirStatus = false; + boolean distributedCacheDirStatus = false; + FsPermission appCachePerms = new FsPermission(APPCACHE_PERM); + FsPermission fileperms = new FsPermission(FILECACHE_PERM); + + for (String localDir : localDirs) { + // create $local.dir/usercache/$user/appcache + Path localDirPath = new Path(localDir); + final Path appDir = getAppcacheDir(localDirPath, user); + try { + createDir(appDir, appCachePerms, true, user); + appcacheDirStatus = true; + } catch (IOException e) { + LOG.warn("Unable to create app cache directory : " + appDir, e); + } + // create $local.dir/usercache/$user/filecache + final Path distDir = getFileCacheDir(localDirPath, user); + try { + createDir(distDir, fileperms, true, user); + distributedCacheDirStatus = true; + } catch (IOException e) { + LOG.warn("Unable to create file cache directory : " + distDir, e); + } + } + if (!appcacheDirStatus) { + throw new IOException("Not able to initialize app-cache directories " + + "in any of the configured local directories for user " + user); + } + if (!distributedCacheDirStatus) { + throw new IOException( + "Not able to initialize distributed-cache directories " + + "in any of the configured local directories for user " + + user); + } + } + + /** + * Initialize the local directories for a particular user. + *
    <ul>
+   * <li>$local.dir/usercache/$user/appcache/$appid</li>
+   * </ul>
+ * @param localDirs + */ + void createAppDirs(List localDirs, String user, String appId) + throws IOException { + boolean initAppDirStatus = false; + FsPermission appperms = new FsPermission(APPDIR_PERM); + for (String localDir : localDirs) { + Path fullAppDir = getApplicationDir(new Path(localDir), user, appId); + // create $local.dir/usercache/$user/appcache/$appId + try { + createDir(fullAppDir, appperms, true, user); + initAppDirStatus = true; + } catch (IOException e) { + LOG.warn("Unable to create app directory " + fullAppDir.toString(), e); + } + } + if (!initAppDirStatus) { + throw new IOException("Not able to initialize app directories " + + "in any of the configured local directories for app " + + appId.toString()); + } + } + + + /** + * Create application log directories on all disks. + */ + void createContainerLogDirs(String appId, String containerId, + List logDirs, String user) throws IOException { + + boolean containerLogDirStatus = false; + FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM); + for (String rootLogDir : logDirs) { + // create $log.dir/$appid/$containerid + Path appLogDir = new Path(rootLogDir, appId); + Path containerLogDir = new Path(appLogDir, containerId); + try { + createDir(containerLogDir, containerLogDirPerms, true, user); + } catch (IOException e) { + LOG.warn("Unable to create the container-log directory : " + + appLogDir, e); + continue; + } + containerLogDirStatus = true; + } + if (!containerLogDirStatus) { + throw new IOException( + "Not able to initialize container-log directories " + + "in any of the configured local directories for container " + + containerId); + } + } + + /** + * Permissions for user dir. + * $local.dir/usercache/$user + */ + static final short USER_PERM = (short) 0750; + /** + * Permissions for user appcache dir. + * $local.dir/usercache/$user/appcache + */ + static final short APPCACHE_PERM = (short) 0710; + /** + * Permissions for user filecache dir. 
+   * $local.dir/usercache/$user/filecache
+   */
+  static final short FILECACHE_PERM = (short) 0710;
+  /**
+   * Permissions for user app dir.
+   * $local.dir/usercache/$user/appcache/$appId
+   */
+  static final short APPDIR_PERM = (short) 0710;
+  /**
+   * Permissions for user log dir.
+   * $logdir/$user/$appId
+   */
+  static final short LOGDIR_PERM = (short) 0710;
+
+  /** Returns the remaining space the local file system reports for {@code base}. */
+  private long getDiskFreeSpace(Path base) throws IOException {
+    return lfs.getFsStatus(base).getRemaining();
+  }
+
+  /** Returns $base/usercache/$user/appcache/$appId. */
+  private Path getApplicationDir(Path base, String user, String appId) {
+    return new Path(getAppcacheDir(base, user), appId);
+  }
+
+  /** Returns $base/usercache/$user. */
+  private Path getUserCacheDir(Path base, String user) {
+    return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
+  }
+
+  /** Returns $base/usercache/$user/appcache. */
+  private Path getAppcacheDir(Path base, String user) {
+    return new Path(getUserCacheDir(base, user),
+        ContainerLocalizer.APPCACHE);
+  }
+
+  /** Returns $base/usercache/$user/filecache. */
+  private Path getFileCacheDir(Path base, String user) {
+    return new Path(getUserCacheDir(base, user),
+        ContainerLocalizer.FILECACHE);
+  }
+
+  /**
+   * Chooses the application directory to use as the working directory.
+   * One of the local directories is picked at random, with a probability
+   * proportional to the free space reported for its application directory.
+   *
+   * @param localDirs the local directories to choose from
+   * @param user the user owning the application
+   * @param appId the application id
+   * @return the application directory under the chosen local directory
+   * @throws IOException if no local directory reports any free space
+   */
+  protected Path getWorkingDir(List<String> localDirs, String user,
+      String appId) throws IOException {
+    long totalAvailable = 0L;
+    long[] availableOnDisk = new long[localDirs.size()];
+    int i = 0;
+    // randomly choose the app directory
+    // the chance of picking a directory is proportional to
+    // the available space on the directory.
+    // firstly calculate the sum of all available space on these directories
+    for (String localDir : localDirs) {
+      Path curBase = getApplicationDir(new Path(localDir),
+          user, appId);
+      long space = 0L;
+      try {
+        space = getDiskFreeSpace(curBase);
+      } catch (IOException e) {
+        // a directory whose free space cannot be read counts as full
+        LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
+      }
+      availableOnDisk[i++] = space;
+      totalAvailable += space;
+    }
+
+    // throw an IOException if totalAvailable is 0.
+    if (totalAvailable <= 0L) {
+      throw new IOException("Not able to find a working directory for "
+          + user);
+    }
+
+    // make probability to pick a directory proportional to
+    // the available space on the directory.
+    Random r = new Random();
+    // Clear the sign bit rather than calling Math.abs():
+    // Math.abs(Long.MIN_VALUE) is still negative, which would yield a
+    // negative position.
+    long randomPosition = (r.nextLong() >>> 1) % totalAvailable;
+    int dir = 0;
+    // skip zero available space directory,
+    // because totalAvailable is greater than 0 and randomPosition
+    // is less than totalAvailable, we can find a valid directory
+    // with nonzero available space.
+    while (availableOnDisk[dir] == 0L) {
+      dir++;
+    }
+    // Directory 'dir' owns the half-open range [0, availableOnDisk[dir]),
+    // so a position equal to its size already belongs to the next
+    // directory; '>=' keeps the weighting exact and also steps over
+    // zero-sized directories.
+    while (randomPosition >= availableOnDisk[dir]) {
+      randomPosition -= availableOnDisk[dir++];
+    }
+    return getApplicationDir(new Path(localDirs.get(dir)),
+        user, appId);
+  }
+
+  /**
+   * Create application log directories on all disks.
+   *
+   * @throws IOException if the directory could not be created under any
+   *         of the given log directories
+   */
+  void createAppLogDirs(String appId, List<String> logDirs, String user)
+      throws IOException {
+
+    boolean appLogDirStatus = false;
+    FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
+    for (String rootLogDir : logDirs) {
+      // create $log.dir/$appid
+      Path appLogDir = new Path(rootLogDir, appId);
+      try {
+        createDir(appLogDir, appLogDirPerms, true, user);
+      } catch (IOException e) {
+        LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
+        continue;
+      }
+      appLogDirStatus = true;
+    }
+    if (!appLogDirStatus) {
+      throw new IOException("Not able to initialize app-log directories "
+          + "in any of the configured local directories for app " + appId);
+    }
+  }
+
+  /**
+   * @return the list of paths of given local directories
+   */
+  private static List<Path> getPaths(List<String> dirs) {
+    List<Path> paths = new ArrayList<Path>(dirs.size());
+    for (int i = 0; i < dirs.size(); i++) {
+      paths.add(new Path(dirs.get(i)));
+    }
+    return paths;
+  }
+
+}
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 8fc5ea3dd83..a87238d7932 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -269,7 +269,7 @@ public Integer call() { localResources, nmPrivateClasspathJarDir); // Write out the environment - writeLaunchEnv(containerScriptOutStream, environment, localResources, + exec.writeLaunchEnv(containerScriptOutStream, environment, localResources, launchContext.getCommands()); // /////////// End of writing out container-script @@ -502,8 +502,7 @@ Context getContext() { return context; } - @VisibleForTesting - static abstract class ShellScriptBuilder { + public static abstract class ShellScriptBuilder { public static ShellScriptBuilder create() { return Shell.WINDOWS ? 
new WindowsShellScriptBuilder() : new UnixShellScriptBuilder(); @@ -793,37 +792,6 @@ public void sanitizeEnv(Map environment, Path pwd, meta.getKey(), meta.getValue(), environment); } } - - static void writeLaunchEnv(OutputStream out, - Map environment, Map> resources, - List command) - throws IOException { - ShellScriptBuilder sb = ShellScriptBuilder.create(); - if (environment != null) { - for (Map.Entry env : environment.entrySet()) { - sb.env(env.getKey().toString(), env.getValue().toString()); - } - } - if (resources != null) { - for (Map.Entry> entry : resources.entrySet()) { - for (String linkName : entry.getValue()) { - sb.symlink(entry.getKey(), new Path(linkName)); - } - } - } - - sb.command(command); - - PrintStream pout = null; - try { - pout = new PrintStream(out); - sb.write(pout); - } finally { - if (out != null) { - out.close(); - } - } - } public static String getExitCodeFile(String pidFile) { return pidFile + EXIT_CODE_FILE_SUFFIX; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java new file mode 100644 index 00000000000..e43ac2eb672 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.base.Strings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.LineNumberReader; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * This is intended to test the DockerContainerExecutor code, but it requires docker + * to be installed. + *
+ * <ol>
+ * <li>Install Docker, and compile the code with docker-service-url set to the host and port
+ * where the Docker service is running.
+ * <pre><code>
+ * > mvn clean install -Ddocker-service-url=tcp://0.0.0.0:4243
+ *                          -DskipTests
+ * </code></pre></li></ol>
    + */ +public class TestDockerContainerExecutor { + private static final Log LOG = LogFactory + .getLog(TestDockerContainerExecutor.class); + private static File workSpace = null; + private DockerContainerExecutor exec = null; + private LocalDirsHandlerService dirsHandler; + private Path workDir; + private FileContext lfs; + private String yarnImage; + + private int id = 0; + private String appSubmitter; + private String dockerUrl; + private String testImage = "centos"; + private String dockerExec; + private String containerIdStr; + + + private ContainerId getNextContainerId() { + ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS); + String id = "CONTAINER_" + System.currentTimeMillis(); + when(cId.toString()).thenReturn(id); + return cId; + } + + @Before + public void setup() { + try { + lfs = FileContext.getLocalFSFileContext(); + workDir = new Path("/tmp/temp-" + System.currentTimeMillis()); + workSpace = new File(workDir.toUri().getPath()); + lfs.mkdir(workDir, FsPermission.getDirDefault(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + Configuration conf = new Configuration(); + yarnImage = "yarnImage"; + long time = System.currentTimeMillis(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time); + conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time); + + dockerUrl = System.getProperty("docker-service-url"); + LOG.info("dockerUrl: " + dockerUrl); + if (Strings.isNullOrEmpty(dockerUrl)) { + return; + } + dockerUrl = " -H " + dockerUrl; + dockerExec = "docker " + dockerUrl; + conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage); + conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec); + exec = new DockerContainerExecutor(); + dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + exec.setConf(conf); + appSubmitter = System.getProperty("application.submitter"); + if (appSubmitter == null || appSubmitter.isEmpty()) { + 
appSubmitter = "nobody"; + } + shellExec(dockerExec + " pull " + testImage); + + } + + private Shell.ShellCommandExecutor shellExec(String command) { + try { + + Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor( + command.split("\\s+"), + new File(workDir.toUri().getPath()), + System.getenv()); + shExec.execute(); + return shExec; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private boolean shouldRun() { + return exec != null; + } + + private int runAndBlock(ContainerId cId, Map launchCtxEnv, String... cmd) throws IOException { + String appId = "APP_" + System.currentTimeMillis(); + Container container = mock(Container.class); + ContainerLaunchContext context = mock(ContainerLaunchContext.class); + + when(container.getContainerId()).thenReturn(cId); + when(container.getLaunchContext()).thenReturn(context); + when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId); + when(context.getEnvironment()).thenReturn(launchCtxEnv); + + String script = writeScriptFile(launchCtxEnv, cmd); + + Path scriptPath = new Path(script); + Path tokensPath = new Path("/dev/null"); + Path workDir = new Path(workSpace.getAbsolutePath()); + Path pidFile = new Path(workDir, "pid.txt"); + + exec.activateContainer(cId, pidFile); + return exec.launchContainer(container, scriptPath, tokensPath, + appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), + dirsHandler.getLogDirs()); + } + + private String writeScriptFile(Map launchCtxEnv, String... 
cmd) throws IOException { + File f = File.createTempFile("TestDockerContainerExecutor", ".sh"); + f.deleteOnExit(); + PrintWriter p = new PrintWriter(new FileOutputStream(f)); + for(Map.Entry entry: launchCtxEnv.entrySet()) { + p.println("export " + entry.getKey() + "=\"" + entry.getValue() + "\""); + } + for (String part : cmd) { + p.print(part.replace("\\", "\\\\").replace("'", "\\'")); + p.print(" "); + } + p.println(); + p.close(); + return f.getAbsolutePath(); + } + + @After + public void tearDown() { + try { + lfs.delete(workDir, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testLaunchContainer() throws IOException { + if (!shouldRun()) { + LOG.warn("Docker not installed, aborting test."); + return; + } + + Map env = new HashMap(); + env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage); + String touchFileName = "touch-file-" + System.currentTimeMillis(); + File touchFile = new File(dirsHandler.getLocalDirs().get(0), touchFileName); + ContainerId cId = getNextContainerId(); + int ret = runAndBlock( + cId, env, "touch", touchFile.getAbsolutePath(), "&&", "cp", touchFile.getAbsolutePath(), "/"); + + assertEquals(0, ret); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java new file mode 100644 index 00000000000..fa8bfaf8dc6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.base.Strings; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.LineNumberReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.when; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; + +/** + * Mock tests for docker container executor + */ +public class TestDockerContainerExecutorWithMocks { + + private static final Log LOG = LogFactory + .getLog(TestDockerContainerExecutorWithMocks.class); + public static final String DOCKER_LAUNCH_COMMAND = "/bin/true"; + private DockerContainerExecutor dockerContainerExecutor = null; + private LocalDirsHandlerService dirsHandler; + private Path workDir; + private FileContext lfs; + private String yarnImage; + + @Before + public void setup() { + assumeTrue(!Path.WINDOWS); + File f = new File("./src/test/resources/mock-container-executor"); + if(!FileUtil.canExecute(f)) { + FileUtil.setExecutable(f, true); + } + String executorPath = f.getAbsolutePath(); + Configuration conf = new Configuration(); + yarnImage = "yarnImage"; + long time = System.currentTimeMillis(); + conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time); + conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time); + conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage); + conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME , DOCKER_LAUNCH_COMMAND); + dockerContainerExecutor = new DockerContainerExecutor(); + dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + dockerContainerExecutor.setConf(conf); + lfs = null; + try { + lfs = FileContext.getLocalFSFileContext(); + workDir = new Path("/tmp/temp-"+ System.currentTimeMillis()); + lfs.mkdir(workDir, FsPermission.getDirDefault(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + @After + public void tearDown() { + try { + lfs.delete(workDir, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test(expected = IllegalStateException.class) + public void testContainerInitSecure() throws IOException { + 
dockerContainerExecutor.getConf().set( + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + dockerContainerExecutor.init(); + } + + @Test(expected = IllegalArgumentException.class) + public void testContainerLaunchNullImage() throws IOException { + String appSubmitter = "nobody"; + String appId = "APP_ID"; + String containerId = "CONTAINER_ID"; + String testImage = ""; + + Container container = mock(Container.class, RETURNS_DEEP_STUBS); + ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS); + ContainerLaunchContext context = mock(ContainerLaunchContext.class); + HashMap env = new HashMap(); + + when(container.getContainerId()).thenReturn(cId); + when(container.getLaunchContext()).thenReturn(context); + when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId); + when(cId.toString()).thenReturn(containerId); + + when(context.getEnvironment()).thenReturn(env); + env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage); + dockerContainerExecutor.getConf() + .set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage); + Path scriptPath = new Path("file:///bin/echo"); + Path tokensPath = new Path("file:///dev/null"); + + Path pidFile = new Path(workDir, "pid.txt"); + + dockerContainerExecutor.activateContainer(cId, pidFile); + dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, + appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), + dirsHandler.getLogDirs()); + } + + @Test(expected = IllegalArgumentException.class) + public void testContainerLaunchInvalidImage() throws IOException { + String appSubmitter = "nobody"; + String appId = "APP_ID"; + String containerId = "CONTAINER_ID"; + String testImage = "testrepo.com/test-image rm -rf $HADOOP_PREFIX/*"; + + Container container = mock(Container.class, RETURNS_DEEP_STUBS); + ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS); + ContainerLaunchContext context = 
mock(ContainerLaunchContext.class); + HashMap env = new HashMap(); + + when(container.getContainerId()).thenReturn(cId); + when(container.getLaunchContext()).thenReturn(context); + when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId); + when(cId.toString()).thenReturn(containerId); + + when(context.getEnvironment()).thenReturn(env); + env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage); + dockerContainerExecutor.getConf() + .set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage); + Path scriptPath = new Path("file:///bin/echo"); + Path tokensPath = new Path("file:///dev/null"); + + Path pidFile = new Path(workDir, "pid.txt"); + + dockerContainerExecutor.activateContainer(cId, pidFile); + dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, + appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), + dirsHandler.getLogDirs()); + } + + @Test + public void testContainerLaunch() throws IOException { + String appSubmitter = "nobody"; + String appId = "APP_ID"; + String containerId = "CONTAINER_ID"; + String testImage = "\"sequenceiq/hadoop-docker:2.4.1\""; + + Container container = mock(Container.class, RETURNS_DEEP_STUBS); + ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS); + ContainerLaunchContext context = mock(ContainerLaunchContext.class); + HashMap env = new HashMap(); + + when(container.getContainerId()).thenReturn(cId); + when(container.getLaunchContext()).thenReturn(context); + when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId); + when(cId.toString()).thenReturn(containerId); + + when(context.getEnvironment()).thenReturn(env); + env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage); + Path scriptPath = new Path("file:///bin/echo"); + Path tokensPath = new Path("file:///dev/null"); + + Path pidFile = new Path(workDir, "pid"); + + dockerContainerExecutor.activateContainer(cId, pidFile); + int ret 
= dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, + appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), + dirsHandler.getLogDirs()); + assertEquals(0, ret); + //get the script + Path sessionScriptPath = new Path(workDir, + Shell.appendScriptExtension( + DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT)); + LineNumberReader lnr = new LineNumberReader(new FileReader(sessionScriptPath.toString())); + boolean cmdFound = false; + List localDirs = dirsToMount(dirsHandler.getLocalDirs()); + List logDirs = dirsToMount(dirsHandler.getLogDirs()); + List workDirMount = dirsToMount(Collections.singletonList(workDir.toUri().getPath())); + List expectedCommands = new ArrayList( + Arrays.asList(DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name", containerId)); + expectedCommands.addAll(localDirs); + expectedCommands.addAll(logDirs); + expectedCommands.addAll(workDirMount); + String shellScript = workDir + "/launch_container.sh"; + + expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""), "bash","\"" + shellScript + "\"")); + + String expectedPidString = "echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+ pidFile.toString() + ".tmp"; + boolean pidSetterFound = false; + while(lnr.ready()){ + String line = lnr.readLine(); + LOG.debug("line: " + line); + if (line.startsWith(DOCKER_LAUNCH_COMMAND)){ + List command = new ArrayList(); + for( String s :line.split("\\s+")){ + command.add(s.trim()); + } + + assertEquals(expectedCommands, command); + cmdFound = true; + } else if (line.startsWith("echo")) { + assertEquals(expectedPidString, line); + pidSetterFound = true; + } + + } + assertTrue(cmdFound); + assertTrue(pidSetterFound); + } + + private List dirsToMount(List dirs) { + List localDirs = new ArrayList(); + for(String dir: dirs){ + localDirs.add("-v"); + localDirs.add(dir + ":" + dir); + } + return localDirs; + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index cbc41c411ed..4088c2ad11a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; @@ -144,7 +145,7 @@ public void testSpecialCharSymlinks() throws IOException { commands.add("/bin/sh ./\\\"" + badSymlink + "\\\""); } - ContainerLaunch.writeLaunchEnv(fos, env, resources, commands); + new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands); fos.flush(); fos.close(); FileUtil.setExecutable(tempFile, true); @@ -211,7 +212,7 @@ public void testInvalidSymlinkDiagnostics() throws IOException { } else { commands.add("/bin/sh ./\\\"" + symLink + "\\\""); } - ContainerLaunch.writeLaunchEnv(fos, env, resources, commands); + new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands); fos.flush(); fos.close(); 
FileUtil.setExecutable(tempFile, true); @@ -264,7 +265,7 @@ public void testInvalidEnvSyntaxDiagnostics() throws IOException { "\"workflowName\":\"\n\ninsert table " + "\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " ); List commands = new ArrayList(); - ContainerLaunch.writeLaunchEnv(fos, env, resources, commands); + new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands); fos.flush(); fos.close(); @@ -341,7 +342,8 @@ public void testContainerLaunchStdoutAndStderrDiagnostics() throws IOException { Map env = new HashMap(); List commands = new ArrayList(); commands.add(command); - ContainerLaunch.writeLaunchEnv(fos, env, resources, commands); + ContainerExecutor exec = new DefaultContainerExecutor(); + exec.writeLaunchEnv(fos, env, resources, commands); fos.flush(); fos.close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/DockerContainerExecutor.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/DockerContainerExecutor.apt.vm new file mode 100644 index 00000000000..45731735cae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/DockerContainerExecutor.apt.vm @@ -0,0 +1,200 @@ + +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. 
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Docker Container Executor
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Docker Container Executor
+
+%{toc|section=1|fromDepth=0}
+
+* {Overview}
+
+  Docker (https://www.docker.io/) combines an easy-to-use interface to
+Linux containers with easy-to-construct image files for those
+containers. In short, Docker launches very lightweight virtual
+machines.
+
+  The Docker Container Executor (DCE) allows the YARN NodeManager to
+launch YARN containers into Docker containers. Users can specify the
+Docker images they want for their YARN containers. These containers
+provide a custom software environment in which the user's code runs,
+isolated from the software environment of the NodeManager. These
+containers can include special libraries needed by the application,
+and they can have different versions of Perl, Python, and even Java
+than what is installed on the NodeManager. Indeed, these containers
+can run a different flavor of Linux than what is running on the
+NodeManager. Note, however, that the YARN container must then define the
+entire environment and all libraries needed to run the job, because nothing
+is shared with the NodeManager.
+
+  Docker for YARN provides both consistency (all YARN containers will
+have the same software environment) and isolation (no interference
+with whatever is installed on the physical machine).
+
+* {Cluster Configuration}
+
+  Docker Container Executor runs in non-secure mode of HDFS and
+YARN. It will not run in secure mode, and will exit if it detects
+secure mode.
+
+  The DockerContainerExecutor requires the Docker daemon to be running on
+the NodeManagers, and the Docker client to be installed and able to start
+Docker containers. To prevent timeouts while starting jobs, the Docker
+images to be used by a job should already be downloaded on the
+NodeManagers.
Here's an example of how this can be done:
+
+----
+sudo docker pull sequenceiq/hadoop-docker:2.4.1
+----
+
+  This should be done as part of the NodeManager startup.
+
+  The following properties must be set in yarn-site.xml:
+
+----
+<property>
+ <name>yarn.nodemanager.docker-container-executor.exec-name</name>
+  <value>/usr/bin/docker</value>
+  <description>
+     Name or path to the Docker client. This is a required parameter. If this is empty,
+     user must pass an image name as part of the job invocation (see below).
+  </description>
+</property>
+
+<property>
+  <name>yarn.nodemanager.container-executor.class</name>
+  <value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
+  <description>
+     This is the container executor setting that ensures that all
+jobs are started with the DockerContainerExecutor.
+  </description>
+</property>
+----
+
+  Administrators should be aware that DCE doesn't currently provide
+user name-space isolation. This means, in particular, that software
+running as root in the YARN container will have root privileges in the
+underlying NodeManager. Put differently, DCE currently provides no
+better security guarantees than YARN's Default Container Executor. In
+fact, DockerContainerExecutor will exit if it detects secure YARN.
+
+* {Tips for connecting to a secure docker repository}
+
+  By default, Docker images are pulled from the Docker public repository. The
+format of a Docker image URL is: <username>/<image_name>. For example,
+sequenceiq/hadoop-docker:2.4.1 is an image in the Docker public repository that
+contains Java and Hadoop.
+
+  If you want to use your own private repository, you provide the repository URL
+instead of your username. Therefore, the image URL becomes:
+<private_repo_url>/<image_name>.
+For example, if your repository is on localhost:8080, your images would be like:
+ localhost:8080/hadoop-docker
+
+  To connect to a secure Docker repository, you can use the following invocation:
+
+----
+docker login [OPTIONS] [SERVER]
+
+Register or log in to a Docker registry server, if no server is specified
+"https://index.docker.io/v1/" is the default.
+
+-e, --email=""       Email
+-p, --password=""    Password
+-u, --username=""    Username
+----
+
+  If you want to log in to a self-hosted registry, you can specify this by adding
+the server name.
+
+----
+docker login <pathtorepo>
+----
+
+  This needs to be run as part of the NodeManager startup, or as a cron job if
+the login session expires periodically. You can log in to multiple Docker repositories
+from the same NodeManager, but all your users will have access to all your repositories,
+as at present the DockerContainerExecutor does not support per-job docker login.
+
+* {Job Configuration}
+
+  Currently you cannot configure any of the Docker settings with the job configuration.
+You can, however, provide Mapper, Reducer, and ApplicationMaster environment overrides
+for the Docker images, using the following 3 JVM properties respectively (only for MR jobs):
+
+  * mapreduce.map.env: You can override the mapper's image by passing
+yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
+to this JVM property.
+
+  * mapreduce.reduce.env: You can override the reducer's image by passing
+yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
+to this JVM property.
+
+  * yarn.app.mapreduce.am.env: You can override the ApplicationMaster's image
+by passing yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
+to this JVM property.
+
+* {Docker Image requirements}
+
+  The Docker Images used for YARN containers must meet the following
+requirements:
+
+  The distro and version of Linux in your Docker Image can be quite different
+from that of your NodeManager. (Docker does have a few limitations in this
+regard, but you're not likely to hit them.) However, if you're using the
+MapReduce framework, then your image will need to be configured for running
+Hadoop.
Java must be installed in the container, and the following environment variables
+must be defined in the image: JAVA_HOME, HADOOP_COMMON_PATH, HADOOP_HDFS_HOME,
+HADOOP_MAPRED_HOME, HADOOP_YARN_HOME, and HADOOP_CONF_DIR.
+
+
+* {Working example of yarn launched docker containers.}
+
+  The following example shows how to run teragen using DockerContainerExecutor.
+
+  * First ensure that YARN is properly configured with DockerContainerExecutor (see above).
+
+----
+<property>
+ <name>yarn.nodemanager.docker-container-executor.exec-name</name>
+  <value>docker -H=tcp://0.0.0.0:4243</value>
+  <description>
+     Name or path to the Docker client. The TCP socket must be
+     the one on which the Docker daemon is listening.
+  </description>
+</property>
+
+<property>
+  <name>yarn.nodemanager.container-executor.class</name>
+  <value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
+  <description>
+     This is the container executor setting that ensures that all
+jobs are started with the DockerContainerExecutor.
+  </description>
+</property>
+----
+
+  * Pick a custom Docker image if you want. In this example, we'll use sequenceiq/hadoop-docker:2.4.1 from the
+Docker Hub repository. It has the JDK, Hadoop, and all the previously mentioned environment variables configured.
+
+  * Run:
+
+----
+hadoop jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
+teragen \
+-Dmapreduce.map.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
+-Dyarn.app.mapreduce.am.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
+1000 \
+teragen_out_dir
+----
+
+  Once it succeeds, you can check the YARN debug logs to verify that Docker has indeed launched containers.
+