YARN-7973. Added ContainerRelaunch feature for Docker containers.
Contributed by Shane Kumpf
This commit is contained in:
parent
4c93cd228c
commit
e4b9981612
|
@ -98,6 +98,16 @@ public interface ApplicationConstants {
|
||||||
|
|
||||||
public static final String STDOUT = "stdout";
|
public static final String STDOUT = "stdout";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The type of launch for the container.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
enum ContainerLaunchType {
|
||||||
|
LAUNCH,
|
||||||
|
RELAUNCH
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Environment for Applications.
|
* Environment for Applications.
|
||||||
*
|
*
|
||||||
|
|
|
@ -181,6 +181,17 @@ public abstract class ContainerExecutor implements Configurable {
|
||||||
public abstract int launchContainer(ContainerStartContext ctx) throws
|
public abstract int launchContainer(ContainerStartContext ctx) throws
|
||||||
IOException, ConfigurationException;
|
IOException, ConfigurationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Relaunch the container on the node. This is a blocking call and returns
|
||||||
|
* only when the container exits.
|
||||||
|
* @param ctx Encapsulates information necessary for relaunching containers.
|
||||||
|
* @return the return status of the relaunch
|
||||||
|
* @throws IOException if the container relaunch fails
|
||||||
|
* @throws ConfigurationException if config error was found
|
||||||
|
*/
|
||||||
|
public abstract int relaunchContainer(ContainerStartContext ctx) throws
|
||||||
|
IOException, ConfigurationException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signal container with the specified signal.
|
* Signal container with the specified signal.
|
||||||
*
|
*
|
||||||
|
|
|
@ -339,6 +339,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int relaunchContainer(ContainerStartContext ctx)
|
||||||
|
throws IOException, ConfigurationException {
|
||||||
|
return launchContainer(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new {@link ShellCommandExecutor} using the parameters.
|
* Create a new {@link ShellCommandExecutor} using the parameters.
|
||||||
*
|
*
|
||||||
|
|
|
@ -476,6 +476,20 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
@Override
|
@Override
|
||||||
public int launchContainer(ContainerStartContext ctx)
|
public int launchContainer(ContainerStartContext ctx)
|
||||||
throws IOException, ConfigurationException {
|
throws IOException, ConfigurationException {
|
||||||
|
return handleLaunchForLaunchType(ctx,
|
||||||
|
ApplicationConstants.ContainerLaunchType.LAUNCH);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int relaunchContainer(ContainerStartContext ctx)
|
||||||
|
throws IOException, ConfigurationException {
|
||||||
|
return handleLaunchForLaunchType(ctx,
|
||||||
|
ApplicationConstants.ContainerLaunchType.RELAUNCH);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int handleLaunchForLaunchType(ContainerStartContext ctx,
|
||||||
|
ApplicationConstants.ContainerLaunchType type) throws IOException,
|
||||||
|
ConfigurationException {
|
||||||
Container container = ctx.getContainer();
|
Container container = ctx.getContainer();
|
||||||
String user = ctx.getUser();
|
String user = ctx.getUser();
|
||||||
|
|
||||||
|
@ -544,13 +558,37 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
|
ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
|
||||||
ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);
|
ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);
|
||||||
|
|
||||||
|
if (type.equals(ApplicationConstants.ContainerLaunchType.RELAUNCH)) {
|
||||||
|
linuxContainerRuntime.relaunchContainer(runtimeContext);
|
||||||
|
} else {
|
||||||
linuxContainerRuntime.launchContainer(runtimeContext);
|
linuxContainerRuntime.launchContainer(runtimeContext);
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Container was marked as inactive. Returning terminated error");
|
"Container was marked as inactive. Returning terminated error");
|
||||||
return ContainerExecutor.ExitCode.TERMINATED.getExitCode();
|
return ContainerExecutor.ExitCode.TERMINATED.getExitCode();
|
||||||
}
|
}
|
||||||
} catch (ContainerExecutionException e) {
|
} catch (ContainerExecutionException e) {
|
||||||
|
return handleExitCode(e, container, containerId);
|
||||||
|
} finally {
|
||||||
|
resourcesHandler.postExecute(containerId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (resourceHandlerChain != null) {
|
||||||
|
resourceHandlerChain.postComplete(containerId);
|
||||||
|
}
|
||||||
|
} catch (ResourceHandlerException e) {
|
||||||
|
LOG.warn("ResourceHandlerChain.postComplete failed for " +
|
||||||
|
"containerId: " + containerId + ". Exception: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int handleExitCode(ContainerExecutionException e, Container container,
|
||||||
|
ContainerId containerId) throws ConfigurationException {
|
||||||
int exitCode = e.getExitCode();
|
int exitCode = e.getExitCode();
|
||||||
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
|
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
|
||||||
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
|
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
|
||||||
|
@ -600,20 +638,6 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
||||||
"Container killed on request. Exit code is " + exitCode));
|
"Container killed on request. Exit code is " + exitCode));
|
||||||
}
|
}
|
||||||
return exitCode;
|
return exitCode;
|
||||||
} finally {
|
|
||||||
resourcesHandler.postExecute(containerId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (resourceHandlerChain != null) {
|
|
||||||
resourceHandlerChain.postComplete(containerId);
|
|
||||||
}
|
|
||||||
} catch (ResourceHandlerException e) {
|
|
||||||
LOG.warn("ResourceHandlerChain.postComplete failed for " +
|
|
||||||
"containerId: " + containerId + ". Exception: " + e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContainerRuntimeContext buildContainerRuntimeContext(
|
private ContainerRuntimeContext buildContainerRuntimeContext(
|
||||||
|
|
|
@ -487,6 +487,24 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected int launchContainer(ContainerStartContext ctx)
|
protected int launchContainer(ContainerStartContext ctx)
|
||||||
throws IOException, ConfigurationException {
|
throws IOException, ConfigurationException {
|
||||||
|
int launchPrep = prepareForLaunch(ctx);
|
||||||
|
if (launchPrep == 0) {
|
||||||
|
return exec.launchContainer(ctx);
|
||||||
|
}
|
||||||
|
return launchPrep;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected int relaunchContainer(ContainerStartContext ctx)
|
||||||
|
throws IOException, ConfigurationException {
|
||||||
|
int launchPrep = prepareForLaunch(ctx);
|
||||||
|
if (launchPrep == 0) {
|
||||||
|
return exec.relaunchContainer(ctx);
|
||||||
|
}
|
||||||
|
return launchPrep;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
|
||||||
ContainerId containerId = container.getContainerId();
|
ContainerId containerId = container.getContainerId();
|
||||||
if (container.isMarkedForKilling()) {
|
if (container.isMarkedForKilling()) {
|
||||||
LOG.info("Container " + containerId + " not launched as it has already "
|
LOG.info("Container " + containerId + " not launched as it has already "
|
||||||
|
@ -508,8 +526,8 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
return ExitCode.TERMINATED.getExitCode();
|
return ExitCode.TERMINATED.getExitCode();
|
||||||
} else {
|
} else {
|
||||||
exec.activateContainer(containerId, pidFilePath);
|
exec.activateContainer(containerId, pidFilePath);
|
||||||
return exec.launchContainer(ctx);
|
|
||||||
}
|
}
|
||||||
|
return ExitCode.SUCCESS.getExitCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setContainerCompletedStatus(int exitCode) {
|
protected void setContainerCompletedStatus(int exitCode) {
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class ContainerRelaunch extends ContainerLaunch {
|
||||||
+ dirsHandler.getDisksHealthReport(false));
|
+ dirsHandler.getDisksHealthReport(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = launchContainer(new ContainerStartContext.Builder()
|
ret = relaunchContainer(new ContainerStartContext.Builder()
|
||||||
.setContainer(container)
|
.setContainer(container)
|
||||||
.setLocalizedResources(localResources)
|
.setLocalizedResources(localResources)
|
||||||
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
|
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
|
||||||
|
|
|
@ -125,6 +125,12 @@ public class DefaultLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||||
|
throws ContainerExecutionException {
|
||||||
|
launchContainer(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void signalContainer(ContainerRuntimeContext ctx)
|
public void signalContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
|
|
|
@ -141,6 +141,15 @@ public class DelegatingLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
runtime.launchContainer(ctx);
|
runtime.launchContainer(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||||
|
throws ContainerExecutionException {
|
||||||
|
Container container = ctx.getContainer();
|
||||||
|
LinuxContainerRuntime runtime = pickContainerRuntime(container);
|
||||||
|
|
||||||
|
runtime.relaunchContainer(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void signalContainer(ContainerRuntimeContext ctx)
|
public void signalContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
|
|
|
@ -24,9 +24,11 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStartCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
||||||
|
@ -919,6 +921,40 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||||
|
throws ContainerExecutionException {
|
||||||
|
Container container = ctx.getContainer();
|
||||||
|
String containerIdStr = container.getContainerId().toString();
|
||||||
|
// Check to see if the container already exists for relaunch
|
||||||
|
DockerCommandExecutor.DockerContainerStatus containerStatus =
|
||||||
|
DockerCommandExecutor.getContainerStatus(containerIdStr, conf,
|
||||||
|
privilegedOperationExecutor);
|
||||||
|
if (containerStatus != null &&
|
||||||
|
DockerCommandExecutor.isStartable(containerStatus)) {
|
||||||
|
DockerStartCommand startCommand = new DockerStartCommand(containerIdStr);
|
||||||
|
String commandFile = dockerClient.writeCommandToTempFile(startCommand,
|
||||||
|
containerIdStr);
|
||||||
|
PrivilegedOperation launchOp = buildLaunchOp(ctx, commandFile,
|
||||||
|
startCommand);
|
||||||
|
|
||||||
|
try {
|
||||||
|
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||||
|
launchOp, null, null, false, false);
|
||||||
|
} catch (PrivilegedOperationException e) {
|
||||||
|
LOG.warn("Relaunch container failed. Exception: ", e);
|
||||||
|
LOG.info("Docker command used: " + startCommand);
|
||||||
|
|
||||||
|
throw new ContainerExecutionException("Launch container failed", e
|
||||||
|
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ContainerExecutionException("Container is not in a startable "
|
||||||
|
+ "state, unable to relaunch: " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signal the docker container.
|
* Signal the docker container.
|
||||||
*
|
*
|
||||||
|
@ -1067,7 +1103,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
|
|
||||||
|
|
||||||
private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
|
private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
|
||||||
String commandFile, DockerRunCommand runCommand) {
|
String commandFile, DockerCommand command) {
|
||||||
|
|
||||||
String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
|
String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
|
||||||
String containerIdStr = ctx.getContainer().getContainerId().toString();
|
String containerIdStr = ctx.getContainer().getContainerId().toString();
|
||||||
|
@ -1106,7 +1142,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
|
||||||
launchOp.appendArgs(tcCommandFile);
|
launchOp.appendArgs(tcCommandFile);
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Launching container with cmd: " + runCommand);
|
LOG.debug("Launching container with cmd: " + command);
|
||||||
}
|
}
|
||||||
|
|
||||||
return launchOp;
|
return launchOp;
|
||||||
|
|
|
@ -268,6 +268,16 @@ public class JavaSandboxLinuxContainerRuntime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||||
|
throws ContainerExecutionException {
|
||||||
|
try {
|
||||||
|
super.relaunchContainer(ctx);
|
||||||
|
} finally {
|
||||||
|
deletePolicyFiles(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine if JVMSandboxLinuxContainerRuntime should be used. This is
|
* Determine if JVMSandboxLinuxContainerRuntime should be used. This is
|
||||||
* decided based on the value of
|
* decided based on the value of
|
||||||
|
|
|
@ -227,4 +227,18 @@ public final class DockerCommandExecutor {
|
||||||
&& !containerStatus.equals(DockerContainerStatus.REMOVING)
|
&& !containerStatus.equals(DockerContainerStatus.REMOVING)
|
||||||
&& !containerStatus.equals(DockerContainerStatus.RUNNING);
|
&& !containerStatus.equals(DockerContainerStatus.RUNNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is the container in a startable state?
|
||||||
|
*
|
||||||
|
* @param containerStatus the container's {@link DockerContainerStatus}.
|
||||||
|
* @return is the container in a startable state.
|
||||||
|
*/
|
||||||
|
public static boolean isStartable(DockerContainerStatus containerStatus) {
|
||||||
|
if (containerStatus.equals(DockerContainerStatus.EXITED)
|
||||||
|
|| containerStatus.equals(DockerContainerStatus.STOPPED)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates the docker start command and its command line arguments.
|
||||||
|
*/
|
||||||
|
public class DockerStartCommand extends DockerCommand {
|
||||||
|
private static final String START_COMMAND = "start";
|
||||||
|
|
||||||
|
public DockerStartCommand(String containerName) {
|
||||||
|
super(START_COMMAND);
|
||||||
|
super.addCommandArguments("name", containerName);
|
||||||
|
}
|
||||||
|
}
|
|
@ -54,6 +54,16 @@ public interface ContainerRuntime {
|
||||||
void launchContainer(ContainerRuntimeContext ctx)
|
void launchContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException;
|
throws ContainerExecutionException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Relaunch a container.
|
||||||
|
*
|
||||||
|
* @param ctx the {@link ContainerRuntimeContext}
|
||||||
|
* @throws ContainerExecutionException if an error occurs while relaunching
|
||||||
|
* the container
|
||||||
|
*/
|
||||||
|
void relaunchContainer(ContainerRuntimeContext ctx)
|
||||||
|
throws ContainerExecutionException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signal a container. Signals may be a request to terminate, a status check,
|
* Signal a container. Signals may be a request to terminate, a status check,
|
||||||
* etc.
|
* etc.
|
||||||
|
|
|
@ -1586,7 +1586,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
|
||||||
fprintf(LOGFILE, "Launching docker container...\n");
|
fprintf(LOGFILE, "Launching docker container...\n");
|
||||||
fprintf(LOGFILE, "Docker run command: %s\n", docker_command_with_binary);
|
fprintf(LOGFILE, "Docker run command: %s\n", docker_command_with_binary);
|
||||||
FILE* start_docker = popen(docker_command_with_binary, "r");
|
FILE* start_docker = popen(docker_command_with_binary, "r");
|
||||||
if (pclose (start_docker) != 0)
|
if (WEXITSTATUS(pclose (start_docker)) != 0)
|
||||||
{
|
{
|
||||||
fprintf (ERRORFILE,
|
fprintf (ERRORFILE,
|
||||||
"Could not invoke docker %s.\n", docker_command_with_binary);
|
"Could not invoke docker %s.\n", docker_command_with_binary);
|
||||||
|
|
|
@ -364,6 +364,8 @@ int get_docker_command(const char *command_file, const struct configuration *con
|
||||||
return get_docker_stop_command(command_file, conf, out, outlen);
|
return get_docker_stop_command(command_file, conf, out, outlen);
|
||||||
} else if (strcmp(DOCKER_VOLUME_COMMAND, command) == 0) {
|
} else if (strcmp(DOCKER_VOLUME_COMMAND, command) == 0) {
|
||||||
return get_docker_volume_command(command_file, conf, out, outlen);
|
return get_docker_volume_command(command_file, conf, out, outlen);
|
||||||
|
} else if (strcmp(DOCKER_START_COMMAND, command) == 0) {
|
||||||
|
return get_docker_start_command(command_file, conf, out, outlen);
|
||||||
} else {
|
} else {
|
||||||
return UNKNOWN_DOCKER_COMMAND;
|
return UNKNOWN_DOCKER_COMMAND;
|
||||||
}
|
}
|
||||||
|
@ -820,6 +822,44 @@ int get_docker_kill_command(const char *command_file, const struct configuration
|
||||||
return BUFFER_TOO_SMALL;
|
return BUFFER_TOO_SMALL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int get_docker_start_command(const char *command_file, const struct configuration *conf, char *out, const size_t outlen) {
|
||||||
|
int ret = 0;
|
||||||
|
char *container_name = NULL;
|
||||||
|
struct configuration command_config = {0, NULL};
|
||||||
|
ret = read_and_verify_command_file(command_file, DOCKER_START_COMMAND, &command_config);
|
||||||
|
if (ret != 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
container_name = get_configuration_value("name", DOCKER_COMMAND_FILE_SECTION, &command_config);
|
||||||
|
if (container_name == NULL || validate_container_name(container_name) != 0) {
|
||||||
|
return INVALID_DOCKER_CONTAINER_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(out, 0, outlen);
|
||||||
|
|
||||||
|
ret = add_docker_config_param(&command_config, out, outlen);
|
||||||
|
if (ret != 0) {
|
||||||
|
return BUFFER_TOO_SMALL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = add_to_buffer(out, outlen, DOCKER_START_COMMAND);
|
||||||
|
if (ret != 0) {
|
||||||
|
goto free_and_exit;
|
||||||
|
}
|
||||||
|
ret = add_to_buffer(out, outlen, " ");
|
||||||
|
if (ret != 0) {
|
||||||
|
goto free_and_exit;
|
||||||
|
}
|
||||||
|
ret = add_to_buffer(out, outlen, container_name);
|
||||||
|
if (ret != 0) {
|
||||||
|
goto free_and_exit;
|
||||||
|
}
|
||||||
|
free_and_exit:
|
||||||
|
free(container_name);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static int detach_container(const struct configuration *command_config, char *out, const size_t outlen) {
|
static int detach_container(const struct configuration *command_config, char *out, const size_t outlen) {
|
||||||
return add_param_to_command(command_config, "detach", "-d ", 0, out, outlen);
|
return add_param_to_command(command_config, "detach", "-d ", 0, out, outlen);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
#define DOCKER_STOP_COMMAND "stop"
|
#define DOCKER_STOP_COMMAND "stop"
|
||||||
#define DOCKER_KILL_COMMAND "kill"
|
#define DOCKER_KILL_COMMAND "kill"
|
||||||
#define DOCKER_VOLUME_COMMAND "volume"
|
#define DOCKER_VOLUME_COMMAND "volume"
|
||||||
|
#define DOCKER_START_COMMAND "start"
|
||||||
|
|
||||||
|
|
||||||
enum docker_error_codes {
|
enum docker_error_codes {
|
||||||
|
@ -161,6 +162,16 @@ int get_docker_kill_command(const char* command_file, const struct configuration
|
||||||
int get_docker_volume_command(const char *command_file, const struct configuration *conf, char *out,
|
int get_docker_volume_command(const char *command_file, const struct configuration *conf, char *out,
|
||||||
const size_t outlen);
|
const size_t outlen);
|
||||||
|
|
||||||
|
/**
 * Get the Docker start command line string. The function verifies that the
 * params file is meant for the start command.
 * @param command_file File containing the params for the Docker start command
 * @param conf Configuration struct containing the container-executor.cfg details
 * @param out Buffer to fill with the start command
 * @param outlen Size of the output buffer
 * @return Return code with 0 indicating success and non-zero codes indicating error
 */
int get_docker_start_command(const char* command_file,
                             const struct configuration* conf,
                             char *out,
                             const size_t outlen);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Give an error message for the supplied error code
|
* Give an error message for the supplied error code
|
||||||
* @param error_code the error code
|
* @param error_code the error code
|
||||||
|
|
|
@ -338,6 +338,26 @@ namespace ContainerExecutor {
|
||||||
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_kill_command);
|
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_kill_command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Exercises get_docker_start_command with one well-formed command file and
// four malformed ones (wrong command, missing section header, empty name,
// missing name), checking the expected command string or error code.
TEST_F(TestDockerUtil, test_docker_start) {
  typedef std::pair<std::string, std::string> GoodCase;
  typedef std::pair<std::string, int> BadCase;

  std::vector<GoodCase> file_cmd_vec;
  file_cmd_vec.push_back(GoodCase(
      "[docker-command-execution]\n docker-command=start\n name=container_e1_12312_11111_02_000001",
      "start container_e1_12312_11111_02_000001"));

  std::vector<BadCase> bad_file_cmd_vec;
  // Command file is for "run", not "start".
  bad_file_cmd_vec.push_back(BadCase(
      "[docker-command-execution]\n docker-command=run\n name=container_e1_12312_11111_02_000001",
      static_cast<int>(INCORRECT_COMMAND)));
  // Section header is missing.
  bad_file_cmd_vec.push_back(BadCase(
      "docker-command=start\n name=ctr-id",
      static_cast<int>(INCORRECT_COMMAND)));
  // Container name is empty.
  bad_file_cmd_vec.push_back(BadCase(
      "[docker-command-execution]\n docker-command=start\n name=",
      static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
  // Container name is absent.
  bad_file_cmd_vec.push_back(BadCase(
      "[docker-command-execution]\n docker-command=start",
      static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));

  run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_start_command);
}
|
||||||
|
|
||||||
TEST_F(TestDockerUtil, test_detach_container) {
|
TEST_F(TestDockerUtil, test_detach_container) {
|
||||||
std::vector<std::pair<std::string, std::string> > file_cmd_vec;
|
std::vector<std::pair<std::string, std::string> > file_cmd_vec;
|
||||||
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
|
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
|
||||||
|
|
|
@ -675,6 +675,18 @@ public class TestLinuxContainerExecutor {
|
||||||
verify(lce, times(1)).reapContainer(ctx);
|
verify(lce, times(1)).reapContainer(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRelaunchContainer() throws Exception {
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
|
||||||
|
ContainerStartContext.Builder builder =
|
||||||
|
new ContainerStartContext.Builder();
|
||||||
|
builder.setContainer(container).setUser("foo");
|
||||||
|
ContainerStartContext ctx = builder.build();
|
||||||
|
lce.relaunchContainer(ctx);
|
||||||
|
verify(lce, times(1)).relaunchContainer(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestResourceHandler implements LCEResourcesHandler {
|
private static class TestResourceHandler implements LCEResourcesHandler {
|
||||||
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
|
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class TestContainerRelaunch {
|
||||||
assertEquals("relaunch failed", 0, result);
|
assertEquals("relaunch failed", 0, result);
|
||||||
ArgumentCaptor<ContainerStartContext> captor =
|
ArgumentCaptor<ContainerStartContext> captor =
|
||||||
ArgumentCaptor.forClass(ContainerStartContext.class);
|
ArgumentCaptor.forClass(ContainerStartContext.class);
|
||||||
verify(mockExecutor).launchContainer(captor.capture());
|
verify(mockExecutor).relaunchContainer(captor.capture());
|
||||||
ContainerStartContext csc = captor.getValue();
|
ContainerStartContext csc = captor.getValue();
|
||||||
assertNotNull("app ID null", csc.getAppId());
|
assertNotNull("app ID null", csc.getAppId());
|
||||||
assertNotNull("container null", csc.getContainer());
|
assertNotNull("container null", csc.getContainer());
|
||||||
|
|
|
@ -44,10 +44,12 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStartCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
||||||
|
@ -102,6 +104,7 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.r
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
|
||||||
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.TC_COMMAND_FILE;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
|
@ -1942,6 +1945,32 @@ public class TestDockerContainerRuntime {
|
||||||
dockerCommands.get(counter++));
|
dockerCommands.get(counter++));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDockerContainerRelaunch()
|
||||||
|
throws ContainerExecutionException, PrivilegedOperationException,
|
||||||
|
IOException {
|
||||||
|
DockerLinuxContainerRuntime runtime = new MockRuntime(mockExecutor,
|
||||||
|
DockerCommandExecutor.DockerContainerStatus.EXITED, false);
|
||||||
|
runtime.initialize(conf, null);
|
||||||
|
runtime.relaunchContainer(builder.build());
|
||||||
|
|
||||||
|
PrivilegedOperation op = capturePrivilegedOperation();
|
||||||
|
List<String> args = op.getArguments();
|
||||||
|
String dockerCommandFile = args.get(11);
|
||||||
|
|
||||||
|
List<String> dockerCommands = Files.readAllLines(
|
||||||
|
Paths.get(dockerCommandFile), Charset.forName("UTF-8"));
|
||||||
|
|
||||||
|
int expected = 3;
|
||||||
|
int counter = 0;
|
||||||
|
Assert.assertEquals(expected, dockerCommands.size());
|
||||||
|
Assert.assertEquals("[docker-command-execution]",
|
||||||
|
dockerCommands.get(counter++));
|
||||||
|
Assert.assertEquals(" docker-command=start",
|
||||||
|
dockerCommands.get(counter++));
|
||||||
|
Assert.assertEquals(" name=container_id", dockerCommands.get(counter));
|
||||||
|
}
|
||||||
|
|
||||||
class MockRuntime extends DockerLinuxContainerRuntime {
|
class MockRuntime extends DockerLinuxContainerRuntime {
|
||||||
|
|
||||||
private PrivilegedOperationExecutor privilegedOperationExecutor;
|
private PrivilegedOperationExecutor privilegedOperationExecutor;
|
||||||
|
@ -2008,5 +2037,66 @@ public class TestDockerContainerRuntime {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||||
|
throws ContainerExecutionException {
|
||||||
|
if (DockerCommandExecutor.isRemovable(containerStatus)) {
|
||||||
|
String relaunchContainerIdStr =
|
||||||
|
ctx.getContainer().getContainerId().toString();
|
||||||
|
DockerStartCommand startCommand =
|
||||||
|
new DockerStartCommand(containerIdStr);
|
||||||
|
DockerClient dockerClient = new DockerClient(conf);
|
||||||
|
String commandFile = dockerClient.writeCommandToTempFile(startCommand,
|
||||||
|
relaunchContainerIdStr);
|
||||||
|
String relaunchRunAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
|
||||||
|
Path relaunchNmPrivateContainerScriptPath = ctx.getExecutionAttribute(
|
||||||
|
NM_PRIVATE_CONTAINER_SCRIPT_PATH);
|
||||||
|
Path relaunchContainerWorkDir =
|
||||||
|
ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
|
||||||
|
//we can't do better here thanks to type-erasure
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
List<String> relaunchLocalDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
List<String> relaunchLogDirs = ctx.getExecutionAttribute(LOG_DIRS);
|
||||||
|
String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
|
||||||
|
|
||||||
|
PrivilegedOperation launchOp = new PrivilegedOperation(
|
||||||
|
PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
|
||||||
|
|
||||||
|
launchOp.appendArgs(relaunchRunAsUser, ctx.getExecutionAttribute(USER),
|
||||||
|
Integer.toString(PrivilegedOperation
|
||||||
|
.RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
|
||||||
|
ctx.getExecutionAttribute(APPID),
|
||||||
|
relaunchContainerIdStr,
|
||||||
|
relaunchContainerWorkDir.toString(),
|
||||||
|
relaunchNmPrivateContainerScriptPath.toUri().getPath(),
|
||||||
|
ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
|
||||||
|
ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
|
||||||
|
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
||||||
|
relaunchLocalDirs),
|
||||||
|
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
||||||
|
relaunchLogDirs),
|
||||||
|
commandFile,
|
||||||
|
resourcesOpts);
|
||||||
|
|
||||||
|
String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
|
||||||
|
|
||||||
|
if (tcCommandFile != null) {
|
||||||
|
launchOp.appendArgs(tcCommandFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||||
|
launchOp, null, null, false, false);
|
||||||
|
} catch (PrivilegedOperationException e) {
|
||||||
|
LOG.warn("Relaunch container failed. Exception: ", e);
|
||||||
|
LOG.info("Docker command used: " + startCommand);
|
||||||
|
|
||||||
|
throw new ContainerExecutionException("Launch container failed", e
|
||||||
|
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the docker start command and any command line arguments.
|
||||||
|
*/
|
||||||
|
public class TestDockerStartCommand {
|
||||||
|
|
||||||
|
private DockerStartCommand dockerStartCommand;
|
||||||
|
|
||||||
|
private static final String CONTAINER_NAME = "foo";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
dockerStartCommand = new DockerStartCommand(CONTAINER_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetCommandOption() {
|
||||||
|
assertEquals("start", dockerStartCommand.getCommandOption());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetCommandWithArguments() {
|
||||||
|
assertEquals("start", StringUtils.join(",",
|
||||||
|
dockerStartCommand.getDockerCommandWithArguments()
|
||||||
|
.get("docker-command")));
|
||||||
|
assertEquals("foo", StringUtils.join(",",
|
||||||
|
dockerStartCommand.getDockerCommandWithArguments().get("name")));
|
||||||
|
assertEquals(2, dockerStartCommand.getDockerCommandWithArguments().size());
|
||||||
|
}
|
||||||
|
}
|
|
@ -86,6 +86,11 @@ public class TestContainersMonitorResourceChange {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
|
public int relaunchContainer(ContainerStartContext ctx) throws
|
||||||
|
IOException, ConfigurationException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
public boolean signalContainer(ContainerSignalContext ctx)
|
public boolean signalContainer(ContainerSignalContext ctx)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue