YARN-4759. Fix signal handling for docker containers. Contributed by Shane Kumpf.

This commit is contained in:
Varun Vasudev 2016-07-14 19:27:16 +05:30
parent 58e1850801
commit e5e558b0a3
5 changed files with 250 additions and 25 deletions

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
@ -445,26 +447,39 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
public void signalContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException {
Container container = ctx.getContainer();
PrivilegedOperation signalOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
signalOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
ctx.getExecutionAttribute(USER),
Integer.toString(PrivilegedOperation
.RunAsUserCommand.SIGNAL_CONTAINER.getValue()),
ctx.getExecutionAttribute(PID),
Integer.toString(ctx.getExecutionAttribute(SIGNAL).getValue()));
PrivilegedOperation privOp = null;
// Handle liveliness checks, send null signal to pid
if(ContainerExecutor.Signal.NULL.equals(signal)) {
privOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
privOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
ctx.getExecutionAttribute(USER),
Integer.toString(PrivilegedOperation.RunAsUserCommand
.SIGNAL_CONTAINER.getValue()),
ctx.getExecutionAttribute(PID),
Integer.toString(ctx.getExecutionAttribute(SIGNAL).getValue()));
// All other signals handled as docker stop
} else {
String containerId = ctx.getContainer().getContainerId().toString();
DockerStopCommand stopCommand = new DockerStopCommand(containerId);
String commandFile = dockerClient.writeCommandToTempFile(stopCommand,
containerId);
privOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
privOp.appendArgs(commandFile);
}
//Some failures here are acceptable. Let the calling executor decide.
privOp.disableFailureLogging();
try {
PrivilegedOperationExecutor executor = PrivilegedOperationExecutor
.getInstance(conf);
executor.executePrivilegedOperation(null,
signalOp, null, container.getLaunchContext().getEnvironment(),
false, true);
privilegedOperationExecutor.executePrivilegedOperation(null,
privOp, null, container.getLaunchContext().getEnvironment(),
false, false);
} catch (PrivilegedOperationException e) {
LOG.warn("Signal container failed. Exception: ", e);
throw new ContainerExecutionException("Signal container failed", e
.getExitCode(), e.getOutput(), e.getErrorOutput());
}

View File

@ -0,0 +1,39 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
/**
 * Command object for {@code docker stop}: carries the sub-command name,
 * the target container, and any optional arguments added afterwards.
 */
public class DockerStopCommand extends DockerCommand {
  private static final String DOCKER_STOP = "stop";

  /**
   * Creates a stop command for the given container.
   *
   * @param containerName name of the container; it is appended as the
   *                      first argument after the {@code stop} sub-command
   */
  public DockerStopCommand(String containerName) {
    super(DOCKER_STOP);
    super.addCommandArguments(containerName);
  }

  /**
   * Appends a {@code --time=<value>} argument to this command.
   * NOTE(review): presumably docker treats this as the number of seconds to
   * wait before killing the container - confirm against the docker CLI docs.
   *
   * @param value the value rendered after {@code --time=}
   * @return this command, so that calls can be chained
   */
  public DockerStopCommand setGracePeriod(int value) {
    String timeArgument = "--time=" + value;
    super.addCommandArguments(timeArgument);
    return this;
  }
}

View File

