YARN-7654. Support ENTRY_POINT for docker container. Contributed by Eric Yang

(cherry picked from commit 6c8e51ca7e)
This commit is contained in:
Jason Lowe 2018-05-11 18:56:05 -05:00
parent 355ff085e6
commit dfe73334c2
12 changed files with 467 additions and 87 deletions

View File

@ -244,7 +244,14 @@ public interface ApplicationConstants {
* Comma separate list of directories that the container should use for * Comma separate list of directories that the container should use for
* logging. * logging.
*/ */
LOG_DIRS("LOG_DIRS"); LOG_DIRS("LOG_DIRS"),
/**
* $YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE
* Final, Docker run support ENTRY_POINT.
*/
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE(
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE");
private final String variable; private final String variable;
private Environment(String variable) { private Environment(String variable) {

View File

@ -58,15 +58,9 @@ public abstract class AbstractProviderService implements ProviderService,
Service service) Service service)
throws IOException; throws IOException;
public void buildContainerLaunchContext(AbstractLauncher launcher, public Map<String, String> buildContainerTokens(ComponentInstance instance,
Service service, ComponentInstance instance, Container container,
SliderFileSystem fileSystem, Configuration yarnConf, Container container, ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
ContainerLaunchService.ComponentLaunchContext compLaunchContext)
throws IOException, SliderException {
processArtifact(launcher, instance, fileSystem, service);
ServiceContext context =
instance.getComponent().getScheduler().getContext();
// Generate tokens (key-value pair) for config substitution. // Generate tokens (key-value pair) for config substitution.
// Get pre-defined tokens // Get pre-defined tokens
Map<String, String> globalTokens = Map<String, String> globalTokens =
@ -75,6 +69,15 @@ public abstract class AbstractProviderService implements ProviderService,
.initCompTokensForSubstitute(instance, container, .initCompTokensForSubstitute(instance, container,
compLaunchContext); compLaunchContext);
tokensForSubstitution.putAll(globalTokens); tokensForSubstitution.putAll(globalTokens);
return tokensForSubstitution;
}
public void buildContainerEnvironment(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution)
throws IOException, SliderException {
// Set the environment variables in launcher // Set the environment variables in launcher
launcher.putEnv(ServiceUtils.buildEnvMap( launcher.putEnv(ServiceUtils.buildEnvMap(
compLaunchContext.getConfiguration(), tokensForSubstitution)); compLaunchContext.getConfiguration(), tokensForSubstitution));
@ -90,17 +93,14 @@ public abstract class AbstractProviderService implements ProviderService,
for (Entry<String, String> entry : launcher.getEnv().entrySet()) { for (Entry<String, String> entry : launcher.getEnv().entrySet()) {
tokensForSubstitution.put($(entry.getKey()), entry.getValue()); tokensForSubstitution.put($(entry.getKey()), entry.getValue());
} }
//TODO add component host tokens? }
// ProviderUtils.addComponentHostTokens(tokensForSubstitution, amState);
// create config file on hdfs and add local resource
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
compLaunchContext, tokensForSubstitution, instance, context);
// handles static files (like normal file / archive file) for localization.
ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem,
compLaunchContext);
public void buildContainerLaunchCommand(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution)
throws IOException, SliderException {
// substitute launch command // substitute launch command
String launchCommand = compLaunchContext.getLaunchCommand(); String launchCommand = compLaunchContext.getLaunchCommand();
// docker container may have empty commands // docker container may have empty commands
@ -112,10 +112,15 @@ public abstract class AbstractProviderService implements ProviderService,
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE); operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build()); launcher.addCommand(operation.build());
} }
}
public void buildContainerRetry(AbstractLauncher launcher,
Configuration yarnConf,
ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
// By default retry forever every 30 seconds // By default retry forever every 30 seconds
launcher.setRetryContext( launcher.setRetryContext(
YarnServiceConf.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX, YarnServiceConf.getInt(CONTAINER_RETRY_MAX,
DEFAULT_CONTAINER_RETRY_MAX,
compLaunchContext.getConfiguration(), yarnConf), compLaunchContext.getConfiguration(), yarnConf),
YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL, YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL,
DEFAULT_CONTAINER_RETRY_INTERVAL, DEFAULT_CONTAINER_RETRY_INTERVAL,
@ -124,4 +129,38 @@ public abstract class AbstractProviderService implements ProviderService,
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL, DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
compLaunchContext.getConfiguration(), yarnConf)); compLaunchContext.getConfiguration(), yarnConf));
} }
public void buildContainerLaunchContext(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext)
throws IOException, SliderException {
processArtifact(launcher, instance, fileSystem, service);
ServiceContext context =
instance.getComponent().getScheduler().getContext();
// Generate tokens (key-value pair) for config substitution.
Map<String, String> tokensForSubstitution =
buildContainerTokens(instance, container, compLaunchContext);
// Setup launch context environment
buildContainerEnvironment(launcher, service, instance,
fileSystem, yarnConf, container, compLaunchContext,
tokensForSubstitution);
// create config file on hdfs and add local resource
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
compLaunchContext, tokensForSubstitution, instance, context);
// handles static files (like normal file / archive file) for localization.
ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem,
compLaunchContext);
// replace launch command with token specific information
buildContainerLaunchCommand(launcher, service, instance, fileSystem,
yarnConf, container, compLaunchContext, tokensForSubstitution);
// Setup container retry settings
buildContainerRetry(launcher, yarnConf, compLaunchContext);
}
} }

