YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN
This commit is contained in:
parent
46f6f9d60d
commit
53f64ee516
|
@ -124,6 +124,7 @@
|
||||||
<item name="YARN Commands" href="hadoop-yarn/hadoop-yarn-site/YarnCommands.html"/>
|
<item name="YARN Commands" href="hadoop-yarn/hadoop-yarn-site/YarnCommands.html"/>
|
||||||
<item name="Scheduler Load Simulator" href="hadoop-sls/SchedulerLoadSimulator.html"/>
|
<item name="Scheduler Load Simulator" href="hadoop-sls/SchedulerLoadSimulator.html"/>
|
||||||
<item name="NodeManager Restart" href="hadoop-yarn/hadoop-yarn-site/NodeManagerRestart.html"/>
|
<item name="NodeManager Restart" href="hadoop-yarn/hadoop-yarn-site/NodeManagerRestart.html"/>
|
||||||
|
<item name="DockerContainerExecutor" href="hadoop-yarn/hadoop-yarn-site/DockerContainerExecutor.html"/>
|
||||||
</menu>
|
</menu>
|
||||||
|
|
||||||
<menu name="YARN REST APIs" inherit="top">
|
<menu name="YARN REST APIs" inherit="top">
|
||||||
|
|
|
@ -89,6 +89,9 @@ Release 2.6.0 - 2014-11-15
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
YARN-1964. Create Docker analog of the LinuxContainerExecutor in YARN. (Abin
|
||||||
|
Shahab via raviprak)
|
||||||
|
|
||||||
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
|
YARN-2131. Add a way to format the RMStateStore. (Robert Kanter via kasha)
|
||||||
|
|
||||||
YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
|
YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
|
||||||
|
|
|
@ -891,7 +891,19 @@ public class YarnConfiguration extends Configuration {
|
||||||
/** The arguments to pass to the health check script.*/
|
/** The arguments to pass to the health check script.*/
|
||||||
public static final String NM_HEALTH_CHECK_SCRIPT_OPTS =
|
public static final String NM_HEALTH_CHECK_SCRIPT_OPTS =
|
||||||
NM_PREFIX + "health-checker.script.opts";
|
NM_PREFIX + "health-checker.script.opts";
|
||||||
|
|
||||||
|
/** The Docker image name(For DockerContainerExecutor).*/
|
||||||
|
public static final String NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME =
|
||||||
|
NM_PREFIX + "docker-container-executor.image-name";
|
||||||
|
|
||||||
|
/** The name of the docker executor (For DockerContainerExecutor).*/
|
||||||
|
public static final String NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
|
||||||
|
NM_PREFIX + "docker-container-executor.exec-name";
|
||||||
|
|
||||||
|
/** The default docker executor (For DockerContainerExecutor).*/
|
||||||
|
public static final String NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME =
|
||||||
|
"/usr/bin/docker";
|
||||||
|
|
||||||
/** The path to the Linux container executor.*/
|
/** The path to the Linux container executor.*/
|
||||||
public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
|
public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
|
||||||
NM_PREFIX + "linux-container-executor.path";
|
NM_PREFIX + "linux-container-executor.path";
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
<!-- there. If yarn-site.xml does not already exist, create it. -->
|
<!-- there. If yarn-site.xml does not already exist, create it. -->
|
||||||
|
|
||||||
<configuration>
|
<configuration>
|
||||||
|
|
||||||
<!-- IPC Configs -->
|
<!-- IPC Configs -->
|
||||||
<property>
|
<property>
|
||||||
<description>Factory to create client IPC classes.</description>
|
<description>Factory to create client IPC classes.</description>
|
||||||
|
@ -1146,6 +1146,16 @@
|
||||||
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
|
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<!--Docker configuration-->
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
|
||||||
|
<value>/usr/bin/docker</value>
|
||||||
|
<description>
|
||||||
|
Name or path to the Docker client.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!--Map Reduce configuration-->
|
<!--Map Reduce configuration-->
|
||||||
<property>
|
<property>
|
||||||
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
|
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
|
||||||
|
|
|
@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -211,6 +214,34 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException{
|
||||||
|
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
|
||||||
|
if (environment != null) {
|
||||||
|
for (Map.Entry<String,String> env : environment.entrySet()) {
|
||||||
|
sb.env(env.getKey().toString(), env.getValue().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (resources != null) {
|
||||||
|
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
|
||||||
|
for (String linkName : entry.getValue()) {
|
||||||
|
sb.symlink(entry.getKey(), new Path(linkName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sb.command(command);
|
||||||
|
|
||||||
|
PrintStream pout = null;
|
||||||
|
try {
|
||||||
|
pout = new PrintStream(out);
|
||||||
|
sb.write(pout);
|
||||||
|
} finally {
|
||||||
|
if (out != null) {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public enum ExitCode {
|
public enum ExitCode {
|
||||||
FORCE_KILLED(137),
|
FORCE_KILLED(137),
|
||||||
TERMINATED(143),
|
TERMINATED(143),
|
||||||
|
|
|
@ -0,0 +1,794 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This executor will launch a docker container and run the task inside the container.
|
||||||
|
*/
|
||||||
|
public class DockerContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(DockerContainerExecutor.class);
|
||||||
|
public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor";
|
||||||
|
public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session";
|
||||||
|
|
||||||
|
// This validates that the image is a proper docker image and would not crash docker.
|
||||||
|
public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$";
|
||||||
|
|
||||||
|
|
||||||
|
private final FileContext lfs;
|
||||||
|
private final Pattern dockerImagePattern;
|
||||||
|
|
||||||
|
public DockerContainerExecutor() {
|
||||||
|
try {
|
||||||
|
this.lfs = FileContext.getLocalFSFileContext();
|
||||||
|
this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN);
|
||||||
|
} catch (UnsupportedFileSystemException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void copyFile(Path src, Path dst, String owner) throws IOException {
|
||||||
|
lfs.util().copy(src, dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws IOException {
|
||||||
|
String auth = getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
|
||||||
|
if (auth != null && !auth.equals("simple")) {
|
||||||
|
throw new IllegalStateException("DockerContainerExecutor only works with simple authentication mode");
|
||||||
|
}
|
||||||
|
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
||||||
|
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
|
||||||
|
if (!new File(dockerExecutor).exists()) {
|
||||||
|
throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
|
||||||
|
InetSocketAddress nmAddr, String user, String appId, String locId,
|
||||||
|
LocalDirsHandlerService dirsHandler)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||||
|
List<String> logDirs = dirsHandler.getLogDirs();
|
||||||
|
|
||||||
|
ContainerLocalizer localizer =
|
||||||
|
new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
|
||||||
|
RecordFactoryProvider.getRecordFactory(getConf()));
|
||||||
|
|
||||||
|
createUserLocalDirs(localDirs, user);
|
||||||
|
createUserCacheDirs(localDirs, user);
|
||||||
|
createAppDirs(localDirs, user, appId);
|
||||||
|
createAppLogDirs(appId, logDirs, user);
|
||||||
|
|
||||||
|
// randomly choose the local directory
|
||||||
|
Path appStorageDir = getWorkingDir(localDirs, user, appId);
|
||||||
|
|
||||||
|
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
|
||||||
|
Path tokenDst = new Path(appStorageDir, tokenFn);
|
||||||
|
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
|
||||||
|
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
|
||||||
|
lfs.setWorkingDirectory(appStorageDir);
|
||||||
|
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
|
||||||
|
// TODO: DO it over RPC for maintaining similarity?
|
||||||
|
localizer.runLocalization(nmAddr);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int launchContainer(Container container,
|
||||||
|
Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
|
||||||
|
String userName, String appId, Path containerWorkDir,
|
||||||
|
List<String> localDirs, List<String> logDirs) throws IOException {
|
||||||
|
String containerImageName = container.getLaunchContext().getEnvironment()
|
||||||
|
.get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("containerImageName from launchContext: " + containerImageName);
|
||||||
|
}
|
||||||
|
Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), "Container image must not be null");
|
||||||
|
containerImageName = containerImageName.replaceAll("['\"]", "");
|
||||||
|
|
||||||
|
Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " + containerImageName + " is not a proper docker image");
|
||||||
|
String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
|
||||||
|
YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
|
||||||
|
|
||||||
|
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
|
||||||
|
ContainerId containerId = container.getContainerId();
|
||||||
|
|
||||||
|
// create container dirs on all disks
|
||||||
|
String containerIdStr = ConverterUtils.toString(containerId);
|
||||||
|
String appIdStr =
|
||||||
|
ConverterUtils.toString(
|
||||||
|
containerId.getApplicationAttemptId().
|
||||||
|
getApplicationId());
|
||||||
|
for (String sLocalDir : localDirs) {
|
||||||
|
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
|
||||||
|
Path userdir = new Path(usersdir, userName);
|
||||||
|
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||||
|
Path appDir = new Path(appCacheDir, appIdStr);
|
||||||
|
Path containerDir = new Path(appDir, containerIdStr);
|
||||||
|
createDir(containerDir, dirPerm, true, userName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the container log-dirs on all disks
|
||||||
|
createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
|
||||||
|
|
||||||
|
Path tmpDir = new Path(containerWorkDir,
|
||||||
|
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
|
||||||
|
createDir(tmpDir, dirPerm, false, userName);
|
||||||
|
|
||||||
|
// copy launch script to work dir
|
||||||
|
Path launchDst =
|
||||||
|
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
|
||||||
|
lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
|
||||||
|
|
||||||
|
// copy container tokens to work dir
|
||||||
|
Path tokenDst =
|
||||||
|
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
|
||||||
|
lfs.util().copy(nmPrivateTokensPath, tokenDst);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
String localDirMount = toMount(localDirs);
|
||||||
|
String logDirMount = toMount(logDirs);
|
||||||
|
String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath()));
|
||||||
|
StringBuilder commands = new StringBuilder();
|
||||||
|
String commandStr = commands.append(dockerExecutor)
|
||||||
|
.append(" ")
|
||||||
|
.append("run")
|
||||||
|
.append(" ")
|
||||||
|
.append("--rm --net=host")
|
||||||
|
.append(" ")
|
||||||
|
.append(" --name " + containerIdStr)
|
||||||
|
.append(localDirMount)
|
||||||
|
.append(logDirMount)
|
||||||
|
.append(containerWorkDirMount)
|
||||||
|
.append(" ")
|
||||||
|
.append(containerImageName)
|
||||||
|
.toString();
|
||||||
|
String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`";
|
||||||
|
// Create new local launch wrapper script
|
||||||
|
LocalWrapperScriptBuilder sb =
|
||||||
|
new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript);
|
||||||
|
Path pidFile = getPidFilePath(containerId);
|
||||||
|
if (pidFile != null) {
|
||||||
|
sb.writeLocalWrapperScript(launchDst, pidFile);
|
||||||
|
} else {
|
||||||
|
LOG.info("Container " + containerIdStr
|
||||||
|
+ " was marked as inactive. Returning terminated error");
|
||||||
|
return ExitCode.TERMINATED.getExitCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
ShellCommandExecutor shExec = null;
|
||||||
|
try {
|
||||||
|
lfs.setPermission(launchDst,
|
||||||
|
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
||||||
|
lfs.setPermission(sb.getWrapperScriptPath(),
|
||||||
|
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
||||||
|
|
||||||
|
// Setup command to run
|
||||||
|
String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
|
||||||
|
containerIdStr, userName, pidFile, this.getConf());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command));
|
||||||
|
}
|
||||||
|
shExec = new ShellCommandExecutor(
|
||||||
|
command,
|
||||||
|
new File(containerWorkDir.toUri().getPath()),
|
||||||
|
container.getLaunchContext().getEnvironment()); // sanitized env
|
||||||
|
if (isContainerActive(containerId)) {
|
||||||
|
shExec.execute();
|
||||||
|
} else {
|
||||||
|
LOG.info("Container " + containerIdStr +
|
||||||
|
" was marked as inactive. Returning terminated error");
|
||||||
|
return ExitCode.TERMINATED.getExitCode();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (null == shExec) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int exitCode = shExec.getExitCode();
|
||||||
|
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
|
||||||
|
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
|
||||||
|
// terminated/killed forcefully. In all other cases, log the
|
||||||
|
// container-executor's output
|
||||||
|
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
|
||||||
|
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
|
||||||
|
LOG.warn("Exception from container-launch with container ID: "
|
||||||
|
+ containerId + " and exit code: " + exitCode, e);
|
||||||
|
logOutput(shExec.getOutput());
|
||||||
|
String diagnostics = "Exception from container-launch: \n"
|
||||||
|
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
|
||||||
|
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
|
||||||
|
diagnostics));
|
||||||
|
} else {
|
||||||
|
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
|
||||||
|
"Container killed on request. Exit code is " + exitCode));
|
||||||
|
}
|
||||||
|
return exitCode;
|
||||||
|
} finally {
|
||||||
|
if (shExec != null) {
|
||||||
|
shExec.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException {
|
||||||
|
ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
|
||||||
|
|
||||||
|
Set<String> exclusionSet = new HashSet<String>();
|
||||||
|
exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
|
||||||
|
exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
|
||||||
|
exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name());
|
||||||
|
exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name());
|
||||||
|
exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
|
||||||
|
exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name());
|
||||||
|
|
||||||
|
if (environment != null) {
|
||||||
|
for (Map.Entry<String,String> env : environment.entrySet()) {
|
||||||
|
if (!exclusionSet.contains(env.getKey())) {
|
||||||
|
sb.env(env.getKey().toString(), env.getValue().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (resources != null) {
|
||||||
|
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
|
||||||
|
for (String linkName : entry.getValue()) {
|
||||||
|
sb.symlink(entry.getKey(), new Path(linkName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sb.command(command);
|
||||||
|
|
||||||
|
PrintStream pout = null;
|
||||||
|
PrintStream ps = null;
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
try {
|
||||||
|
pout = new PrintStream(out);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
ps = new PrintStream(baos);
|
||||||
|
sb.write(ps);
|
||||||
|
}
|
||||||
|
sb.write(pout);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (out != null) {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
if (ps != null) {
|
||||||
|
ps.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Script: " + baos.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean saneDockerImage(String containerImageName) {
|
||||||
|
return dockerImagePattern.matcher(containerImageName).matches();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean signalContainer(String user, String pid, Signal signal)
|
||||||
|
throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
|
||||||
|
+ " as user " + user);
|
||||||
|
}
|
||||||
|
if (!containerIsAlive(pid)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
killContainer(pid, signal);
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (!containerIsAlive(pid)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isContainerProcessAlive(String user, String pid)
|
||||||
|
throws IOException {
|
||||||
|
return containerIsAlive(pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the process with the specified pid is alive.
|
||||||
|
*
|
||||||
|
* @param pid String pid
|
||||||
|
* @return boolean true if the process is alive
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static boolean containerIsAlive(String pid) throws IOException {
|
||||||
|
try {
|
||||||
|
new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
|
||||||
|
.execute();
|
||||||
|
// successful execution means process is alive
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (Shell.ExitCodeException e) {
|
||||||
|
// failure (non-zero exit code) means process is not alive
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a specified signal to the specified pid
|
||||||
|
*
|
||||||
|
* @param pid the pid of the process [group] to signal.
|
||||||
|
* @param signal signal to send
|
||||||
|
* (for logging).
|
||||||
|
*/
|
||||||
|
protected void killContainer(String pid, Signal signal) throws IOException {
|
||||||
|
new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteAsUser(String user, Path subDir, Path... baseDirs)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
if (baseDirs == null || baseDirs.length == 0) {
|
||||||
|
LOG.info("Deleting absolute path : " + subDir);
|
||||||
|
if (!lfs.delete(subDir, true)) {
|
||||||
|
//Maybe retry
|
||||||
|
LOG.warn("delete returned false for path: [" + subDir + "]");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Path baseDir : baseDirs) {
|
||||||
|
Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
|
||||||
|
LOG.info("Deleting path : " + del);
|
||||||
|
if (!lfs.delete(del, true)) {
|
||||||
|
LOG.warn("delete returned false for path: [" + del + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a directory list to a docker mount string
|
||||||
|
* @param dirs
|
||||||
|
* @return a string of mounts for docker
|
||||||
|
*/
|
||||||
|
private String toMount(List<String> dirs) {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
for (String dir : dirs) {
|
||||||
|
builder.append(" -v " + dir + ":" + dir);
|
||||||
|
}
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private abstract class LocalWrapperScriptBuilder {
|
||||||
|
|
||||||
|
private final Path wrapperScriptPath;
|
||||||
|
|
||||||
|
public Path getWrapperScriptPath() {
|
||||||
|
return wrapperScriptPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
|
||||||
|
DataOutputStream out = null;
|
||||||
|
PrintStream pout = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
pout = new PrintStream(out);
|
||||||
|
writeLocalWrapperScript(launchDst, pidFile, pout);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, pout, out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
|
||||||
|
PrintStream pout);
|
||||||
|
|
||||||
|
protected LocalWrapperScriptBuilder(Path containerWorkDir) {
|
||||||
|
this.wrapperScriptPath = new Path(containerWorkDir,
|
||||||
|
Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class UnixLocalWrapperScriptBuilder
|
||||||
|
extends LocalWrapperScriptBuilder {
|
||||||
|
private final Path sessionScriptPath;
|
||||||
|
private final String dockerCommand;
|
||||||
|
private final String dockerPidScript;
|
||||||
|
|
||||||
|
public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) {
|
||||||
|
super(containerWorkDir);
|
||||||
|
this.dockerCommand = dockerCommand;
|
||||||
|
this.dockerPidScript = dockerPidScript;
|
||||||
|
this.sessionScriptPath = new Path(containerWorkDir,
|
||||||
|
Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeLocalWrapperScript(Path launchDst, Path pidFile)
|
||||||
|
throws IOException {
|
||||||
|
writeSessionScript(launchDst, pidFile);
|
||||||
|
super.writeLocalWrapperScript(launchDst, pidFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
|
||||||
|
PrintStream pout) {
|
||||||
|
|
||||||
|
String exitCodeFile = ContainerLaunch.getExitCodeFile(
|
||||||
|
pidFile.toString());
|
||||||
|
String tmpFile = exitCodeFile + ".tmp";
|
||||||
|
pout.println("#!/usr/bin/env bash");
|
||||||
|
pout.println("bash \"" + sessionScriptPath.toString() + "\"");
|
||||||
|
pout.println("rc=$?");
|
||||||
|
pout.println("echo $rc > \"" + tmpFile + "\"");
|
||||||
|
pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
|
||||||
|
pout.println("exit $rc");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeSessionScript(Path launchDst, Path pidFile)
|
||||||
|
throws IOException {
|
||||||
|
DataOutputStream out = null;
|
||||||
|
PrintStream pout = null;
|
||||||
|
try {
|
||||||
|
out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
|
||||||
|
pout = new PrintStream(out);
|
||||||
|
// We need to do a move as writing to a file is not atomic
|
||||||
|
// Process reading a file being written to may get garbled data
|
||||||
|
// hence write pid to tmp file first followed by a mv
|
||||||
|
pout.println("#!/usr/bin/env bash");
|
||||||
|
pout.println();
|
||||||
|
pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp");
|
||||||
|
pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
|
||||||
|
pout.println(dockerCommand + " bash \"" +
|
||||||
|
launchDst.toUri().getPath().toString() + "\"");
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, pout, out);
|
||||||
|
}
|
||||||
|
lfs.setPermission(sessionScriptPath,
|
||||||
|
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void createDir(Path dirPath, FsPermission perms,
|
||||||
|
boolean createParent, String user) throws IOException {
|
||||||
|
lfs.mkdir(dirPath, perms, createParent);
|
||||||
|
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
|
||||||
|
lfs.setPermission(dirPath, perms);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the local directories for a particular user.
|
||||||
|
* <ul>.mkdir
|
||||||
|
* <li>$local.dir/usercache/$user</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
void createUserLocalDirs(List<String> localDirs, String user)
|
||||||
|
throws IOException {
|
||||||
|
boolean userDirStatus = false;
|
||||||
|
FsPermission userperms = new FsPermission(USER_PERM);
|
||||||
|
for (String localDir : localDirs) {
|
||||||
|
// create $local.dir/usercache/$user and its immediate parent
|
||||||
|
try {
|
||||||
|
createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to create the user directory : " + localDir, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
userDirStatus = true;
|
||||||
|
}
|
||||||
|
if (!userDirStatus) {
|
||||||
|
throw new IOException("Not able to initialize user directories "
|
||||||
|
+ "in any of the configured local directories for user " + user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the local cache directories for a particular user.
|
||||||
|
* <ul>
|
||||||
|
* <li>$local.dir/usercache/$user</li>
|
||||||
|
* <li>$local.dir/usercache/$user/appcache</li>
|
||||||
|
* <li>$local.dir/usercache/$user/filecache</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
void createUserCacheDirs(List<String> localDirs, String user)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info("Initializing user " + user);
|
||||||
|
|
||||||
|
boolean appcacheDirStatus = false;
|
||||||
|
boolean distributedCacheDirStatus = false;
|
||||||
|
FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
|
||||||
|
FsPermission fileperms = new FsPermission(FILECACHE_PERM);
|
||||||
|
|
||||||
|
for (String localDir : localDirs) {
|
||||||
|
// create $local.dir/usercache/$user/appcache
|
||||||
|
Path localDirPath = new Path(localDir);
|
||||||
|
final Path appDir = getAppcacheDir(localDirPath, user);
|
||||||
|
try {
|
||||||
|
createDir(appDir, appCachePerms, true, user);
|
||||||
|
appcacheDirStatus = true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to create app cache directory : " + appDir, e);
|
||||||
|
}
|
||||||
|
// create $local.dir/usercache/$user/filecache
|
||||||
|
final Path distDir = getFileCacheDir(localDirPath, user);
|
||||||
|
try {
|
||||||
|
createDir(distDir, fileperms, true, user);
|
||||||
|
distributedCacheDirStatus = true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to create file cache directory : " + distDir, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!appcacheDirStatus) {
|
||||||
|
throw new IOException("Not able to initialize app-cache directories "
|
||||||
|
+ "in any of the configured local directories for user " + user);
|
||||||
|
}
|
||||||
|
if (!distributedCacheDirStatus) {
|
||||||
|
throw new IOException(
|
||||||
|
"Not able to initialize distributed-cache directories "
|
||||||
|
+ "in any of the configured local directories for user "
|
||||||
|
+ user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the local directories for a particular user.
|
||||||
|
* <ul>
|
||||||
|
* <li>$local.dir/usercache/$user/appcache/$appid</li>
|
||||||
|
* </ul>
|
||||||
|
* @param localDirs
|
||||||
|
*/
|
||||||
|
void createAppDirs(List<String> localDirs, String user, String appId)
|
||||||
|
throws IOException {
|
||||||
|
boolean initAppDirStatus = false;
|
||||||
|
FsPermission appperms = new FsPermission(APPDIR_PERM);
|
||||||
|
for (String localDir : localDirs) {
|
||||||
|
Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
|
||||||
|
// create $local.dir/usercache/$user/appcache/$appId
|
||||||
|
try {
|
||||||
|
createDir(fullAppDir, appperms, true, user);
|
||||||
|
initAppDirStatus = true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!initAppDirStatus) {
|
||||||
|
throw new IOException("Not able to initialize app directories "
|
||||||
|
+ "in any of the configured local directories for app "
|
||||||
|
+ appId.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create application log directories on all disks.
|
||||||
|
*/
|
||||||
|
void createContainerLogDirs(String appId, String containerId,
|
||||||
|
List<String> logDirs, String user) throws IOException {
|
||||||
|
|
||||||
|
boolean containerLogDirStatus = false;
|
||||||
|
FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
|
||||||
|
for (String rootLogDir : logDirs) {
|
||||||
|
// create $log.dir/$appid/$containerid
|
||||||
|
Path appLogDir = new Path(rootLogDir, appId);
|
||||||
|
Path containerLogDir = new Path(appLogDir, containerId);
|
||||||
|
try {
|
||||||
|
createDir(containerLogDir, containerLogDirPerms, true, user);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to create the container-log directory : "
|
||||||
|
+ appLogDir, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
containerLogDirStatus = true;
|
||||||
|
}
|
||||||
|
if (!containerLogDirStatus) {
|
||||||
|
throw new IOException(
|
||||||
|
"Not able to initialize container-log directories "
|
||||||
|
+ "in any of the configured local directories for container "
|
||||||
|
+ containerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Permissions for user dir.
|
||||||
|
* $local.dir/usercache/$user
|
||||||
|
*/
|
||||||
|
static final short USER_PERM = (short) 0750;
|
||||||
|
/**
|
||||||
|
* Permissions for user appcache dir.
|
||||||
|
* $local.dir/usercache/$user/appcache
|
||||||
|
*/
|
||||||
|
static final short APPCACHE_PERM = (short) 0710;
|
||||||
|
/**
|
||||||
|
* Permissions for user filecache dir.
|
||||||
|
* $local.dir/usercache/$user/filecache
|
||||||
|
*/
|
||||||
|
static final short FILECACHE_PERM = (short) 0710;
|
||||||
|
/**
|
||||||
|
* Permissions for user app dir.
|
||||||
|
* $local.dir/usercache/$user/appcache/$appId
|
||||||
|
*/
|
||||||
|
static final short APPDIR_PERM = (short) 0710;
|
||||||
|
/**
|
||||||
|
* Permissions for user log dir.
|
||||||
|
* $logdir/$user/$appId
|
||||||
|
*/
|
||||||
|
static final short LOGDIR_PERM = (short) 0710;
|
||||||
|
|
||||||
|
private long getDiskFreeSpace(Path base) throws IOException {
|
||||||
|
return lfs.getFsStatus(base).getRemaining();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path getApplicationDir(Path base, String user, String appId) {
|
||||||
|
return new Path(getAppcacheDir(base, user), appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path getUserCacheDir(Path base, String user) {
|
||||||
|
return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path getAppcacheDir(Path base, String user) {
|
||||||
|
return new Path(getUserCacheDir(base, user),
|
||||||
|
ContainerLocalizer.APPCACHE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path getFileCacheDir(Path base, String user) {
|
||||||
|
return new Path(getUserCacheDir(base, user),
|
||||||
|
ContainerLocalizer.FILECACHE);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Path getWorkingDir(List<String> localDirs, String user,
|
||||||
|
String appId) throws IOException {
|
||||||
|
Path appStorageDir = null;
|
||||||
|
long totalAvailable = 0L;
|
||||||
|
long[] availableOnDisk = new long[localDirs.size()];
|
||||||
|
int i = 0;
|
||||||
|
// randomly choose the app directory
|
||||||
|
// the chance of picking a directory is proportional to
|
||||||
|
// the available space on the directory.
|
||||||
|
// firstly calculate the sum of all available space on these directories
|
||||||
|
for (String localDir : localDirs) {
|
||||||
|
Path curBase = getApplicationDir(new Path(localDir),
|
||||||
|
user, appId);
|
||||||
|
long space = 0L;
|
||||||
|
try {
|
||||||
|
space = getDiskFreeSpace(curBase);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
|
||||||
|
}
|
||||||
|
availableOnDisk[i++] = space;
|
||||||
|
totalAvailable += space;
|
||||||
|
}
|
||||||
|
|
||||||
|
// throw an IOException if totalAvailable is 0.
|
||||||
|
if (totalAvailable <= 0L) {
|
||||||
|
throw new IOException("Not able to find a working directory for "
|
||||||
|
+ user);
|
||||||
|
}
|
||||||
|
|
||||||
|
// make probability to pick a directory proportional to
|
||||||
|
// the available space on the directory.
|
||||||
|
Random r = new Random();
|
||||||
|
long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
|
||||||
|
int dir = 0;
|
||||||
|
// skip zero available space directory,
|
||||||
|
// because totalAvailable is greater than 0 and randomPosition
|
||||||
|
// is less than totalAvailable, we can find a valid directory
|
||||||
|
// with nonzero available space.
|
||||||
|
while (availableOnDisk[dir] == 0L) {
|
||||||
|
dir++;
|
||||||
|
}
|
||||||
|
while (randomPosition > availableOnDisk[dir]) {
|
||||||
|
randomPosition -= availableOnDisk[dir++];
|
||||||
|
}
|
||||||
|
appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
|
||||||
|
user, appId);
|
||||||
|
|
||||||
|
return appStorageDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create application log directories on all disks.
|
||||||
|
*/
|
||||||
|
void createAppLogDirs(String appId, List<String> logDirs, String user)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
boolean appLogDirStatus = false;
|
||||||
|
FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
|
||||||
|
for (String rootLogDir : logDirs) {
|
||||||
|
// create $log.dir/$appid
|
||||||
|
Path appLogDir = new Path(rootLogDir, appId);
|
||||||
|
try {
|
||||||
|
createDir(appLogDir, appLogDirPerms, true, user);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
appLogDirStatus = true;
|
||||||
|
}
|
||||||
|
if (!appLogDirStatus) {
|
||||||
|
throw new IOException("Not able to initialize app-log directories "
|
||||||
|
+ "in any of the configured local directories for app " + appId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of paths of given local directories
|
||||||
|
*/
|
||||||
|
private static List<Path> getPaths(List<String> dirs) {
|
||||||
|
List<Path> paths = new ArrayList<Path>(dirs.size());
|
||||||
|
for (int i = 0; i < dirs.size(); i++) {
|
||||||
|
paths.add(new Path(dirs.get(i)));
|
||||||
|
}
|
||||||
|
return paths;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -269,7 +269,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
localResources, nmPrivateClasspathJarDir);
|
localResources, nmPrivateClasspathJarDir);
|
||||||
|
|
||||||
// Write out the environment
|
// Write out the environment
|
||||||
writeLaunchEnv(containerScriptOutStream, environment, localResources,
|
exec.writeLaunchEnv(containerScriptOutStream, environment, localResources,
|
||||||
launchContext.getCommands());
|
launchContext.getCommands());
|
||||||
|
|
||||||
// /////////// End of writing out container-script
|
// /////////// End of writing out container-script
|
||||||
|
@ -502,8 +502,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
public static abstract class ShellScriptBuilder {
|
||||||
static abstract class ShellScriptBuilder {
|
|
||||||
public static ShellScriptBuilder create() {
|
public static ShellScriptBuilder create() {
|
||||||
return Shell.WINDOWS ? new WindowsShellScriptBuilder() :
|
return Shell.WINDOWS ? new WindowsShellScriptBuilder() :
|
||||||
new UnixShellScriptBuilder();
|
new UnixShellScriptBuilder();
|
||||||
|
@ -793,37 +792,6 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
meta.getKey(), meta.getValue(), environment);
|
meta.getKey(), meta.getValue(), environment);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void writeLaunchEnv(OutputStream out,
|
|
||||||
Map<String,String> environment, Map<Path,List<String>> resources,
|
|
||||||
List<String> command)
|
|
||||||
throws IOException {
|
|
||||||
ShellScriptBuilder sb = ShellScriptBuilder.create();
|
|
||||||
if (environment != null) {
|
|
||||||
for (Map.Entry<String,String> env : environment.entrySet()) {
|
|
||||||
sb.env(env.getKey().toString(), env.getValue().toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (resources != null) {
|
|
||||||
for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
|
|
||||||
for (String linkName : entry.getValue()) {
|
|
||||||
sb.symlink(entry.getKey(), new Path(linkName));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sb.command(command);
|
|
||||||
|
|
||||||
PrintStream pout = null;
|
|
||||||
try {
|
|
||||||
pout = new PrintStream(out);
|
|
||||||
sb.write(pout);
|
|
||||||
} finally {
|
|
||||||
if (out != null) {
|
|
||||||
out.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String getExitCodeFile(String pidFile) {
|
public static String getExitCodeFile(String pidFile) {
|
||||||
return pidFile + EXIT_CODE_FILE_SUFFIX;
|
return pidFile + EXIT_CODE_FILE_SUFFIX;
|
||||||
|
|
|
@ -0,0 +1,213 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.LineNumberReader;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is intended to test the DockerContainerExecutor code, but it requires docker
|
||||||
|
* to be installed.
|
||||||
|
* <br><ol>
|
||||||
|
* <li>Install docker, and Compile the code with docker-service-url set to the host and port
|
||||||
|
* where docker service is running.
|
||||||
|
* <br><pre><code>
|
||||||
|
* > mvn clean install -Ddocker-service-url=tcp://0.0.0.0:4243
|
||||||
|
* -DskipTests
|
||||||
|
* </code></pre>
|
||||||
|
*/
|
||||||
|
public class TestDockerContainerExecutor {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestDockerContainerExecutor.class);
|
||||||
|
private static File workSpace = null;
|
||||||
|
private DockerContainerExecutor exec = null;
|
||||||
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
private Path workDir;
|
||||||
|
private FileContext lfs;
|
||||||
|
private String yarnImage;
|
||||||
|
|
||||||
|
private int id = 0;
|
||||||
|
private String appSubmitter;
|
||||||
|
private String dockerUrl;
|
||||||
|
private String testImage = "centos";
|
||||||
|
private String dockerExec;
|
||||||
|
private String containerIdStr;
|
||||||
|
|
||||||
|
|
||||||
|
private ContainerId getNextContainerId() {
|
||||||
|
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
||||||
|
String id = "CONTAINER_" + System.currentTimeMillis();
|
||||||
|
when(cId.toString()).thenReturn(id);
|
||||||
|
return cId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
try {
|
||||||
|
lfs = FileContext.getLocalFSFileContext();
|
||||||
|
workDir = new Path("/tmp/temp-" + System.currentTimeMillis());
|
||||||
|
workSpace = new File(workDir.toUri().getPath());
|
||||||
|
lfs.mkdir(workDir, FsPermission.getDirDefault(), true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
yarnImage = "yarnImage";
|
||||||
|
long time = System.currentTimeMillis();
|
||||||
|
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
|
||||||
|
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
|
||||||
|
|
||||||
|
dockerUrl = System.getProperty("docker-service-url");
|
||||||
|
LOG.info("dockerUrl: " + dockerUrl);
|
||||||
|
if (Strings.isNullOrEmpty(dockerUrl)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dockerUrl = " -H " + dockerUrl;
|
||||||
|
dockerExec = "docker " + dockerUrl;
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec);
|
||||||
|
exec = new DockerContainerExecutor();
|
||||||
|
dirsHandler = new LocalDirsHandlerService();
|
||||||
|
dirsHandler.init(conf);
|
||||||
|
exec.setConf(conf);
|
||||||
|
appSubmitter = System.getProperty("application.submitter");
|
||||||
|
if (appSubmitter == null || appSubmitter.isEmpty()) {
|
||||||
|
appSubmitter = "nobody";
|
||||||
|
}
|
||||||
|
shellExec(dockerExec + " pull " + testImage);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Shell.ShellCommandExecutor shellExec(String command) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor(
|
||||||
|
command.split("\\s+"),
|
||||||
|
new File(workDir.toUri().getPath()),
|
||||||
|
System.getenv());
|
||||||
|
shExec.execute();
|
||||||
|
return shExec;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldRun() {
|
||||||
|
return exec != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int runAndBlock(ContainerId cId, Map<String, String> launchCtxEnv, String... cmd) throws IOException {
|
||||||
|
String appId = "APP_" + System.currentTimeMillis();
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
|
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
||||||
|
when(context.getEnvironment()).thenReturn(launchCtxEnv);
|
||||||
|
|
||||||
|
String script = writeScriptFile(launchCtxEnv, cmd);
|
||||||
|
|
||||||
|
Path scriptPath = new Path(script);
|
||||||
|
Path tokensPath = new Path("/dev/null");
|
||||||
|
Path workDir = new Path(workSpace.getAbsolutePath());
|
||||||
|
Path pidFile = new Path(workDir, "pid.txt");
|
||||||
|
|
||||||
|
exec.activateContainer(cId, pidFile);
|
||||||
|
return exec.launchContainer(container, scriptPath, tokensPath,
|
||||||
|
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||||
|
dirsHandler.getLogDirs());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String writeScriptFile(Map<String, String> launchCtxEnv, String... cmd) throws IOException {
|
||||||
|
File f = File.createTempFile("TestDockerContainerExecutor", ".sh");
|
||||||
|
f.deleteOnExit();
|
||||||
|
PrintWriter p = new PrintWriter(new FileOutputStream(f));
|
||||||
|
for(Map.Entry<String, String> entry: launchCtxEnv.entrySet()) {
|
||||||
|
p.println("export " + entry.getKey() + "=\"" + entry.getValue() + "\"");
|
||||||
|
}
|
||||||
|
for (String part : cmd) {
|
||||||
|
p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
|
||||||
|
p.print(" ");
|
||||||
|
}
|
||||||
|
p.println();
|
||||||
|
p.close();
|
||||||
|
return f.getAbsolutePath();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
try {
|
||||||
|
lfs.delete(workDir, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLaunchContainer() throws IOException {
|
||||||
|
if (!shouldRun()) {
|
||||||
|
LOG.warn("Docker not installed, aborting test.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> env = new HashMap<String, String>();
|
||||||
|
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
|
String touchFileName = "touch-file-" + System.currentTimeMillis();
|
||||||
|
File touchFile = new File(dirsHandler.getLocalDirs().get(0), touchFileName);
|
||||||
|
ContainerId cId = getNextContainerId();
|
||||||
|
int ret = runAndBlock(
|
||||||
|
cId, env, "touch", touchFile.getAbsolutePath(), "&&", "cp", touchFile.getAbsolutePath(), "/");
|
||||||
|
|
||||||
|
assertEquals(0, ret);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,259 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.LineNumberReader;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mock tests for docker container executor
|
||||||
|
*/
|
||||||
|
public class TestDockerContainerExecutorWithMocks {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestDockerContainerExecutorWithMocks.class);
|
||||||
|
public static final String DOCKER_LAUNCH_COMMAND = "/bin/true";
|
||||||
|
private DockerContainerExecutor dockerContainerExecutor = null;
|
||||||
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
private Path workDir;
|
||||||
|
private FileContext lfs;
|
||||||
|
private String yarnImage;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
assumeTrue(!Path.WINDOWS);
|
||||||
|
File f = new File("./src/test/resources/mock-container-executor");
|
||||||
|
if(!FileUtil.canExecute(f)) {
|
||||||
|
FileUtil.setExecutable(f, true);
|
||||||
|
}
|
||||||
|
String executorPath = f.getAbsolutePath();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
yarnImage = "yarnImage";
|
||||||
|
long time = System.currentTimeMillis();
|
||||||
|
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
|
||||||
|
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time);
|
||||||
|
conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time);
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage);
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME , DOCKER_LAUNCH_COMMAND);
|
||||||
|
dockerContainerExecutor = new DockerContainerExecutor();
|
||||||
|
dirsHandler = new LocalDirsHandlerService();
|
||||||
|
dirsHandler.init(conf);
|
||||||
|
dockerContainerExecutor.setConf(conf);
|
||||||
|
lfs = null;
|
||||||
|
try {
|
||||||
|
lfs = FileContext.getLocalFSFileContext();
|
||||||
|
workDir = new Path("/tmp/temp-"+ System.currentTimeMillis());
|
||||||
|
lfs.mkdir(workDir, FsPermission.getDirDefault(), true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
try {
|
||||||
|
lfs.delete(workDir, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalStateException.class)
|
||||||
|
public void testContainerInitSecure() throws IOException {
|
||||||
|
dockerContainerExecutor.getConf().set(
|
||||||
|
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
|
dockerContainerExecutor.init();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testContainerLaunchNullImage() throws IOException {
|
||||||
|
String appSubmitter = "nobody";
|
||||||
|
String appId = "APP_ID";
|
||||||
|
String containerId = "CONTAINER_ID";
|
||||||
|
String testImage = "";
|
||||||
|
|
||||||
|
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
|
||||||
|
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
||||||
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
|
HashMap<String, String> env = new HashMap<String,String>();
|
||||||
|
|
||||||
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
|
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
||||||
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
|
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
|
dockerContainerExecutor.getConf()
|
||||||
|
.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
|
||||||
|
Path pidFile = new Path(workDir, "pid.txt");
|
||||||
|
|
||||||
|
dockerContainerExecutor.activateContainer(cId, pidFile);
|
||||||
|
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
|
||||||
|
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||||
|
dirsHandler.getLogDirs());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testContainerLaunchInvalidImage() throws IOException {
|
||||||
|
String appSubmitter = "nobody";
|
||||||
|
String appId = "APP_ID";
|
||||||
|
String containerId = "CONTAINER_ID";
|
||||||
|
String testImage = "testrepo.com/test-image rm -rf $HADOOP_PREFIX/*";
|
||||||
|
|
||||||
|
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
|
||||||
|
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
||||||
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
|
HashMap<String, String> env = new HashMap<String,String>();
|
||||||
|
|
||||||
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
|
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
||||||
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
|
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
|
dockerContainerExecutor.getConf()
|
||||||
|
.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
|
||||||
|
Path pidFile = new Path(workDir, "pid.txt");
|
||||||
|
|
||||||
|
dockerContainerExecutor.activateContainer(cId, pidFile);
|
||||||
|
dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
|
||||||
|
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||||
|
dirsHandler.getLogDirs());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerLaunch() throws IOException {
|
||||||
|
String appSubmitter = "nobody";
|
||||||
|
String appId = "APP_ID";
|
||||||
|
String containerId = "CONTAINER_ID";
|
||||||
|
String testImage = "\"sequenceiq/hadoop-docker:2.4.1\"";
|
||||||
|
|
||||||
|
Container container = mock(Container.class, RETURNS_DEEP_STUBS);
|
||||||
|
ContainerId cId = mock(ContainerId.class, RETURNS_DEEP_STUBS);
|
||||||
|
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||||
|
HashMap<String, String> env = new HashMap<String,String>();
|
||||||
|
|
||||||
|
when(container.getContainerId()).thenReturn(cId);
|
||||||
|
when(container.getLaunchContext()).thenReturn(context);
|
||||||
|
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
|
||||||
|
when(cId.toString()).thenReturn(containerId);
|
||||||
|
|
||||||
|
when(context.getEnvironment()).thenReturn(env);
|
||||||
|
env.put(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, testImage);
|
||||||
|
Path scriptPath = new Path("file:///bin/echo");
|
||||||
|
Path tokensPath = new Path("file:///dev/null");
|
||||||
|
|
||||||
|
Path pidFile = new Path(workDir, "pid");
|
||||||
|
|
||||||
|
dockerContainerExecutor.activateContainer(cId, pidFile);
|
||||||
|
int ret = dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath,
|
||||||
|
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||||
|
dirsHandler.getLogDirs());
|
||||||
|
assertEquals(0, ret);
|
||||||
|
//get the script
|
||||||
|
Path sessionScriptPath = new Path(workDir,
|
||||||
|
Shell.appendScriptExtension(
|
||||||
|
DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
|
||||||
|
LineNumberReader lnr = new LineNumberReader(new FileReader(sessionScriptPath.toString()));
|
||||||
|
boolean cmdFound = false;
|
||||||
|
List<String> localDirs = dirsToMount(dirsHandler.getLocalDirs());
|
||||||
|
List<String> logDirs = dirsToMount(dirsHandler.getLogDirs());
|
||||||
|
List<String> workDirMount = dirsToMount(Collections.singletonList(workDir.toUri().getPath()));
|
||||||
|
List<String> expectedCommands = new ArrayList<String>(
|
||||||
|
Arrays.asList(DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name", containerId));
|
||||||
|
expectedCommands.addAll(localDirs);
|
||||||
|
expectedCommands.addAll(logDirs);
|
||||||
|
expectedCommands.addAll(workDirMount);
|
||||||
|
String shellScript = workDir + "/launch_container.sh";
|
||||||
|
|
||||||
|
expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""), "bash","\"" + shellScript + "\""));
|
||||||
|
|
||||||
|
String expectedPidString = "echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+ pidFile.toString() + ".tmp";
|
||||||
|
boolean pidSetterFound = false;
|
||||||
|
while(lnr.ready()){
|
||||||
|
String line = lnr.readLine();
|
||||||
|
LOG.debug("line: " + line);
|
||||||
|
if (line.startsWith(DOCKER_LAUNCH_COMMAND)){
|
||||||
|
List<String> command = new ArrayList<String>();
|
||||||
|
for( String s :line.split("\\s+")){
|
||||||
|
command.add(s.trim());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(expectedCommands, command);
|
||||||
|
cmdFound = true;
|
||||||
|
} else if (line.startsWith("echo")) {
|
||||||
|
assertEquals(expectedPidString, line);
|
||||||
|
pidSetterFound = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
assertTrue(cmdFound);
|
||||||
|
assertTrue(pidSetterFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> dirsToMount(List<String> dirs) {
|
||||||
|
List<String> localDirs = new ArrayList<String>();
|
||||||
|
for(String dir: dirs){
|
||||||
|
localDirs.add("-v");
|
||||||
|
localDirs.add(dir + ":" + dir);
|
||||||
|
}
|
||||||
|
return localDirs;
|
||||||
|
}
|
||||||
|
}
|
|
@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
|
@ -144,7 +145,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
commands.add("/bin/sh ./\\\"" + badSymlink + "\\\"");
|
commands.add("/bin/sh ./\\\"" + badSymlink + "\\\"");
|
||||||
}
|
}
|
||||||
|
|
||||||
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
|
new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands);
|
||||||
fos.flush();
|
fos.flush();
|
||||||
fos.close();
|
fos.close();
|
||||||
FileUtil.setExecutable(tempFile, true);
|
FileUtil.setExecutable(tempFile, true);
|
||||||
|
@ -211,7 +212,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
} else {
|
} else {
|
||||||
commands.add("/bin/sh ./\\\"" + symLink + "\\\"");
|
commands.add("/bin/sh ./\\\"" + symLink + "\\\"");
|
||||||
}
|
}
|
||||||
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
|
new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands);
|
||||||
fos.flush();
|
fos.flush();
|
||||||
fos.close();
|
fos.close();
|
||||||
FileUtil.setExecutable(tempFile, true);
|
FileUtil.setExecutable(tempFile, true);
|
||||||
|
@ -264,7 +265,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
"\"workflowName\":\"\n\ninsert table " +
|
"\"workflowName\":\"\n\ninsert table " +
|
||||||
"\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " );
|
"\npartition (cd_education_status)\nselect cd_demo_sk, cd_gender, " );
|
||||||
List<String> commands = new ArrayList<String>();
|
List<String> commands = new ArrayList<String>();
|
||||||
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
|
new DefaultContainerExecutor().writeLaunchEnv(fos, env, resources, commands);
|
||||||
fos.flush();
|
fos.flush();
|
||||||
fos.close();
|
fos.close();
|
||||||
|
|
||||||
|
@ -341,7 +342,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
Map<String, String> env = new HashMap<String, String>();
|
Map<String, String> env = new HashMap<String, String>();
|
||||||
List<String> commands = new ArrayList<String>();
|
List<String> commands = new ArrayList<String>();
|
||||||
commands.add(command);
|
commands.add(command);
|
||||||
ContainerLaunch.writeLaunchEnv(fos, env, resources, commands);
|
ContainerExecutor exec = new DefaultContainerExecutor();
|
||||||
|
exec.writeLaunchEnv(fos, env, resources, commands);
|
||||||
fos.flush();
|
fos.flush();
|
||||||
fos.close();
|
fos.close();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,200 @@
|
||||||
|
|
||||||
|
~~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
~~ you may not use this file except in compliance with the License.
|
||||||
|
~~ You may obtain a copy of the License at
|
||||||
|
~~
|
||||||
|
~~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~~
|
||||||
|
~~ Unless required by applicable law or agreed to in writing, software
|
||||||
|
~~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
~~ See the License for the specific language governing permissions and
|
||||||
|
~~ limitations under the License. See accompanying LICENSE file.
|
||||||
|
|
||||||
|
---
|
||||||
|
Hadoop Map Reduce Next Generation-${project.version} - Docker Container Executor
|
||||||
|
---
|
||||||
|
---
|
||||||
|
${maven.build.timestamp}
|
||||||
|
|
||||||
|
Docker Container Executor
|
||||||
|
|
||||||
|
%{toc|section=1|fromDepth=0}
|
||||||
|
|
||||||
|
* {Overview}
|
||||||
|
|
||||||
|
Docker (https://www.docker.io/) combines an easy-to-use interface to
|
||||||
|
Linux containers with easy-to-construct image files for those
|
||||||
|
containers. In short, Docker launches very light weight virtual
|
||||||
|
machines.
|
||||||
|
|
||||||
|
The Docker Container Executor (DCE) allows the YARN NodeManager to
|
||||||
|
launch YARN containers into Docker containers. Users can specify the
|
||||||
|
Docker images they want for their YARN containers. These containers
|
||||||
|
provide a custom software environment in which the user's code runs,
|
||||||
|
isolated from the software environment of the NodeManager. These
|
||||||
|
containers can include special libraries needed by the application,
|
||||||
|
and they can have different versions of Perl, Python, and even Java
|
||||||
|
than what is installed on the NodeManager. Indeed, these containers
|
||||||
|
can run a different flavor of Linux than what is running on the
|
||||||
|
NodeManager -- although the YARN container must define all the environments
|
||||||
|
and libraries needed to run the job, nothing will be shared with the NodeManager.
|
||||||
|
|
||||||
|
Docker for YARN provides both consistency (all YARN containers will
|
||||||
|
have the same software environment) and isolation (no interference
|
||||||
|
with whatever is installed on the physical machine).
|
||||||
|
|
||||||
|
* {Cluster Configuration}
|
||||||
|
|
||||||
|
Docker Container Executor runs in non-secure mode of HDFS and
|
||||||
|
YARN. It will not run in secure mode, and will exit if it detects
|
||||||
|
secure mode.
|
||||||
|
|
||||||
|
The DockerContainerExecutor requires Docker daemon to be running on
|
||||||
|
the NodeManagers, and the Docker client installed and able to start Docker
|
||||||
|
containers. To prevent timeouts while starting jobs, the Docker
|
||||||
|
images to be used by a job should already be downloaded in the
|
||||||
|
NodeManagers. Here's an example of how this can be done:
|
||||||
|
|
||||||
|
----
|
||||||
|
sudo docker pull sequenceiq/hadoop-docker:2.4.1
|
||||||
|
----
|
||||||
|
|
||||||
|
This should be done as part of the NodeManager startup.
|
||||||
|
|
||||||
|
The following properties must be set in yarn-site.xml:
|
||||||
|
|
||||||
|
----
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
|
||||||
|
<value>/usr/bin/docker</value>
|
||||||
|
<description>
|
||||||
|
Name or path to the Docker client. This is a required parameter. If this is empty,
|
||||||
|
user must pass an image name as part of the job invocation(see below).
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.container-executor.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
|
||||||
|
<description>
|
||||||
|
This is the container executor setting that ensures that all
|
||||||
|
jobs are started with the DockerContainerExecutor.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
----
|
||||||
|
|
||||||
|
Administrators should be aware that DCE doesn't currently provide
|
||||||
|
user name-space isolation. This means, in particular, that software
|
||||||
|
running as root in the YARN container will have root privileges in the
|
||||||
|
underlying NodeManager. Put differently, DCE currently provides no
|
||||||
|
better security guarantees than YARN's Default Container Executor. In
|
||||||
|
fact, DockerContainerExecutor will exit if it detects secure yarn.
|
||||||
|
|
||||||
|
* {Tips for connecting to a secure docker repository}
|
||||||
|
|
||||||
|
By default, docker images are pulled from the docker public repository. The
|
||||||
|
format of a docker image url is: <username>/<image_name>. For example,
|
||||||
|
sequenceiq/hadoop-docker:2.4.1 is an image in docker public repository that contains java and
|
||||||
|
hadoop.
|
||||||
|
|
||||||
|
If you want your own private repository, you provide the repository url instead of
|
||||||
|
your username. Therefore, the image url becomes: <private_repo_url>/<image_name>.
|
||||||
|
For example, if your repository is on localhost:8080, your images would be like:
|
||||||
|
localhost:8080/hadoop-docker
|
||||||
|
|
||||||
|
To connect to a secure docker repository, you can use the following invocation:
|
||||||
|
|
||||||
|
----
|
||||||
|
docker login [OPTIONS] [SERVER]
|
||||||
|
|
||||||
|
Register or log in to a Docker registry server, if no server is specified
|
||||||
|
"https://index.docker.io/v1/" is the default.
|
||||||
|
|
||||||
|
-e, --email="" Email
|
||||||
|
-p, --password="" Password
|
||||||
|
-u, --username="" Username
|
||||||
|
----
|
||||||
|
|
||||||
|
If you want to login to a self-hosted registry you can specify this by adding
|
||||||
|
the server name.
|
||||||
|
|
||||||
|
----
|
||||||
|
docker login <private_repo_url>
|
||||||
|
----
|
||||||
|
|
||||||
|
This needs to be run as part of the NodeManager startup, or as a cron job if
|
||||||
|
the login session expires periodically. You can login to multiple docker repositories
|
||||||
|
from the same NodeManager, but all your users will have access to all your repositories,
|
||||||
|
as at present the DockerContainerExecutor does not support per-job docker login.
|
||||||
|
|
||||||
|
* {Job Configuration}
|
||||||
|
|
||||||
|
Currently you cannot configure any of the Docker settings with the job configuration.
|
||||||
|
You can provide Mapper, Reducer, and ApplicationMaster environment overrides for the
|
||||||
|
docker images, using the following 3 JVM properties respectively(only for MR jobs):
|
||||||
|
* mapreduce.map.env: You can override the mapper's image by passing
|
||||||
|
yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
|
||||||
|
to this JVM property.
|
||||||
|
* mapreduce.reduce.env: You can override the reducer's image by passing
|
||||||
|
yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
|
||||||
|
to this JVM property.
|
||||||
|
* yarn.app.mapreduce.am.env: You can override the ApplicationMaster's image
|
||||||
|
by passing yarn.nodemanager.docker-container-executor.image-name=<your_image_name>
|
||||||
|
to this JVM property.
|
||||||
|
|
||||||
|
* {Docker Image requirements}
|
||||||
|
|
||||||
|
The Docker Images used for YARN containers must meet the following
|
||||||
|
requirements:
|
||||||
|
|
||||||
|
The distro and version of Linux in your Docker Image can be quite different
|
||||||
|
from that of your NodeManager. (Docker does have a few limitations in this
|
||||||
|
regard, but you're not likely to hit them.) However, if you're using the
|
||||||
|
MapReduce framework, then your image will need to be configured for running
|
||||||
|
Hadoop. Java must be installed in the container, and the following environment variables
|
||||||
|
must be defined in the image: JAVA_HOME, HADOOP_COMMON_PATH, HADOOP_HDFS_HOME,
|
||||||
|
HADOOP_MAPRED_HOME, HADOOP_YARN_HOME, and HADOOP_CONF_DIR
|
||||||
|
|
||||||
|
|
||||||
|
* {Working example of yarn launched docker containers.}
|
||||||
|
|
||||||
|
The following example shows how to run teragen using DockerContainerExecutor.
|
||||||
|
|
||||||
|
* First ensure that YARN is properly configured with DockerContainerExecutor(see above).
|
||||||
|
|
||||||
|
----
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.docker-container-executor.exec-name</name>
|
||||||
|
<value>docker -H=tcp://0.0.0.0:4243</value>
|
||||||
|
<description>
|
||||||
|
Name or path to the Docker client. The tcp socket must be
|
||||||
|
where docker daemon is listening.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.nodemanager.container-executor.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.nodemanager.DockerContainerExecutor</value>
|
||||||
|
<description>
|
||||||
|
This is the container executor setting that ensures that all
|
||||||
|
jobs are started with the DockerContainerExecutor.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
----
|
||||||
|
|
||||||
|
* Pick a custom Docker image if you want. In this example, we'll use sequenceiq/hadoop-docker:2.4.1 from the
|
||||||
|
docker hub repository. It has jdk, hadoop, and all the previously mentioned environment variables configured.
|
||||||
|
* Run:
|
||||||
|
|
||||||
|
----
|
||||||
|
hadoop jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
|
||||||
|
teragen \
|
||||||
|
-Dmapreduce.map.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
|
||||||
|
-Dyarn.app.mapreduce.am.env="yarn.nodemanager.docker-container-executor.image-name=sequenceiq/hadoop-docker:2.4.1" \
|
||||||
|
1000 \
|
||||||
|
teragen_out_dir
|
||||||
|
----
|
||||||
|
|
||||||
|
Once it succeeds, you can check the yarn debug logs to verify that docker indeed has launched containers.
|
||||||
|
|
Loading…
Reference in New Issue