YARN-7654. Support ENTRY_POINT for docker container. Contributed by Eric Yang

This commit is contained in:
Jason Lowe 2018-05-11 18:56:05 -05:00
parent 4b4f24ad5f
commit 6c8e51ca7e
12 changed files with 467 additions and 87 deletions

View File

@ -244,7 +244,14 @@ public enum Environment {
* Comma separate list of directories that the container should use for * Comma separate list of directories that the container should use for
* logging. * logging.
*/ */
LOG_DIRS("LOG_DIRS"); LOG_DIRS("LOG_DIRS"),
/**
* $YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE
* Final, Docker run support ENTRY_POINT.
*/
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE(
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE");
private final String variable; private final String variable;
private Environment(String variable) { private Environment(String variable) {

View File

@ -58,23 +58,26 @@ public abstract void processArtifact(AbstractLauncher launcher,
Service service) Service service)
throws IOException; throws IOException;
public void buildContainerLaunchContext(AbstractLauncher launcher, public Map<String, String> buildContainerTokens(ComponentInstance instance,
Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
// Generate tokens (key-value pair) for config substitution.
// Get pre-defined tokens
Map<String, String> globalTokens =
instance.getComponent().getScheduler().globalTokens;
Map<String, String> tokensForSubstitution = ProviderUtils
.initCompTokensForSubstitute(instance, container,
compLaunchContext);
tokensForSubstitution.putAll(globalTokens);
return tokensForSubstitution;
}
public void buildContainerEnvironment(AbstractLauncher launcher,
Service service, ComponentInstance instance, Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container, SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext) ContainerLaunchService.ComponentLaunchContext compLaunchContext,
throws IOException, SliderException { Map<String, String> tokensForSubstitution)
processArtifact(launcher, instance, fileSystem, service); throws IOException, SliderException {
ServiceContext context =
instance.getComponent().getScheduler().getContext();
// Generate tokens (key-value pair) for config substitution.
// Get pre-defined tokens
Map<String, String> globalTokens =
instance.getComponent().getScheduler().globalTokens;
Map<String, String> tokensForSubstitution = ProviderUtils
.initCompTokensForSubstitute(instance, container,
compLaunchContext);
tokensForSubstitution.putAll(globalTokens);
// Set the environment variables in launcher // Set the environment variables in launcher
launcher.putEnv(ServiceUtils.buildEnvMap( launcher.putEnv(ServiceUtils.buildEnvMap(
compLaunchContext.getConfiguration(), tokensForSubstitution)); compLaunchContext.getConfiguration(), tokensForSubstitution));
@ -90,17 +93,14 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
for (Entry<String, String> entry : launcher.getEnv().entrySet()) { for (Entry<String, String> entry : launcher.getEnv().entrySet()) {
tokensForSubstitution.put($(entry.getKey()), entry.getValue()); tokensForSubstitution.put($(entry.getKey()), entry.getValue());
} }
//TODO add component host tokens? }
// ProviderUtils.addComponentHostTokens(tokensForSubstitution, amState);
// create config file on hdfs and add local resource
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
compLaunchContext, tokensForSubstitution, instance, context);
// handles static files (like normal file / archive file) for localization.
ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem,
compLaunchContext);
public void buildContainerLaunchCommand(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution)
throws IOException, SliderException {
// substitute launch command // substitute launch command
String launchCommand = compLaunchContext.getLaunchCommand(); String launchCommand = compLaunchContext.getLaunchCommand();
// docker container may have empty commands // docker container may have empty commands
@ -112,10 +112,15 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE); operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build()); launcher.addCommand(operation.build());
} }
}
public void buildContainerRetry(AbstractLauncher launcher,
Configuration yarnConf,
ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
// By default retry forever every 30 seconds // By default retry forever every 30 seconds
launcher.setRetryContext( launcher.setRetryContext(
YarnServiceConf.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX, YarnServiceConf.getInt(CONTAINER_RETRY_MAX,
DEFAULT_CONTAINER_RETRY_MAX,
compLaunchContext.getConfiguration(), yarnConf), compLaunchContext.getConfiguration(), yarnConf),
YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL, YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL,
DEFAULT_CONTAINER_RETRY_INTERVAL, DEFAULT_CONTAINER_RETRY_INTERVAL,
@ -124,4 +129,38 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL, DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
compLaunchContext.getConfiguration(), yarnConf)); compLaunchContext.getConfiguration(), yarnConf));
} }
public void buildContainerLaunchContext(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext)
throws IOException, SliderException {
processArtifact(launcher, instance, fileSystem, service);
ServiceContext context =
instance.getComponent().getScheduler().getContext();
// Generate tokens (key-value pair) for config substitution.
Map<String, String> tokensForSubstitution =
buildContainerTokens(instance, container, compLaunchContext);
// Setup launch context environment
buildContainerEnvironment(launcher, service, instance,
fileSystem, yarnConf, container, compLaunchContext,
tokensForSubstitution);
// create config file on hdfs and add local resource
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
compLaunchContext, tokensForSubstitution, instance, context);
// handles static files (like normal file / archive file) for localization.
ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem,
compLaunchContext);
// replace launch command with token specific information
buildContainerLaunchCommand(launcher, service, instance, fileSystem,
yarnConf, container, compLaunchContext, tokensForSubstitution);
// Setup container retry settings
buildContainerRetry(launcher, yarnConf, compLaunchContext);
}
} }

View File

@ -17,13 +17,23 @@
*/ */
package org.apache.hadoop.yarn.service.provider.docker; package org.apache.hadoop.yarn.service.provider.docker;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.provider.AbstractProviderService; import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.CommandLineBuilder;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
public class DockerProviderService extends AbstractProviderService public class DockerProviderService extends AbstractProviderService
implements DockerKeys { implements DockerKeys {
@ -39,4 +49,36 @@ public void processArtifact(AbstractLauncher launcher,
launcher.setRunPrivilegedContainer( launcher.setRunPrivilegedContainer(
compInstance.getCompSpec().getRunPrivilegedContainer()); compInstance.getCompSpec().getRunPrivilegedContainer());
} }
@Override
public void buildContainerLaunchCommand(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution)
throws IOException, SliderException {
Component component = instance.getComponent().getComponentSpec();
boolean useEntryPoint = Boolean.parseBoolean(component
.getConfiguration().getEnv(Environment
.YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.name()));
if (useEntryPoint) {
String launchCommand = component.getLaunchCommand();
if (!StringUtils.isEmpty(launchCommand)) {
launcher.addCommand(launchCommand);
}
} else {
// substitute launch command
String launchCommand = compLaunchContext.getLaunchCommand();
// docker container may have empty commands
if (!StringUtils.isEmpty(launchCommand)) {
launchCommand = ProviderUtils
.substituteStrWithTokens(launchCommand, tokensForSubstitution);
CommandLineBuilder operation = new CommandLineBuilder();
operation.add(launchCommand);
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build());
}
}
}
} }

View File

@ -1677,6 +1677,20 @@ public void sanitizeEnv(Map<String, String> environment, Path pwd,
containerLogDirs, Map<Path, List<String>> resources, containerLogDirs, Map<Path, List<String>> resources,
Path nmPrivateClasspathJarDir, Path nmPrivateClasspathJarDir,
Set<String> nmVars) throws IOException { Set<String> nmVars) throws IOException {
// Based on discussion in YARN-7654, for ENTRY_POINT enabled
// docker container, we forward user defined environment variables
// without node manager environment variables. This is the reason
// that we skip sanitizeEnv method.
boolean overrideDisable = Boolean.parseBoolean(
environment.get(
Environment.
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.
name()));
if (overrideDisable) {
environment.remove("WORK_DIR");
return;
}
/** /**
* Non-modifiable environment variables * Non-modifiable environment variables
*/ */

View File

@ -235,7 +235,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
@InterfaceAudience.Private @InterfaceAudience.Private
public static final String ENV_DOCKER_CONTAINER_DELAYED_REMOVAL = public static final String ENV_DOCKER_CONTAINER_DELAYED_REMOVAL =
"YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL"; "YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL";
private Configuration conf; private Configuration conf;
private Context nmContext; private Context nmContext;
private DockerClient dockerClient; private DockerClient dockerClient;
@ -741,6 +740,8 @@ public void launchContainer(ContainerRuntimeContext ctx)
String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE); String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE);
String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK); String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK);
String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME); String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME);
boolean useEntryPoint = Boolean.parseBoolean(environment
.get(ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE));
if(network == null || network.isEmpty()) { if(network == null || network.isEmpty()) {
network = defaultNetwork; network = defaultNetwork;
@ -802,8 +803,6 @@ public void launchContainer(ContainerRuntimeContext ctx)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DockerRunCommand runCommand = new DockerRunCommand(containerIdStr, DockerRunCommand runCommand = new DockerRunCommand(containerIdStr,
dockerRunAsUser, imageName) dockerRunAsUser, imageName)
.detachOnRun()
.setContainerWorkDir(containerWorkDir.toString())
.setNetworkType(network); .setNetworkType(network);
// Only add hostname if network is not host or if Registry DNS is enabled. // Only add hostname if network is not host or if Registry DNS is enabled.
if (!network.equalsIgnoreCase("host") || if (!network.equalsIgnoreCase("host") ||
@ -875,19 +874,22 @@ public void launchContainer(ContainerRuntimeContext ctx)
addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand); addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);
String disableOverride = environment.get( if (useEntryPoint) {
ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE); runCommand.setOverrideDisabled(true);
runCommand.addEnv(environment);
if (disableOverride != null && disableOverride.equals("true")) { runCommand.setOverrideCommandWithArgs(container.getLaunchContext()
LOG.info("command override disabled"); .getCommands());
runCommand.disableDetach();
runCommand.setLogDir(container.getLogDir());
} else { } else {
List<String> overrideCommands = new ArrayList<>(); List<String> overrideCommands = new ArrayList<>();
Path launchDst = Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
overrideCommands.add("bash"); overrideCommands.add("bash");
overrideCommands.add(launchDst.toUri().getPath()); overrideCommands.add(launchDst.toUri().getPath());
runCommand.setContainerWorkDir(containerWorkDir.toString());
runCommand.setOverrideCommandWithArgs(overrideCommands); runCommand.setOverrideCommandWithArgs(overrideCommands);
runCommand.detachOnRun();
} }
if(enableUserReMapping) { if(enableUserReMapping) {

View File

@ -49,6 +49,7 @@ public final class DockerClient {
LoggerFactory.getLogger(DockerClient.class); LoggerFactory.getLogger(DockerClient.class);
private static final String TMP_FILE_PREFIX = "docker."; private static final String TMP_FILE_PREFIX = "docker.";
private static final String TMP_FILE_SUFFIX = ".cmd"; private static final String TMP_FILE_SUFFIX = ".cmd";
private static final String TMP_ENV_FILE_SUFFIX = ".env";
private final String tmpDirPath; private final String tmpDirPath;
public DockerClient(Configuration conf) throws ContainerExecutionException { public DockerClient(Configuration conf) throws ContainerExecutionException {
@ -69,40 +70,56 @@ public DockerClient(Configuration conf) throws ContainerExecutionException {
public String writeCommandToTempFile(DockerCommand cmd, String filePrefix) public String writeCommandToTempFile(DockerCommand cmd, String filePrefix)
throws ContainerExecutionException { throws ContainerExecutionException {
File dockerCommandFile = null;
try { try {
dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix, File dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_FILE_SUFFIX, new TMP_FILE_SUFFIX, new
File(tmpDirPath)); File(tmpDirPath));
try (
Writer writer = new OutputStreamWriter( Writer writer = new OutputStreamWriter(
new FileOutputStream(dockerCommandFile), "UTF-8"); new FileOutputStream(dockerCommandFile), "UTF-8");
PrintWriter printWriter = new PrintWriter(writer); PrintWriter printWriter = new PrintWriter(writer);
printWriter.println("[docker-command-execution]"); ) {
for (Map.Entry<String, List<String>> entry : printWriter.println("[docker-command-execution]");
cmd.getDockerCommandWithArguments().entrySet()) { for (Map.Entry<String, List<String>> entry :
if (entry.getKey().contains("=")) { cmd.getDockerCommandWithArguments().entrySet()) {
throw new ContainerExecutionException( if (entry.getKey().contains("=")) {
"'=' found in entry for docker command file, key = " + entry throw new ContainerExecutionException(
.getKey() + "; value = " + entry.getValue()); "'=' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
if (entry.getValue().contains("\n")) {
throw new ContainerExecutionException(
"'\\n' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
printWriter.println(" " + entry.getKey() + "=" + StringUtils
.join(",", entry.getValue()));
} }
if (entry.getValue().contains("\n")) { return dockerCommandFile.getAbsolutePath();
throw new ContainerExecutionException(
"'\\n' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
printWriter.println(" " + entry.getKey() + "=" + StringUtils
.join(",", entry.getValue()));
} }
printWriter.close();
return dockerCommandFile.getAbsolutePath();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to write docker command to temporary file!"); LOG.warn("Unable to write docker command to temporary file!");
throw new ContainerExecutionException(e); throw new ContainerExecutionException(e);
} }
} }
private String writeEnvFile(DockerRunCommand cmd, String filePrefix,
File cmdDir) throws IOException {
File dockerEnvFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_ENV_FILE_SUFFIX, cmdDir);
try (
Writer envWriter = new OutputStreamWriter(
new FileOutputStream(dockerEnvFile), "UTF-8");
PrintWriter envPrintWriter = new PrintWriter(envWriter);
) {
for (Map.Entry<String, String> entry : cmd.getEnv()
.entrySet()) {
envPrintWriter.println(entry.getKey() + "=" + entry.getValue());
}
return dockerEnvFile.getAbsolutePath();
}
}
public String writeCommandToTempFile(DockerCommand cmd, public String writeCommandToTempFile(DockerCommand cmd,
ContainerId containerId, Context nmContext) ContainerId containerId, Context nmContext)
throws ContainerExecutionException { throws ContainerExecutionException {
@ -126,32 +143,38 @@ public String writeCommandToTempFile(DockerCommand cmd,
throw new IOException("Cannot create container private directory " throw new IOException("Cannot create container private directory "
+ cmdDir); + cmdDir);
} }
dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix, dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_FILE_SUFFIX, cmdDir); TMP_FILE_SUFFIX, cmdDir);
try (
Writer writer = new OutputStreamWriter( Writer writer = new OutputStreamWriter(
new FileOutputStream(dockerCommandFile.toString()), "UTF-8"); new FileOutputStream(dockerCommandFile.toString()), "UTF-8");
PrintWriter printWriter = new PrintWriter(writer); PrintWriter printWriter = new PrintWriter(writer);
printWriter.println("[docker-command-execution]"); ) {
for (Map.Entry<String, List<String>> entry : printWriter.println("[docker-command-execution]");
cmd.getDockerCommandWithArguments().entrySet()) { for (Map.Entry<String, List<String>> entry :
if (entry.getKey().contains("=")) { cmd.getDockerCommandWithArguments().entrySet()) {
throw new ContainerExecutionException( if (entry.getKey().contains("=")) {
"'=' found in entry for docker command file, key = " + entry throw new ContainerExecutionException(
.getKey() + "; value = " + entry.getValue()); "'=' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
if (entry.getValue().contains("\n")) {
throw new ContainerExecutionException(
"'\\n' found in entry for docker command file, key = " + entry
.getKey() + "; value = " + entry.getValue());
}
printWriter.println(" " + entry.getKey() + "=" + StringUtils
.join(",", entry.getValue()));
} }
if (entry.getValue().contains("\n")) { if (cmd instanceof DockerRunCommand) {
throw new ContainerExecutionException( DockerRunCommand runCommand = (DockerRunCommand) cmd;
"'\\n' found in entry for docker command file, key = " + entry if (runCommand.containsEnv()) {
.getKey() + "; value = " + entry.getValue()); String path = writeEnvFile(runCommand, filePrefix, cmdDir);
printWriter.println(" environ=" + path);
}
} }
printWriter.println(" " + entry.getKey() + "=" + StringUtils return dockerCommandFile.toString();
.join(",", entry.getValue()));
} }
printWriter.close();
return dockerCommandFile.toString();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to write docker command to " + cmdDir); LOG.warn("Unable to write docker command to " + cmdDir);
throw new ContainerExecutionException(e); throw new ContainerExecutionException(e);

View File

@ -21,12 +21,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import java.io.File; import java.io.File;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public class DockerRunCommand extends DockerCommand { public class DockerRunCommand extends DockerCommand {
private static final String RUN_COMMAND = "run"; private static final String RUN_COMMAND = "run";
private final Map<String, String> userEnv;
/** The following are mandatory: */ /** The following are mandatory: */
public DockerRunCommand(String containerId, String user, String image) { public DockerRunCommand(String containerId, String user, String image) {
@ -34,6 +36,7 @@ public DockerRunCommand(String containerId, String user, String image) {
super.addCommandArguments("name", containerId); super.addCommandArguments("name", containerId);
super.addCommandArguments("user", user); super.addCommandArguments("user", user);
super.addCommandArguments("image", image); super.addCommandArguments("image", image);
this.userEnv = new LinkedHashMap<String, String>();
} }
public DockerRunCommand removeContainerOnExit() { public DockerRunCommand removeContainerOnExit() {
@ -174,4 +177,45 @@ public DockerRunCommand setOverrideCommandWithArgs(
public Map<String, List<String>> getDockerCommandWithArguments() { public Map<String, List<String>> getDockerCommandWithArguments() {
return super.getDockerCommandWithArguments(); return super.getDockerCommandWithArguments();
} }
public DockerRunCommand setOverrideDisabled(boolean toggle) {
String value = Boolean.toString(toggle);
super.addCommandArguments("use-entry-point", value);
return this;
}
public DockerRunCommand setLogDir(String logDir) {
super.addCommandArguments("log-dir", logDir);
return this;
}
/**
* Check if user defined environment variables are empty.
*
* @return true if user defined environment variables are not empty.
*/
public boolean containsEnv() {
if (userEnv.size() > 0) {
return true;
}
return false;
}
/**
* Get user defined environment variables.
*
* @return a map of user defined environment variables
*/
public Map<String, String> getEnv() {
return userEnv;
}
/**
* Add user defined environment variables.
*
* @param environment A map of user defined environment variables
*/
public final void addEnv(Map<String, String> environment) {
userEnv.putAll(environment);
}
} }

View File

@ -94,6 +94,8 @@ static gid_t nm_gid = -1;
struct configuration CFG = {.size=0, .sections=NULL}; struct configuration CFG = {.size=0, .sections=NULL};
struct section executor_cfg = {.size=0, .kv_pairs=NULL}; struct section executor_cfg = {.size=0, .kv_pairs=NULL};
static char *chosen_container_log_dir = NULL;
char *concatenate(char *concat_pattern, char *return_path_name, char *concatenate(char *concat_pattern, char *return_path_name,
int numArgs, ...); int numArgs, ...);
@ -755,8 +757,9 @@ static int create_container_directories(const char* user, const char *app_id,
} else if (mkdirs(container_log_dir, perms) != 0) { } else if (mkdirs(container_log_dir, perms) != 0) {
free(container_log_dir); free(container_log_dir);
} else { } else {
free(container_log_dir);
result = 0; result = 0;
chosen_container_log_dir = strdup(container_log_dir);
free(container_log_dir);
} }
} }
free(combined_name); free(combined_name);
@ -1129,6 +1132,34 @@ char* get_container_log_directory(const char *log_root, const char* app_id,
container_id); container_id);
} }
char *init_log_path(const char *container_log_dir, const char *logfile) {
char *tmp_buffer = NULL;
tmp_buffer = make_string("%s/%s", container_log_dir, logfile);
mode_t permissions = S_IRUSR | S_IWUSR | S_IRGRP;
int fd = open(tmp_buffer, O_CREAT | O_WRONLY, permissions);
if (fd >= 0) {
close(fd);
if (change_owner(tmp_buffer, user_detail->pw_uid, user_detail->pw_gid) != 0) {
fprintf(ERRORFILE, "Failed to chown %s to %d:%d: %s\n", tmp_buffer, user_detail->pw_uid, user_detail->pw_gid,
strerror(errno));
free(tmp_buffer);
tmp_buffer = NULL;
} else if (chmod(tmp_buffer, permissions) != 0) {
fprintf(ERRORFILE, "Can't chmod %s - %s\n",
tmp_buffer, strerror(errno));
free(tmp_buffer);
tmp_buffer = NULL;
}
} else {
fprintf(ERRORFILE, "Failed to create file %s - %s\n", tmp_buffer,
strerror(errno));
free(tmp_buffer);
tmp_buffer = NULL;
}
return tmp_buffer;
}
int create_container_log_dirs(const char *container_id, const char *app_id, int create_container_log_dirs(const char *container_id, const char *app_id,
char * const * log_dirs) { char * const * log_dirs) {
char* const* log_root; char* const* log_root;
@ -1506,6 +1537,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
char *docker_inspect_exitcode_command = NULL; char *docker_inspect_exitcode_command = NULL;
int container_file_source =-1; int container_file_source =-1;
int cred_file_source = -1; int cred_file_source = -1;
int use_entry_point = 0;
gid_t user_gid = getegid(); gid_t user_gid = getegid();
uid_t prev_uid = geteuid(); uid_t prev_uid = geteuid();
@ -1560,6 +1592,18 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
goto cleanup; goto cleanup;
} }
use_entry_point = get_use_entry_point_flag();
char *so = init_log_path(chosen_container_log_dir, "stdout.txt");
if (so == NULL) {
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
char *se = init_log_path(chosen_container_log_dir, "stderr.txt");
if (se == NULL) {
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
docker_command_with_binary = flatten(docker_command); docker_command_with_binary = flatten(docker_command);
// Launch container // Launch container
@ -1573,14 +1617,76 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
} }
if (child_pid == 0) { if (child_pid == 0) {
FILE* so_fd = fopen(so, "a+");
if (so_fd == NULL) {
fprintf(ERRORFILE, "Could not append to %s\n", so);
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
FILE* se_fd = fopen(se, "a+");
if (se_fd == NULL) {
fprintf(ERRORFILE, "Could not append to %s\n", se);
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
fclose(so_fd);
goto cleanup;
}
// if entry point is enabled, clone docker command output
// to stdout.txt and stderr.txt for yarn.
if (use_entry_point) {
fprintf(so_fd, "Launching docker container...\n");
fprintf(so_fd, "Docker run command: %s\n", docker_command_with_binary);
if (dup2(fileno(so_fd), fileno(stdout)) == -1) {
fprintf(ERRORFILE, "Could not append to stdout.txt\n");
fclose(so_fd);
return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
}
if (dup2(fileno(se_fd), fileno(stderr)) == -1) {
fprintf(ERRORFILE, "Could not append to stderr.txt\n");
fclose(se_fd);
return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
}
}
fclose(so_fd);
fclose(se_fd);
execvp(docker_binary, docker_command); execvp(docker_binary, docker_command);
fprintf(ERRORFILE, "failed to execute docker command! error: %s\n", strerror(errno)); fprintf(ERRORFILE, "failed to execute docker command! error: %s\n", strerror(errno));
return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
} else { } else {
exit_code = wait_and_get_exit_code(child_pid); if (use_entry_point) {
if (exit_code != 0) { int pid = 0;
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; int res = 0;
goto cleanup; int count = 0;
int max_retries = get_max_retries(&CFG);
docker_inspect_command = make_string(
"%s inspect --format {{.State.Pid}} %s",
docker_binary, container_id);
// check for docker container pid
while (count < max_retries) {
fprintf(LOGFILE, "Inspecting docker container...\n");
fprintf(LOGFILE, "Docker inspect command: %s\n", docker_inspect_command);
fflush(LOGFILE);
FILE* inspect_docker = popen(docker_inspect_command, "r");
res = fscanf (inspect_docker, "%d", &pid);
fprintf(LOGFILE, "pid from docker inspect: %d\n", pid);
if (pclose (inspect_docker) != 0 || res <= 0) {
fprintf (ERRORFILE,
"Could not inspect docker to get pid %s.\n", docker_inspect_command);
fflush(ERRORFILE);
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
} else {
if (pid != 0) {
break;
}
}
sleep(3);
count++;
}
} else {
exit_code = wait_and_get_exit_code(child_pid);
if (exit_code != 0) {
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
} }
} }

View File

@ -32,6 +32,8 @@
#include <pwd.h> #include <pwd.h>
#include <errno.h> #include <errno.h>
int entry_point = 0;
static int read_and_verify_command_file(const char *command_file, const char *docker_command, static int read_and_verify_command_file(const char *command_file, const char *docker_command,
struct configuration *command_config) { struct configuration *command_config) {
int ret = 0; int ret = 0;
@ -336,6 +338,17 @@ const char *get_docker_error_message(const int error_code) {
} }
} }
int get_max_retries(const struct configuration *conf) {
int retries = 10;
char *max_retries = get_configuration_value(DOCKER_INSPECT_MAX_RETRIES_KEY,
CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf);
if (max_retries != NULL) {
retries = atoi(max_retries);
free(max_retries);
}
return retries;
}
char *get_docker_binary(const struct configuration *conf) { char *get_docker_binary(const struct configuration *conf) {
char *docker_binary = NULL; char *docker_binary = NULL;
docker_binary = get_configuration_value(DOCKER_BINARY_KEY, CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf); docker_binary = get_configuration_value(DOCKER_BINARY_KEY, CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf);
@ -348,6 +361,10 @@ char *get_docker_binary(const struct configuration *conf) {
return docker_binary; return docker_binary;
} }
int get_use_entry_point_flag() {
return entry_point;
}
int docker_module_enabled(const struct configuration *conf) { int docker_module_enabled(const struct configuration *conf) {
struct section *section = get_configuration_section(CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf); struct section *section = get_configuration_section(CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf);
if (section != NULL) { if (section != NULL) {
@ -365,6 +382,12 @@ int get_docker_command(const char *command_file, const struct configuration *con
return INVALID_COMMAND_FILE; return INVALID_COMMAND_FILE;
} }
char *value = get_configuration_value("use-entry-point", DOCKER_COMMAND_FILE_SECTION, &command_config);
if (value != NULL && strcasecmp(value, "true") == 0) {
entry_point = 1;
}
free(value);
char *command = get_configuration_value("docker-command", DOCKER_COMMAND_FILE_SECTION, &command_config); char *command = get_configuration_value("docker-command", DOCKER_COMMAND_FILE_SECTION, &command_config);
if (strcmp(DOCKER_INSPECT_COMMAND, command) == 0) { if (strcmp(DOCKER_INSPECT_COMMAND, command) == 0) {
return get_docker_inspect_command(command_file, conf, args); return get_docker_inspect_command(command_file, conf, args);
@ -1009,6 +1032,24 @@ static int set_devices(const struct configuration *command_config, const struct
return ret; return ret;
} }
static int set_env(const struct configuration *command_config, struct args *args) {
int ret = 0;
// Use envfile method.
char *envfile = get_configuration_value("environ", DOCKER_COMMAND_FILE_SECTION, command_config);
if (envfile != NULL) {
ret = add_to_args(args, "--env-file");
if (ret != 0) {
ret = BUFFER_TOO_SMALL;
}
ret = add_to_args(args, envfile);
if (ret != 0) {
ret = BUFFER_TOO_SMALL;
}
free(envfile);
}
return ret;
}
/** /**
* Helper function to help normalize mounts for checking if mounts are * Helper function to help normalize mounts for checking if mounts are
* permitted. The function does the following - * permitted. The function does the following -
@ -1520,6 +1561,11 @@ int get_docker_run_command(const char *command_file, const struct configuration
return ret; return ret;
} }
ret = set_env(&command_config, args);
if (ret != 0) {
return BUFFER_TOO_SMALL;
}
ret = add_to_args(args, image); ret = add_to_args(args, image);
if (ret != 0) { if (ret != 0) {
reset_args(args); reset_args(args);

View File

@ -23,6 +23,7 @@
#define CONTAINER_EXECUTOR_CFG_DOCKER_SECTION "docker" #define CONTAINER_EXECUTOR_CFG_DOCKER_SECTION "docker"
#define DOCKER_BINARY_KEY "docker.binary" #define DOCKER_BINARY_KEY "docker.binary"
#define DOCKER_INSPECT_MAX_RETRIES_KEY "docker.inspect.max.retries"
#define DOCKER_COMMAND_FILE_SECTION "docker-command-execution" #define DOCKER_COMMAND_FILE_SECTION "docker-command-execution"
#define DOCKER_INSPECT_COMMAND "inspect" #define DOCKER_INSPECT_COMMAND "inspect"
#define DOCKER_LOAD_COMMAND "load" #define DOCKER_LOAD_COMMAND "load"
@ -85,6 +86,12 @@ char *get_docker_binary(const struct configuration *conf);
*/ */
int get_docker_command(const char* command_file, const struct configuration* conf, args *args); int get_docker_command(const char* command_file, const struct configuration* conf, args *args);
/**
* Check if use-entry-point flag is set.
* @return 0 when use-entry-point flag is set.
*/
int get_use_entry_point_flag();
/** /**
* Get the Docker inspect command line string. The function will verify that the params file is meant for the * Get the Docker inspect command line string. The function will verify that the params file is meant for the
* inspect command. * inspect command.
@ -202,4 +209,11 @@ void reset_args(args *args);
* @param args Pointer reference to args data structure * @param args Pointer reference to args data structure
*/ */
char** extract_execv_args(args *args); char** extract_execv_args(args *args);
/**
* Get max retries for docker inspect.
* @param conf Configuration structure
* @return value of max retries
*/
int get_max_retries(const struct configuration *conf);
#endif #endif

View File

@ -1312,6 +1312,48 @@ namespace ContainerExecutor {
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_run_command); run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_run_command);
} }
TEST_F(TestDockerUtil, test_docker_run_entry_point) {
std::string container_executor_contents = "[docker]\n"
" docker.allowed.ro-mounts=/var,/etc,/usr/bin/cut\n"
" docker.allowed.rw-mounts=/tmp\n docker.allowed.networks=bridge\n "
" docker.privileged-containers.enabled=1\n docker.allowed.capabilities=CHOWN,SETUID\n"
" docker.allowed.devices=/dev/test\n docker.privileged-containers.registries=hadoop\n";
write_file(container_executor_cfg_file, container_executor_contents);
int ret = read_config(container_executor_cfg_file.c_str(), &container_executor_cfg);
if (ret != 0) {
FAIL();
}
ret = create_ce_file();
if (ret != 0) {
std::cerr << "Could not create ce file, skipping test" << std::endl;
return;
}
std::vector<std::pair<std::string, std::string> > file_cmd_vec;
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
"[docker-command-execution]\n"
" docker-command=run\n"
" name=container_e1_12312_11111_02_000001\n"
" image=hadoop/docker-image\n"
" user=nobody\n"
" use-entry-point=true\n"
" environ=/tmp/test.env\n",
"/usr/bin/docker run --name=container_e1_12312_11111_02_000001 --user=nobody --cap-drop=ALL "
"--env-file /tmp/test.env hadoop/docker-image"));
std::vector<std::pair<std::string, int> > bad_file_cmd_vec;
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
"[docker-command-execution]\n"
" docker-command=run\n"
" image=hadoop/docker-image\n"
" user=nobody",
static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_run_command);
}
TEST_F(TestDockerUtil, test_docker_run_no_privileged) { TEST_F(TestDockerUtil, test_docker_run_no_privileged) {
std::string container_executor_contents[] = {"[docker]\n docker.allowed.ro-mounts=/var,/etc,/usr/bin/cut\n" std::string container_executor_contents[] = {"[docker]\n docker.allowed.ro-mounts=/var,/etc,/usr/bin/cut\n"

View File

@ -207,6 +207,7 @@ are allowed. It contains the following properties:
| `docker.host-pid-namespace.enabled` | Set to "true" or "false" to enable or disable using the host's PID namespace. Default value is "false". | | `docker.host-pid-namespace.enabled` | Set to "true" or "false" to enable or disable using the host's PID namespace. Default value is "false". |
| `docker.privileged-containers.enabled` | Set to "true" or "false" to enable or disable launching privileged containers. Default value is "false". | | `docker.privileged-containers.enabled` | Set to "true" or "false" to enable or disable launching privileged containers. Default value is "false". |
| `docker.privileged-containers.registries` | Comma separated list of trusted docker registries for running trusted privileged docker containers. By default, no registries are defined. | | `docker.privileged-containers.registries` | Comma separated list of trusted docker registries for running trusted privileged docker containers. By default, no registries are defined. |
| `docker.inspect.max.retries` | Integer value to check docker container readiness. Each inspection is set with 3 seconds delay. Default value of 10 will wait 30 seconds for docker container to become ready before marked as container failed. |
Please note that if you wish to run Docker containers that require access to the YARN local directories, you must add them to the docker.allowed.rw-mounts list. Please note that if you wish to run Docker containers that require access to the YARN local directories, you must add them to the docker.allowed.rw-mounts list.