View File

@ -17,13 +17,23 @@
*/ */
package org.apache.hadoop.yarn.service.provider.docker; package org.apache.hadoop.yarn.service.provider.docker;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.provider.AbstractProviderService; import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.CommandLineBuilder;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
public class DockerProviderService extends AbstractProviderService public class DockerProviderService extends AbstractProviderService
implements DockerKeys { implements DockerKeys {
@ -39,4 +49,36 @@ public class DockerProviderService extends AbstractProviderService
launcher.setRunPrivilegedContainer( launcher.setRunPrivilegedContainer(
compInstance.getCompSpec().getRunPrivilegedContainer()); compInstance.getCompSpec().getRunPrivilegedContainer());
} }
@Override
public void buildContainerLaunchCommand(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution)
throws IOException, SliderException {
Component component = instance.getComponent().getComponentSpec();
boolean useEntryPoint = Boolean.parseBoolean(component
.getConfiguration().getEnv(Environment
.YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.name()));
if (useEntryPoint) {
String launchCommand = component.getLaunchCommand();
if (!StringUtils.isEmpty(launchCommand)) {
launcher.addCommand(launchCommand);
}
} else {
// substitute launch command
String launchCommand = compLaunchContext.getLaunchCommand();
// docker container may have empty commands
if (!StringUtils.isEmpty(launchCommand)) {
launchCommand = ProviderUtils
.substituteStrWithTokens(launchCommand, tokensForSubstitution);
CommandLineBuilder operation = new CommandLineBuilder();
operation.add(launchCommand);
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build());
}
}
}
} }

View File

@ -1677,6 +1677,20 @@ public class ContainerLaunch implements Callable<Integer> {
containerLogDirs, Map<Path, List<String>> resources, containerLogDirs, Map<Path, List<String>> resources,
Path nmPrivateClasspathJarDir, Path nmPrivateClasspathJarDir,
Set<String> nmVars) throws IOException { Set<String> nmVars) throws IOException {
// Based on discussion in YARN-7654, for ENTRY_POINT enabled
// docker container, we forward user defined environment variables
// without node manager environment variables. This is the reason
// that we skip sanitizeEnv method.
boolean overrideDisable = Boolean.parseBoolean(
environment.get(
Environment.
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.
name()));
if (overrideDisable) {
environment.remove("WORK_DIR");
return;
}
/** /**
* Non-modifiable environment variables * Non-modifiable environment variables
*/ */

View File

@ -235,7 +235,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
@InterfaceAudience.Private @InterfaceAudience.Private
public static final String ENV_DOCKER_CONTAINER_DELAYED_REMOVAL = public static final String ENV_DOCKER_CONTAINER_DELAYED_REMOVAL =
"YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL"; "YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL";
private Configuration conf; private Configuration conf;
private Context nmContext; private Context nmContext;
private DockerClient dockerClient; private DockerClient dockerClient;
@ -736,6 +735,8 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE); String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE);
String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK); String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK);
String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME); String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME);
boolean useEntryPoint = Boolean.parseBoolean(environment
.get(ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE));
if(network == null || network.isEmpty()) { if(network == null || network.isEmpty()) {
network = defaultNetwork; network = defaultNetwork;
@ -797,8 +798,6 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DockerRunCommand runCommand = new DockerRunCommand(containerIdStr, DockerRunCommand runCommand = new DockerRunCommand(containerIdStr,
dockerRunAsUser, imageName) dockerRunAsUser, imageName)
.detachOnRun()
.setContainerWorkDir(containerWorkDir.toString())
.setNetworkType(network); .setNetworkType(network);
// Only add hostname if network is not host or if Registry DNS is enabled. // Only add hostname if network is not host or if Registry DNS is enabled.
if (!network.equalsIgnoreCase("host") || if (!network.equalsIgnoreCase("host") ||
@ -870,19 +869,22 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand); addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);
String disableOverride = environment.get( if (useEntryPoint) {
ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE); runCommand.setOverrideDisabled(true);
runCommand.addEnv(environment);
if (disableOverride != null && disableOverride.equals("true")) { runCommand.setOverrideCommandWithArgs(container.getLaunchContext()
LOG.info("command override disabled"); .getCommands());
runCommand.disableDetach();
runCommand.setLogDir(container.getLogDir());
} else { } else {
List<String> overrideCommands = new ArrayList<>(); List<String> overrideCommands = new ArrayList<>();
Path launchDst = Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
overrideCommands.add("bash"); overrideCommands.add("bash");
overrideCommands.add(launchDst.toUri().getPath()); overrideCommands.add(launchDst.toUri().getPath());
runCommand.setContainerWorkDir(containerWorkDir.toString());
runCommand.setOverrideCommandWithArgs(overrideCommands); runCommand.setOverrideCommandWithArgs(overrideCommands);
runCommand.detachOnRun();
} }
if(enableUserReMapping) { if(enableUserReMapping) {

View File

@ -49,6 +49,7 @@ public final class DockerClient {
LoggerFactory.getLogger(DockerClient.class); LoggerFactory.getLogger(DockerClient.class);
private static final String TMP_FILE_PREFIX = "docker."; private static final String TMP_FILE_PREFIX = "docker.";
private static final String TMP_FILE_SUFFIX = ".cmd"; private static final String TMP_FILE_SUFFIX = ".cmd";
private static final String TMP_ENV_FILE_SUFFIX = ".env";
private final String tmpDirPath; private final String tmpDirPath;
public DockerClient(Configuration conf) throws ContainerExecutionException { public DockerClient(Configuration conf) throws ContainerExecutionException {
@ -69,15 +70,15 @@ public final class DockerClient {
public String writeCommandToTempFile(DockerCommand cmd, String filePrefix) public String writeCommandToTempFile(DockerCommand cmd, String filePrefix)
throws ContainerExecutionException { throws ContainerExecutionException {
File dockerCommandFile = null;
try { try {
dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix, File dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_FILE_SUFFIX, new TMP_FILE_SUFFIX, new
File(tmpDirPath)); File(tmpDirPath));
try (
Writer writer = new OutputStreamWriter( Writer writer = new OutputStreamWriter(
new FileOutputStream(dockerCommandFile), "UTF-8"); new FileOutputStream(dockerCommandFile), "UTF-8");
PrintWriter printWriter = new PrintWriter(writer); PrintWriter printWriter = new PrintWriter(writer);
) {
printWriter.println("[docker-command-execution]"); printWriter.println("[docker-command-execution]");
for (Map.Entry<String, List<String>> entry : for (Map.Entry<String, List<String>> entry :
cmd.getDockerCommandWithArguments().entrySet()) { cmd.getDockerCommandWithArguments().entrySet()) {
@ -94,15 +95,31 @@ public final class DockerClient {
printWriter.println(" " + entry.getKey() + "=" + StringUtils printWriter.println(" " + entry.getKey() + "=" + StringUtils
.join(",", entry.getValue())); .join(",", entry.getValue()));
} }
printWriter.close();
return dockerCommandFile.getAbsolutePath(); return dockerCommandFile.getAbsolutePath();
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to write docker command to temporary file!"); LOG.warn("Unable to write docker command to temporary file!");
throw new ContainerExecutionException(e); throw new ContainerExecutionException(e);
} }
} }
private String writeEnvFile(DockerRunCommand cmd, String filePrefix,
File cmdDir) throws IOException {
File dockerEnvFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_ENV_FILE_SUFFIX, cmdDir);
try (
Writer envWriter = new OutputStreamWriter(
new FileOutputStream(dockerEnvFile), "UTF-8");
PrintWriter envPrintWriter = new PrintWriter(envWriter);
) {
for (Map.Entry<String, String> entry : cmd.getEnv()
.entrySet()) {
envPrintWriter.println(entry.getKey() + "=" + entry.getValue());
}
return dockerEnvFile.getAbsolutePath();
}
}
public String writeCommandToTempFile(DockerCommand cmd, public String writeCommandToTempFile(DockerCommand cmd,
ContainerId containerId, Context nmContext) ContainerId containerId, Context nmContext)
throws ContainerExecutionException { throws ContainerExecutionException {
@ -126,13 +143,13 @@ public final class DockerClient {
throw new IOException("Cannot create container private directory " throw new IOException("Cannot create container private directory "
+ cmdDir); + cmdDir);
} }
dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix, dockerCommandFile = File.createTempFile(TMP_FILE_PREFIX + filePrefix,
TMP_FILE_SUFFIX, cmdDir); TMP_FILE_SUFFIX, cmdDir);
try (
Writer writer = new OutputStreamWriter( Writer writer = new OutputStreamWriter(
new FileOutputStream(dockerCommandFile.toString()), "UTF-8"); new FileOutputStream(dockerCommandFile.toString()), "UTF-8");
PrintWriter printWriter = new PrintWriter(writer); PrintWriter printWriter = new PrintWriter(writer);
) {
printWriter.println("[docker-command-execution]"); printWriter.println("[docker-command-execution]");
for (Map.Entry<String, List<String>> entry : for (Map.Entry<String, List<String>> entry :
cmd.getDockerCommandWithArguments().entrySet()) { cmd.getDockerCommandWithArguments().entrySet()) {
@ -149,9 +166,15 @@ public final class DockerClient {
printWriter.println(" " + entry.getKey() + "=" + StringUtils printWriter.println(" " + entry.getKey() + "=" + StringUtils
.join(",", entry.getValue())); .join(",", entry.getValue()));
} }
printWriter.close(); if (cmd instanceof DockerRunCommand) {
DockerRunCommand runCommand = (DockerRunCommand) cmd;
if (runCommand.containsEnv()) {
String path = writeEnvFile(runCommand, filePrefix, cmdDir);
printWriter.println(" environ=" + path);
}
}
return dockerCommandFile.toString(); return dockerCommandFile.toString();
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unable to write docker command to " + cmdDir); LOG.warn("Unable to write docker command to " + cmdDir);
throw new ContainerExecutionException(e); throw new ContainerExecutionException(e);

View File

@ -21,12 +21,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import java.io.File; import java.io.File;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public class DockerRunCommand extends DockerCommand { public class DockerRunCommand extends DockerCommand {
private static final String RUN_COMMAND = "run"; private static final String RUN_COMMAND = "run";
private final Map<String, String> userEnv;
/** The following are mandatory: */ /** The following are mandatory: */
public DockerRunCommand(String containerId, String user, String image) { public DockerRunCommand(String containerId, String user, String image) {
@ -34,6 +36,7 @@ public class DockerRunCommand extends DockerCommand {
super.addCommandArguments("name", containerId); super.addCommandArguments("name", containerId);
super.addCommandArguments("user", user); super.addCommandArguments("user", user);
super.addCommandArguments("image", image); super.addCommandArguments("image", image);
this.userEnv = new LinkedHashMap<String, String>();
} }
public DockerRunCommand removeContainerOnExit() { public DockerRunCommand removeContainerOnExit() {
@ -174,4 +177,45 @@ public class DockerRunCommand extends DockerCommand {
public Map<String, List<String>> getDockerCommandWithArguments() { public Map<String, List<String>> getDockerCommandWithArguments() {
return super.getDockerCommandWithArguments(); return super.getDockerCommandWithArguments();
} }
public DockerRunCommand setOverrideDisabled(boolean toggle) {
String value = Boolean.toString(toggle);
super.addCommandArguments("use-entry-point", value);
return this;
}
public DockerRunCommand setLogDir(String logDir) {
super.addCommandArguments("log-dir", logDir);
return this;
}
/**
* Check if user defined environment variables are empty.
*
* @return true if user defined environment variables are not empty.
*/
public boolean containsEnv() {
if (userEnv.size() > 0) {
return true;
}
return false;
}
/**
* Get user defined environment variables.
*
* @return a map of user defined environment variables
*/
public Map<String, String> getEnv() {
return userEnv;
}
/**
* Add user defined environment variables.
*
* @param environment A map of user defined environment variables
*/
public final void addEnv(Map<String, String> environment) {
userEnv.putAll(environment);
}
} }

View File

@ -94,6 +94,8 @@ static gid_t nm_gid = -1;
struct configuration CFG = {.size=0, .sections=NULL}; struct configuration CFG = {.size=0, .sections=NULL};
struct section executor_cfg = {.size=0, .kv_pairs=NULL}; struct section executor_cfg = {.size=0, .kv_pairs=NULL};
static char *chosen_container_log_dir = NULL;
char *concatenate(char *concat_pattern, char *return_path_name, char *concatenate(char *concat_pattern, char *return_path_name,
int numArgs, ...); int numArgs, ...);
@ -755,8 +757,9 @@ static int create_container_directories(const char* user, const char *app_id,
} else if (mkdirs(container_log_dir, perms) != 0) { } else if (mkdirs(container_log_dir, perms) != 0) {
free(container_log_dir); free(container_log_dir);
} else { } else {
free(container_log_dir);
result = 0; result = 0;
chosen_container_log_dir = strdup(container_log_dir);
free(container_log_dir);
} }
} }
free(combined_name); free(combined_name);
@ -1129,6 +1132,34 @@ char* get_container_log_directory(const char *log_root, const char* app_id,
container_id); container_id);
} }
char *init_log_path(const char *container_log_dir, const char *logfile) {
char *tmp_buffer = NULL;
tmp_buffer = make_string("%s/%s", container_log_dir, logfile);
mode_t permissions = S_IRUSR | S_IWUSR | S_IRGRP;
int fd = open(tmp_buffer, O_CREAT | O_WRONLY, permissions);
if (fd >= 0) {
close(fd);
if (change_owner(tmp_buffer, user_detail->pw_uid, user_detail->pw_gid) != 0) {
fprintf(ERRORFILE, "Failed to chown %s to %d:%d: %s\n", tmp_buffer, user_detail->pw_uid, user_detail->pw_gid,
strerror(errno));
free(tmp_buffer);
tmp_buffer = NULL;
} else if (chmod(tmp_buffer, permissions) != 0) {
fprintf(ERRORFILE, "Can't chmod %s - %s\n",
tmp_buffer, strerror(errno));
free(tmp_buffer);
tmp_buffer = NULL;
}
} else {
fprintf(ERRORFILE, "Failed to create file %s - %s\n", tmp_buffer,
strerror(errno));
free(tmp_buffer);
tmp_buffer = NULL;
}
return tmp_buffer;
}
int create_container_log_dirs(const char *container_id, const char *app_id, int create_container_log_dirs(const char *container_id, const char *app_id,
char * const * log_dirs) { char * const * log_dirs) {
char* const* log_root; char* const* log_root;
@ -1506,6 +1537,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
char *docker_inspect_exitcode_command = NULL; char *docker_inspect_exitcode_command = NULL;
int container_file_source =-1; int container_file_source =-1;
int cred_file_source = -1; int cred_file_source = -1;
int use_entry_point = 0;
gid_t user_gid = getegid(); gid_t user_gid = getegid();
uid_t prev_uid = geteuid(); uid_t prev_uid = geteuid();
@ -1560,6 +1592,18 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
goto cleanup; goto cleanup;
} }
use_entry_point = get_use_entry_point_flag();
char *so = init_log_path(chosen_container_log_dir, "stdout.txt");
if (so == NULL) {
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
char *se = init_log_path(chosen_container_log_dir, "stderr.txt");
if (se == NULL) {
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
docker_command_with_binary = flatten(docker_command); docker_command_with_binary = flatten(docker_command);
// Launch container // Launch container
@ -1573,9 +1617,70 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
} }
if (child_pid == 0) { if (child_pid == 0) {
FILE* so_fd = fopen(so, "a+");
if (so_fd == NULL) {
fprintf(ERRORFILE, "Could not append to %s\n", so);
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup;
}
FILE* se_fd = fopen(se, "a+");
if (se_fd == NULL) {
fprintf(ERRORFILE, "Could not append to %s\n", se);
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
fclose(so_fd);
goto cleanup;
}
// if entry point is enabled, clone docker command output
// to stdout.txt and stderr.txt for yarn.
if (use_entry_point) {
fprintf(so_fd, "Launching docker container...\n");
fprintf(so_fd, "Docker run command: %s\n", docker_command_with_binary);
if (dup2(fileno(so_fd), fileno(stdout)) == -1) {
fprintf(ERRORFILE, "Could not append to stdout.txt\n");
fclose(so_fd);
return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
}
if (dup2(fileno(se_fd), fileno(stderr)) == -1) {
fprintf(ERRORFILE, "Could not append to stderr.txt\n");
fclose(se_fd);
return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
}
}
fclose(so_fd);
fclose(se_fd);
execvp(docker_binary, docker_command); execvp(docker_binary, docker_command);
fprintf(ERRORFILE, "failed to execute docker command! error: %s\n", strerror(errno)); fprintf(ERRORFILE, "failed to execute docker command! error: %s\n", strerror(errno));
return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
} else {
if (use_entry_point) {
int pid = 0;
int res = 0;
int count = 0;
int max_retries = get_max_retries(&CFG);
docker_inspect_command = make_string(
"%s inspect --format {{.State.Pid}} %s",
docker_binary, container_id);
// check for docker container pid
while (count < max_retries) {
fprintf(LOGFILE, "Inspecting docker container...\n");
fprintf(LOGFILE, "Docker inspect command: %s\n", docker_inspect_command);
fflush(LOGFILE);
FILE* inspect_docker = popen(docker_inspect_command, "r");
res = fscanf (inspect_docker, "%d", &pid);
fprintf(LOGFILE, "pid from docker inspect: %d\n", pid);
if (pclose (inspect_docker) != 0 || res <= 0) {
fprintf (ERRORFILE,
"Could not inspect docker to get pid %s.\n", docker_inspect_command);
fflush(ERRORFILE);
exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
} else {
if (pid != 0) {
break;
}
}
sleep(3);
count++;
}
} else { } else {
exit_code = wait_and_get_exit_code(child_pid); exit_code = wait_and_get_exit_code(child_pid);
if (exit_code != 0) { if (exit_code != 0) {
@ -1583,6 +1688,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
goto cleanup; goto cleanup;
} }
} }
}
docker_inspect_command = make_string( docker_inspect_command = make_string(
"%s inspect --format {{.State.Pid}} %s", "%s inspect --format {{.State.Pid}} %s",

View File

@ -32,6 +32,8 @@
#include <pwd.h> #include <pwd.h>
#include <errno.h> #include <errno.h>
int entry_point = 0;
static int read_and_verify_command_file(const char *command_file, const char *docker_command, static int read_and_verify_command_file(const char *command_file, const char *docker_command,
struct configuration *command_config) { struct configuration *command_config) {
int ret = 0; int ret = 0;
@ -336,6 +338,17 @@ const char *get_docker_error_message(const int error_code) {
} }
} }
int get_max_retries(const struct configuration *conf) {
int retries = 10;
char *max_retries = get_configuration_value(DOCKER_INSPECT_MAX_RETRIES_KEY,
CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf);
if (max_retries != NULL) {
retries = atoi(max_retries);
free(max_retries);
}
return retries;
}
char *get_docker_binary(const struct configuration *conf) { char *get_docker_binary(const struct configuration *conf) {
char *docker_binary = NULL; char *docker_binary = NULL;
docker_binary = get_configuration_value(DOCKER_BINARY_KEY, CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf); docker_binary = get_configuration_value(DOCKER_BINARY_KEY, CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf);
@ -348,6 +361,10 @@ char *get_docker_binary(const struct configuration *conf) {
return docker_binary; return docker_binary;
} }
int get_use_entry_point_flag() {
return entry_point;
}
int docker_module_enabled(const struct configuration *conf) { int docker_module_enabled(const struct configuration *conf) {
struct section *section = get_configuration_section(CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf); struct section *section = get_configuration_section(CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, conf);
if (section != NULL) { if (section != NULL) {
@ -365,6 +382,12 @@ int get_docker_command(const char *command_file, const struct configuration *con
return INVALID_COMMAND_FILE; return INVALID_COMMAND_FILE;
} }
char *value = get_configuration_value("use-entry-point", DOCKER_COMMAND_FILE_SECTION, &command_config);
if (value != NULL && strcasecmp(value, "true") == 0) {
entry_point = 1;
}
free(value);
char *command = get_configuration_value("docker-command", DOCKER_COMMAND_FILE_SECTION, &command_config); char *command = get_configuration_value("docker-command", DOCKER_COMMAND_FILE_SECTION, &command_config);
if (strcmp(DOCKER_INSPECT_COMMAND, command) == 0) { if (strcmp(DOCKER_INSPECT_COMMAND, command) == 0) {
return get_docker_inspect_command(command_file, conf, args); return get_docker_inspect_command(command_file, conf, args);
@ -1009,6 +1032,24 @@ static int set_devices(const struct configuration *command_config, const struct
return ret; return ret;
} }
static int set_env(const struct configuration *command_config, struct args *args) {
int ret = 0;
// Use envfile method.
char *envfile = get_configuration_value("environ", DOCKER_COMMAND_FILE_SECTION, command_config);
if (envfile != NULL) {
ret = add_to_args(args, "--env-file");
if (ret != 0) {
ret = BUFFER_TOO_SMALL;
}
ret = add_to_args(args, envfile);
if (ret != 0) {
ret = BUFFER_TOO_SMALL;
}
free(envfile);
}
return ret;
}
/** /**
* Helper function to help normalize mounts for checking if mounts are * Helper function to help normalize mounts for checking if mounts are
* permitted. The function does the following - * permitted. The function does the following -
@ -1520,6 +1561,11 @@ int get_docker_run_command(const char *command_file, const struct configuration
return ret; return ret;
} }
ret = set_env(&command_config, args);
if (ret != 0) {
return BUFFER_TOO_SMALL;
}
ret = add_to_args(args, image); ret = add_to_args(args, image);
if (ret != 0) { if (ret != 0) {
reset_args(args); reset_args(args);

View File

@ -23,6 +23,7 @@
#define CONTAINER_EXECUTOR_CFG_DOCKER_SECTION "docker" #define CONTAINER_EXECUTOR_CFG_DOCKER_SECTION "docker"
#define DOCKER_BINARY_KEY "docker.binary" #define DOCKER_BINARY_KEY "docker.binary"
#define DOCKER_INSPECT_MAX_RETRIES_KEY "docker.inspect.max.retries"
#define DOCKER_COMMAND_FILE_SECTION "docker-command-execution" #define DOCKER_COMMAND_FILE_SECTION "docker-command-execution"
#define DOCKER_INSPECT_COMMAND "inspect" #define DOCKER_INSPECT_COMMAND "inspect"
#define DOCKER_LOAD_COMMAND "load" #define DOCKER_LOAD_COMMAND "load"
@ -85,6 +86,12 @@ char *get_docker_binary(const struct configuration *conf);
*/ */
int get_docker_command(const char* command_file, const struct configuration* conf, args *args); int get_docker_command(const char* command_file, const struct configuration* conf, args *args);
/**
* Check if use-entry-point flag is set.
* @return 0 when use-entry-point flag is set.
*/
int get_use_entry_point_flag();
/** /**
* Get the Docker inspect command line string. The function will verify that the params file is meant for the * Get the Docker inspect command line string. The function will verify that the params file is meant for the
* inspect command. * inspect command.
@ -202,4 +209,11 @@ void reset_args(args *args);
* @param args Pointer reference to args data structure * @param args Pointer reference to args data structure
*/ */
char** extract_execv_args(args *args); char** extract_execv_args(args *args);
/**
* Get max retries for docker inspect.
* @param conf Configuration structure
* @return value of max retries
*/
int get_max_retries(const struct configuration *conf);
#endif #endif

View File

@ -1312,6 +1312,48 @@ namespace ContainerExecutor {
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_run_command); run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_run_command);
} }
TEST_F(TestDockerUtil, test_docker_run_entry_point) {
std::string container_executor_contents = "[docker]\n"
" docker.allowed.ro-mounts=/var,/etc,/usr/bin/cut\n"
" docker.allowed.rw-mounts=/tmp\n docker.allowed.networks=bridge\n "
" docker.privileged-containers.enabled=1\n docker.allowed.capabilities=CHOWN,SETUID\n"
" docker.allowed.devices=/dev/test\n docker.privileged-containers.registries=hadoop\n";
write_file(container_executor_cfg_file, container_executor_contents);
int ret = read_config(container_executor_cfg_file.c_str(), &container_executor_cfg);
if (ret != 0) {
FAIL();
}
ret = create_ce_file();
if (ret != 0) {
std::cerr << "Could not create ce file, skipping test" << std::endl;
return;
}
std::vector<std::pair<std::string, std::string> > file_cmd_vec;
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
"[docker-command-execution]\n"
" docker-command=run\n"
" name=container_e1_12312_11111_02_000001\n"
" image=hadoop/docker-image\n"
" user=nobody\n"
" use-entry-point=true\n"
" environ=/tmp/test.env\n",
"/usr/bin/docker run --name=container_e1_12312_11111_02_000001 --user=nobody --cap-drop=ALL "
"--env-file /tmp/test.env hadoop/docker-image"));
std::vector<std::pair<std::string, int> > bad_file_cmd_vec;
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
"[docker-command-execution]\n"
" docker-command=run\n"
" image=hadoop/docker-image\n"
" user=nobody",
static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_run_command);
}
TEST_F(TestDockerUtil, test_docker_run_no_privileged) { TEST_F(TestDockerUtil, test_docker_run_no_privileged) {
std::string container_executor_contents[] = {"[docker]\n docker.allowed.ro-mounts=/var,/etc,/usr/bin/cut\n" std::string container_executor_contents[] = {"[docker]\n docker.allowed.ro-mounts=/var,/etc,/usr/bin/cut\n"

View File

@ -207,6 +207,7 @@ are allowed. It contains the following properties:
| `docker.host-pid-namespace.enabled` | Set to "true" or "false" to enable or disable using the host's PID namespace. Default value is "false". | | `docker.host-pid-namespace.enabled` | Set to "true" or "false" to enable or disable using the host's PID namespace. Default value is "false". |
| `docker.privileged-containers.enabled` | Set to "true" or "false" to enable or disable launching privileged containers. Default value is "false". | | `docker.privileged-containers.enabled` | Set to "true" or "false" to enable or disable launching privileged containers. Default value is "false". |
| `docker.privileged-containers.registries` | Comma separated list of trusted docker registries for running trusted privileged docker containers. By default, no registries are defined. | | `docker.privileged-containers.registries` | Comma separated list of trusted docker registries for running trusted privileged docker containers. By default, no registries are defined. |
| `docker.inspect.max.retries` | Integer value to check docker container readiness. Each inspection is set with 3 seconds delay. Default value of 10 will wait 30 seconds for docker container to become ready before marked as container failed. |
Please note that if you wish to run Docker containers that require access to the YARN local directories, you must add them to the docker.allowed.rw-mounts list. Please note that if you wish to run Docker containers that require access to the YARN local directories, you must add them to the docker.allowed.rw-mounts list.