@ -283,7 +283,14 @@ static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) {
* Write the exit code of the container into the exit code file
* exit_code_file: Path to exit code file where exit code needs to be written
*/
static int write_exit_code_file(const char* exit_code_file, int exit_code) {
static int write_exit_code_file_as_nm(const char* exit_code_file, int exit_code) {
uid_t user = geteuid();
gid_t group = getegid();
if (change_effective_user(nm_uid, nm_gid) != 0) {
fprintf(ERRORFILE, "Could not change to effective users %d, %d\n", nm_uid, nm_gid);
fflush(ERRORFILE);
return -1;
}
char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
exit_code_file);
if (tmp_ecode_file == NULL) {
@ -320,6 +327,13 @@ static int write_exit_code_file(const char* exit_code_file, int exit_code) {
return -1;
}
// always change back
if (change_effective_user(user, group) != 0) {
fprintf(ERRORFILE,
"Could not change to effective users %d, %d\n", user, group);
fflush(ERRORFILE);
return -1;
}
free(tmp_ecode_file);
return 0;
}
@ -357,11 +371,8 @@ static int wait_and_get_exit_code(pid_t pid) {
static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
int exit_code = -1;
if (change_effective_user(nm_uid, nm_gid) != 0) {
return -1;
}
exit_code = wait_and_get_exit_code(pid);
if (write_exit_code_file(exit_code_file, exit_code) < 0) {
if (write_exit_code_file_as_nm(exit_code_file, exit_code) < 0) {
return -1;
}
@ -1240,6 +1251,9 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
int BUFFER_SIZE = 4096;
char buffer[BUFFER_SIZE];
gid_t user_gid = getegid();
uid_t prev_uid = geteuid();
char *docker_command = parse_docker_command_file(command_file);
char *docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
if (docker_binary == NULL) {
@ -1255,7 +1269,6 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
fflush(ERRORFILE);
goto cleanup;
}
gid_t user_gid = getegid();
fprintf(LOGFILE, "Creating local dirs...\n");
exit_code = create_local_dirs(user, app_id, container_id,
@ -1400,11 +1413,19 @@ cleanup:
//clean up docker command file
unlink(command_file);
if (exit_code_file != NULL && write_exit_code_file(exit_code_file, exit_code) < 0) {
if (exit_code_file != NULL && write_exit_code_file_as_nm(exit_code_file, exit_code) < 0) {
fprintf (ERRORFILE,
"Could not write exit code to file %s.\n", exit_code_file);
fflush(ERRORFILE);
}
// Drop root privileges
if (change_effective_user(prev_uid, user_gid) != 0) {
fprintf(ERRORFILE,
"Could not change to effective users %d, %d\n", prev_uid, user_gid);
fflush(ERRORFILE);
}
#if HAVE_FCLOSEALL
fcloseall();
#else

View File

@ -23,10 +23,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
@ -92,6 +94,7 @@ public class TestDockerContainerRuntime {
private final String submittingUser = "anakin";
private final String whitelistedUser = "yoda";
private String[] testCapabilities;
private final String signalPid = "1234";
@Before
public void setup() {
@ -187,7 +190,7 @@ public class TestDockerContainerRuntime {
}
@SuppressWarnings("unchecked")
private PrivilegedOperation capturePrivilegedOperationAndVerifyArgs()
private PrivilegedOperation capturePrivilegedOperation()
throws PrivilegedOperationException {
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
PrivilegedOperation.class);
@ -203,7 +206,14 @@ public class TestDockerContainerRuntime {
// hence, reset mock here
Mockito.reset(mockExecutor);
PrivilegedOperation op = opCaptor.getValue();
return opCaptor.getValue();
}
@SuppressWarnings("unchecked")
private PrivilegedOperation capturePrivilegedOperationAndVerifyArgs()
throws PrivilegedOperationException {
PrivilegedOperation op = capturePrivilegedOperation();
Assert.assertEquals(PrivilegedOperation.OperationType
.LAUNCH_DOCKER_CONTAINER, op.getOperationType());
@ -784,4 +794,89 @@ public class TestDockerContainerRuntime {
":ro "));
}
/**
 * Verifies that signalling a container with {@code Signal.NULL} is issued
 * as a SIGNAL_CONTAINER privileged operation carrying, in order: the
 * run-as user, the user, the signal-container command code, the pid and
 * the signal value.
 */
@Test
public void testContainerLivelinessCheck()
    throws ContainerExecutionException, PrivilegedOperationException {
  DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
      mockExecutor, mockCGroupsHandler);
  builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
      .setExecutionAttribute(USER, user)
      .setExecutionAttribute(PID, signalPid)
      .setExecutionAttribute(SIGNAL, ContainerExecutor.Signal.NULL);
  runtime.initialize(getConfigurationWithMockContainerExecutor());
  runtime.signalContainer(builder.build());

  PrivilegedOperation op = capturePrivilegedOperation();
  // JUnit's contract is assertEquals(expected, actual); keeping that order
  // makes a failure report name the right value as "expected".
  Assert.assertEquals(PrivilegedOperation.OperationType.SIGNAL_CONTAINER,
      op.getOperationType());

  List<String> args = op.getArguments();
  Assert.assertEquals("run_as_user", args.get(0));
  Assert.assertEquals("user", args.get(1));
  Assert.assertEquals("2", args.get(2));
  // Compare against the fixture that was fed in rather than a second copy
  // of the literal.
  Assert.assertEquals(signalPid, args.get(3));
  Assert.assertEquals("0", args.get(4));
}
/**
 * A TERM signal must result in a docker command file whose only line is
 * the stop command for the container.
 */
@Test
public void testDockerStopOnTermSignal()
    throws ContainerExecutionException, PrivilegedOperationException,
    IOException {
  final ContainerExecutor.Signal term = ContainerExecutor.Signal.TERM;
  List<String> commandFileLines = getDockerCommandsForSignal(term);

  // Exactly one command line is expected in the file read back.
  Assert.assertEquals(1, commandFileLines.size());
  Assert.assertEquals("stop container_id", commandFileLines.get(0));
}
/**
 * A KILL signal must result in a docker command file whose only line is
 * the stop command for the container.
 */
@Test
public void testDockerStopOnKillSignal()
    throws ContainerExecutionException, PrivilegedOperationException,
    IOException {
  final ContainerExecutor.Signal kill = ContainerExecutor.Signal.KILL;
  List<String> commandFileLines = getDockerCommandsForSignal(kill);

  // Exactly one command line is expected in the file read back.
  Assert.assertEquals(1, commandFileLines.size());
  Assert.assertEquals("stop container_id", commandFileLines.get(0));
}
/**
 * A QUIT signal must result in a docker command file whose only line is
 * the stop command for the container.
 */
@Test
public void testDockerStopOnQuitSignal()
    throws ContainerExecutionException, PrivilegedOperationException,
    IOException {
  final ContainerExecutor.Signal quit = ContainerExecutor.Signal.QUIT;
  List<String> commandFileLines = getDockerCommandsForSignal(quit);

  // Exactly one command line is expected in the file read back.
  Assert.assertEquals(1, commandFileLines.size());
  Assert.assertEquals("stop container_id", commandFileLines.get(0));
}
/**
 * Signals a container through a freshly initialized runtime, checks that
 * the captured privileged operation is a RUN_DOCKER_CMD, and returns the
 * contents of the docker command file named by its first argument.
 *
 * @param signal the signal to deliver via {@code signalContainer}
 * @return the lines of the docker command file, decoded as UTF-8
 * @throws ContainerExecutionException if signalling the container fails
 * @throws PrivilegedOperationException declared by the capture helper
 * @throws IOException if the command file cannot be read
 */
private List<String> getDockerCommandsForSignal(
    ContainerExecutor.Signal signal)
    throws ContainerExecutionException, PrivilegedOperationException,
    IOException {
  DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
      mockExecutor, mockCGroupsHandler);
  builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
      .setExecutionAttribute(USER, user)
      .setExecutionAttribute(PID, signalPid)
      .setExecutionAttribute(SIGNAL, signal);
  runtime.initialize(getConfigurationWithMockContainerExecutor());
  runtime.signalContainer(builder.build());

  PrivilegedOperation op = capturePrivilegedOperation();
  // JUnit's contract is assertEquals(expected, actual); keeping that order
  // makes a failure report name the right value as "expected".
  Assert.assertEquals(PrivilegedOperation.OperationType.RUN_DOCKER_CMD,
      op.getOperationType());

  String dockerCommandFile = op.getArguments().get(0);
  return Files.readAllLines(Paths.get(dockerCommandFile),
      Charset.forName("UTF-8"));
}
/**
 * Points the shared test configuration at the mock container-executor
 * script under the test resources directory, marking the script
 * executable first when it is not already.
 *
 * @return the shared {@code conf} instance with the executor path set
 */
private Configuration getConfigurationWithMockContainerExecutor() {
  File mockExecutorScript =
      new File("./src/test/resources/mock-container-executor");
  boolean alreadyExecutable = FileUtil.canExecute(mockExecutorScript);
  if (!alreadyExecutable) {
    FileUtil.setExecutable(mockExecutorScript, true);
  }
  conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH,
      mockExecutorScript.getAbsolutePath());
  return conf;
}
}

View File

@ -0,0 +1,55 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
/**
 * Unit tests for {@link DockerStopCommand}, covering the command option it
 * reports and the command line produced once a grace period is set.
 */
public class DockerStopCommandTest {
  private static final String CONTAINER_NAME = "foo";
  private static final int GRACE_PERIOD = 10;

  // Recreated before every test so cases do not share appended arguments.
  private DockerStopCommand dockerStopCommand;

  @Before
  public void setup() {
    dockerStopCommand = new DockerStopCommand(CONTAINER_NAME);
  }

  /** The command option of a stop command is the literal "stop". */
  @Test
  public void testGetCommandOption() {
    String option = dockerStopCommand.getCommandOption();
    assertEquals("stop", option);
  }

  /** Setting a grace period appends "--time=" after the container name. */
  @Test
  public void testSetGracePeriod() throws Exception {
    dockerStopCommand.setGracePeriod(GRACE_PERIOD);
    String commandLine = dockerStopCommand.getCommandWithArguments();
    assertEquals("stop foo --time=10", commandLine);
  }
}