YARN-4759. Fix signal handling for docker containers. Contributed by Shane Kumpf.
(cherry picked from commit e5e558b0a3
)
This commit is contained in:
parent
abfaf0e0e7
commit
b4124c6a5a
|
@ -31,6 +31,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
|
||||
|
@ -445,26 +447,39 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
|
|||
public void signalContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
Container container = ctx.getContainer();
|
||||
PrivilegedOperation signalOp = new PrivilegedOperation(
|
||||
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
||||
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
|
||||
|
||||
signalOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
|
||||
PrivilegedOperation privOp = null;
|
||||
// Handle liveliness checks, send null signal to pid
|
||||
if(ContainerExecutor.Signal.NULL.equals(signal)) {
|
||||
privOp = new PrivilegedOperation(
|
||||
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
||||
privOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
|
||||
ctx.getExecutionAttribute(USER),
|
||||
Integer.toString(PrivilegedOperation
|
||||
.RunAsUserCommand.SIGNAL_CONTAINER.getValue()),
|
||||
Integer.toString(PrivilegedOperation.RunAsUserCommand
|
||||
.SIGNAL_CONTAINER.getValue()),
|
||||
ctx.getExecutionAttribute(PID),
|
||||
Integer.toString(ctx.getExecutionAttribute(SIGNAL).getValue()));
|
||||
|
||||
// All other signals handled as docker stop
|
||||
} else {
|
||||
String containerId = ctx.getContainer().getContainerId().toString();
|
||||
DockerStopCommand stopCommand = new DockerStopCommand(containerId);
|
||||
String commandFile = dockerClient.writeCommandToTempFile(stopCommand,
|
||||
containerId);
|
||||
privOp = new PrivilegedOperation(
|
||||
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
|
||||
privOp.appendArgs(commandFile);
|
||||
}
|
||||
|
||||
//Some failures here are acceptable. Let the calling executor decide.
|
||||
privOp.disableFailureLogging();
|
||||
|
||||
try {
|
||||
PrivilegedOperationExecutor executor = PrivilegedOperationExecutor
|
||||
.getInstance(conf);
|
||||
|
||||
executor.executePrivilegedOperation(null,
|
||||
signalOp, null, container.getLaunchContext().getEnvironment(),
|
||||
false, true);
|
||||
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||
privOp, null, container.getLaunchContext().getEnvironment(),
|
||||
false, false);
|
||||
} catch (PrivilegedOperationException e) {
|
||||
LOG.warn("Signal container failed. Exception: ", e);
|
||||
|
||||
throw new ContainerExecutionException("Signal container failed", e
|
||||
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* *
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* /
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||
|
||||
/**
|
||||
* Encapsulates the docker stop command and its command
|
||||
* line arguments.
|
||||
*/
|
||||
public class DockerStopCommand extends DockerCommand {
|
||||
private static final String STOP_COMMAND = "stop";
|
||||
|
||||
public DockerStopCommand(String containerName) {
|
||||
super(STOP_COMMAND);
|
||||
super.addCommandArguments(containerName);
|
||||
}
|
||||
|
||||
public DockerStopCommand setGracePeriod(int value) {
|
||||
super.addCommandArguments("--time=" + Integer.toString(value));
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -283,7 +283,14 @@ static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) {
|
|||
* Write the exit code of the container into the exit code file
|
||||
* exit_code_file: Path to exit code file where exit code needs to be written
|
||||
*/
|
||||
static int write_exit_code_file(const char* exit_code_file, int exit_code) {
|
||||
static int write_exit_code_file_as_nm(const char* exit_code_file, int exit_code) {
|
||||
uid_t user = geteuid();
|
||||
gid_t group = getegid();
|
||||
if (change_effective_user(nm_uid, nm_gid) != 0) {
|
||||
fprintf(ERRORFILE, "Could not change to effective users %d, %d\n", nm_uid, nm_gid);
|
||||
fflush(ERRORFILE);
|
||||
return -1;
|
||||
}
|
||||
char *tmp_ecode_file = concatenate("%s.tmp", "exit_code_path", 1,
|
||||
exit_code_file);
|
||||
if (tmp_ecode_file == NULL) {
|
||||
|
@ -320,6 +327,13 @@ static int write_exit_code_file(const char* exit_code_file, int exit_code) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// always change back
|
||||
if (change_effective_user(user, group) != 0) {
|
||||
fprintf(ERRORFILE,
|
||||
"Could not change to effective users %d, %d\n", user, group);
|
||||
fflush(ERRORFILE);
|
||||
return -1;
|
||||
}
|
||||
free(tmp_ecode_file);
|
||||
return 0;
|
||||
}
|
||||
|
@ -357,11 +371,8 @@ static int wait_and_get_exit_code(pid_t pid) {
|
|||
static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
|
||||
int exit_code = -1;
|
||||
|
||||
if (change_effective_user(nm_uid, nm_gid) != 0) {
|
||||
return -1;
|
||||
}
|
||||
exit_code = wait_and_get_exit_code(pid);
|
||||
if (write_exit_code_file(exit_code_file, exit_code) < 0) {
|
||||
if (write_exit_code_file_as_nm(exit_code_file, exit_code) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1240,6 +1251,9 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
|
|||
int BUFFER_SIZE = 4096;
|
||||
char buffer[BUFFER_SIZE];
|
||||
|
||||
gid_t user_gid = getegid();
|
||||
uid_t prev_uid = geteuid();
|
||||
|
||||
char *docker_command = parse_docker_command_file(command_file);
|
||||
char *docker_binary = get_value(DOCKER_BINARY_KEY, &executor_cfg);
|
||||
if (docker_binary == NULL) {
|
||||
|
@ -1255,7 +1269,6 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
|
|||
fflush(ERRORFILE);
|
||||
goto cleanup;
|
||||
}
|
||||
gid_t user_gid = getegid();
|
||||
|
||||
fprintf(LOGFILE, "Creating local dirs...\n");
|
||||
exit_code = create_local_dirs(user, app_id, container_id,
|
||||
|
@ -1400,11 +1413,19 @@ cleanup:
|
|||
//clean up docker command file
|
||||
unlink(command_file);
|
||||
|
||||
if (exit_code_file != NULL && write_exit_code_file(exit_code_file, exit_code) < 0) {
|
||||
if (exit_code_file != NULL && write_exit_code_file_as_nm(exit_code_file, exit_code) < 0) {
|
||||
fprintf (ERRORFILE,
|
||||
"Could not write exit code to file %s.\n", exit_code_file);
|
||||
fflush(ERRORFILE);
|
||||
}
|
||||
|
||||
// Drop root privileges
|
||||
if (change_effective_user(prev_uid, user_gid) != 0) {
|
||||
fprintf(ERRORFILE,
|
||||
"Could not change to effective users %d, %d\n", prev_uid, user_gid);
|
||||
fflush(ERRORFILE);
|
||||
}
|
||||
|
||||
#if HAVE_FCLOSEALL
|
||||
fcloseall();
|
||||
#else
|
||||
|
|
|
@ -23,10 +23,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
||||
|
@ -92,6 +94,7 @@ public class TestDockerContainerRuntime {
|
|||
private final String submittingUser = "anakin";
|
||||
private final String whitelistedUser = "yoda";
|
||||
private String[] testCapabilities;
|
||||
private final String signalPid = "1234";
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
|
@ -187,7 +190,7 @@ public class TestDockerContainerRuntime {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private PrivilegedOperation capturePrivilegedOperationAndVerifyArgs()
|
||||
private PrivilegedOperation capturePrivilegedOperation()
|
||||
throws PrivilegedOperationException {
|
||||
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
|
||||
PrivilegedOperation.class);
|
||||
|
@ -203,7 +206,14 @@ public class TestDockerContainerRuntime {
|
|||
// hence, reset mock here
|
||||
Mockito.reset(mockExecutor);
|
||||
|
||||
PrivilegedOperation op = opCaptor.getValue();
|
||||
return opCaptor.getValue();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private PrivilegedOperation capturePrivilegedOperationAndVerifyArgs()
|
||||
throws PrivilegedOperationException {
|
||||
|
||||
PrivilegedOperation op = capturePrivilegedOperation();
|
||||
|
||||
Assert.assertEquals(PrivilegedOperation.OperationType
|
||||
.LAUNCH_DOCKER_CONTAINER, op.getOperationType());
|
||||
|
@ -784,4 +794,89 @@ public class TestDockerContainerRuntime {
|
|||
":ro "));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerLivelinessCheck()
|
||||
throws ContainerExecutionException, PrivilegedOperationException {
|
||||
|
||||
DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
|
||||
mockExecutor, mockCGroupsHandler);
|
||||
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
||||
.setExecutionAttribute(USER, user)
|
||||
.setExecutionAttribute(PID, signalPid)
|
||||
.setExecutionAttribute(SIGNAL, ContainerExecutor.Signal.NULL);
|
||||
runtime.initialize(getConfigurationWithMockContainerExecutor());
|
||||
runtime.signalContainer(builder.build());
|
||||
|
||||
PrivilegedOperation op = capturePrivilegedOperation();
|
||||
Assert.assertEquals(op.getOperationType(),
|
||||
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
||||
Assert.assertEquals("run_as_user", op.getArguments().get(0));
|
||||
Assert.assertEquals("user", op.getArguments().get(1));
|
||||
Assert.assertEquals("2", op.getArguments().get(2));
|
||||
Assert.assertEquals("1234", op.getArguments().get(3));
|
||||
Assert.assertEquals("0", op.getArguments().get(4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerStopOnTermSignal()
|
||||
throws ContainerExecutionException, PrivilegedOperationException,
|
||||
IOException {
|
||||
List<String> dockerCommands = getDockerCommandsForSignal(
|
||||
ContainerExecutor.Signal.TERM);
|
||||
Assert.assertEquals(1, dockerCommands.size());
|
||||
Assert.assertEquals("stop container_id", dockerCommands.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerStopOnKillSignal()
|
||||
throws ContainerExecutionException, PrivilegedOperationException,
|
||||
IOException {
|
||||
List<String> dockerCommands = getDockerCommandsForSignal(
|
||||
ContainerExecutor.Signal.KILL);
|
||||
Assert.assertEquals(1, dockerCommands.size());
|
||||
Assert.assertEquals("stop container_id", dockerCommands.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerStopOnQuitSignal()
|
||||
throws ContainerExecutionException, PrivilegedOperationException,
|
||||
IOException {
|
||||
List<String> dockerCommands = getDockerCommandsForSignal(
|
||||
ContainerExecutor.Signal.QUIT);
|
||||
Assert.assertEquals(1, dockerCommands.size());
|
||||
Assert.assertEquals("stop container_id", dockerCommands.get(0));
|
||||
}
|
||||
|
||||
private List<String> getDockerCommandsForSignal(
|
||||
ContainerExecutor.Signal signal)
|
||||
throws ContainerExecutionException, PrivilegedOperationException,
|
||||
IOException {
|
||||
|
||||
DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
|
||||
mockExecutor, mockCGroupsHandler);
|
||||
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
||||
.setExecutionAttribute(USER, user)
|
||||
.setExecutionAttribute(PID, signalPid)
|
||||
.setExecutionAttribute(SIGNAL, signal);
|
||||
runtime.initialize(getConfigurationWithMockContainerExecutor());
|
||||
runtime.signalContainer(builder.build());
|
||||
|
||||
PrivilegedOperation op = capturePrivilegedOperation();
|
||||
Assert.assertEquals(op.getOperationType(),
|
||||
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
|
||||
String dockerCommandFile = op.getArguments().get(0);
|
||||
return Files.readAllLines(Paths.get(dockerCommandFile),
|
||||
Charset.forName("UTF-8"));
|
||||
}
|
||||
|
||||
private Configuration getConfigurationWithMockContainerExecutor() {
|
||||
File f = new File("./src/test/resources/mock-container-executor");
|
||||
if(!FileUtil.canExecute(f)) {
|
||||
FileUtil.setExecutable(f, true);
|
||||
}
|
||||
String executorPath = f.getAbsolutePath();
|
||||
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
|
||||
return conf;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* *
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* /
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests the docker stop command and its command
|
||||
* line arguments.
|
||||
*/
|
||||
public class DockerStopCommandTest {
|
||||
|
||||
private DockerStopCommand dockerStopCommand;
|
||||
|
||||
private static final int GRACE_PERIOD = 10;
|
||||
private static final String CONTAINER_NAME = "foo";
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
dockerStopCommand = new DockerStopCommand(CONTAINER_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCommandOption() {
|
||||
assertEquals("stop", dockerStopCommand.getCommandOption());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetGracePeriod() throws Exception {
|
||||
dockerStopCommand.setGracePeriod(GRACE_PERIOD);
|
||||
assertEquals("stop foo --time=10",
|
||||
dockerStopCommand.getCommandWithArguments());
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue