YARN-7644. NM gets backed up deleting docker containers. Contributed by Chandni Singh
(cherry picked from commit 5ce70e1211
)
This commit is contained in:
parent
5f97c0cd76
commit
145c7aa663
|
@ -0,0 +1,229 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.EXIT_CODE_FILE_SUFFIX;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup the container.
|
||||||
|
* Cancels the launch if launch has not started yet or signals
|
||||||
|
* the executor to not execute the process if not already done so.
|
||||||
|
* Also, sends a SIGTERM followed by a SIGKILL to the process if
|
||||||
|
* the process id is available.
|
||||||
|
*/
|
||||||
|
public class ContainerCleanup implements Runnable {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ContainerCleanup.class);
|
||||||
|
|
||||||
|
private final Context context;
|
||||||
|
private final Configuration conf;
|
||||||
|
private final Dispatcher dispatcher;
|
||||||
|
private final ContainerExecutor exec;
|
||||||
|
private final Container container;
|
||||||
|
private final ContainerLaunch launch;
|
||||||
|
private final long sleepDelayBeforeSigKill;
|
||||||
|
|
||||||
|
|
||||||
|
public ContainerCleanup(Context context, Configuration configuration,
|
||||||
|
Dispatcher dispatcher, ContainerExecutor exec,
|
||||||
|
Container container,
|
||||||
|
ContainerLaunch containerLaunch) {
|
||||||
|
|
||||||
|
this.context = Preconditions.checkNotNull(context, "context");
|
||||||
|
this.conf = Preconditions.checkNotNull(configuration, "config");
|
||||||
|
this.dispatcher = Preconditions.checkNotNull(dispatcher, "dispatcher");
|
||||||
|
this.exec = Preconditions.checkNotNull(exec, "exec");
|
||||||
|
this.container = Preconditions.checkNotNull(container, "container");
|
||||||
|
this.launch = Preconditions.checkNotNull(containerLaunch, "launch");
|
||||||
|
this.sleepDelayBeforeSigKill = conf.getLong(
|
||||||
|
YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
ContainerId containerId = container.getContainerId();
|
||||||
|
String containerIdStr = containerId.toString();
|
||||||
|
LOG.info("Cleaning up container " + containerIdStr);
|
||||||
|
|
||||||
|
try {
|
||||||
|
context.getNMStateStore().storeContainerKilled(containerId);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to mark container " + containerId
|
||||||
|
+ " killed in store", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// launch flag will be set to true if process already launched
|
||||||
|
boolean alreadyLaunched = !launch.markLaunched();
|
||||||
|
if (!alreadyLaunched) {
|
||||||
|
LOG.info("Container " + containerIdStr + " not launched."
|
||||||
|
+ " No cleanup needed to be done");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Marking container " + containerIdStr + " as inactive");
|
||||||
|
}
|
||||||
|
// this should ensure that if the container process has not launched
|
||||||
|
// by this time, it will never be launched
|
||||||
|
exec.deactivateContainer(containerId);
|
||||||
|
Path pidFilePath = launch.getPidFilePath();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Getting pid for container {} to kill"
|
||||||
|
+ " from pid file {}", containerIdStr, pidFilePath != null ?
|
||||||
|
pidFilePath : "null");
|
||||||
|
}
|
||||||
|
|
||||||
|
// however the container process may have already started
|
||||||
|
try {
|
||||||
|
|
||||||
|
// get process id from pid file if available
|
||||||
|
// else if shell is still active, get it from the shell
|
||||||
|
String processId = launch.getContainerPid();
|
||||||
|
|
||||||
|
// kill process
|
||||||
|
String user = container.getUser();
|
||||||
|
if (processId != null) {
|
||||||
|
signalProcess(processId, user, containerIdStr);
|
||||||
|
} else {
|
||||||
|
// Normally this means that the process was notified about
|
||||||
|
// deactivateContainer above and did not start.
|
||||||
|
// Since we already set the state to RUNNING or REINITIALIZING
|
||||||
|
// we have to send a killed event to continue.
|
||||||
|
if (!launch.isLaunchCompleted()) {
|
||||||
|
LOG.warn("Container clean up before pid file created "
|
||||||
|
+ containerIdStr);
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new ContainerExitEvent(container.getContainerId(),
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
Shell.WINDOWS ?
|
||||||
|
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
|
||||||
|
ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
|
||||||
|
"Container terminated before pid file created."));
|
||||||
|
// There is a possibility that the launch grabbed the file name before
|
||||||
|
// the deactivateContainer above but it was slow enough to avoid
|
||||||
|
// getContainerPid.
|
||||||
|
// Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
|
||||||
|
// reduces the likelihood of this race condition and process leak.
|
||||||
|
}
|
||||||
|
// The Docker container may not have fully started, reap the container.
|
||||||
|
if (DockerLinuxContainerRuntime.isDockerContainerRequested(conf,
|
||||||
|
container.getLaunchContext().getEnvironment())) {
|
||||||
|
reapDockerContainerNoPid(user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
String message =
|
||||||
|
"Exception when trying to cleanup container " + containerIdStr
|
||||||
|
+ ": " + StringUtils.stringifyException(e);
|
||||||
|
LOG.warn(message);
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new ContainerDiagnosticsUpdateEvent(containerId, message));
|
||||||
|
} finally {
|
||||||
|
// cleanup pid file if present
|
||||||
|
if (pidFilePath != null) {
|
||||||
|
try {
|
||||||
|
FileContext lfs = FileContext.getLocalFSFileContext();
|
||||||
|
lfs.delete(pidFilePath, false);
|
||||||
|
lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("{} exception trying to delete pid file {}. Ignoring.",
|
||||||
|
containerId, pidFilePath, ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Reap the container
|
||||||
|
launch.reapContainer();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("{} exception trying to reap container. Ignoring.", containerId,
|
||||||
|
ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void signalProcess(String processId, String user,
|
||||||
|
String containerIdStr) throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sending signal to pid " + processId + " as user " + user
|
||||||
|
+ " for container " + containerIdStr);
|
||||||
|
}
|
||||||
|
final ContainerExecutor.Signal signal =
|
||||||
|
sleepDelayBeforeSigKill > 0 ? ContainerExecutor.Signal.TERM :
|
||||||
|
ContainerExecutor.Signal.KILL;
|
||||||
|
|
||||||
|
boolean result = sendSignal(user, processId, signal);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sent signal " + signal + " to pid " + processId + " as user "
|
||||||
|
+ user + " for container " + containerIdStr + ", result="
|
||||||
|
+ (result ? "success" : "failed"));
|
||||||
|
}
|
||||||
|
if (sleepDelayBeforeSigKill > 0) {
|
||||||
|
new ContainerExecutor.DelayedProcessKiller(container, user, processId,
|
||||||
|
sleepDelayBeforeSigKill, ContainerExecutor.Signal.KILL, exec).start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean sendSignal(String user, String processId,
|
||||||
|
ContainerExecutor.Signal signal)
|
||||||
|
throws IOException {
|
||||||
|
return exec.signalContainer(
|
||||||
|
new ContainerSignalContext.Builder().setContainer(container)
|
||||||
|
.setUser(user).setPid(processId).setSignal(signal).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reapDockerContainerNoPid(String user) throws IOException {
|
||||||
|
String containerIdStr =
|
||||||
|
container.getContainerTokenIdentifier().getContainerID().toString();
|
||||||
|
LOG.info("Unable to obtain pid, but docker container request detected. "
|
||||||
|
+ "Attempting to reap container " + containerIdStr);
|
||||||
|
boolean result = exec.reapContainer(
|
||||||
|
new ContainerReapContext.Builder()
|
||||||
|
.setContainer(container)
|
||||||
|
.setUser(container.getUser())
|
||||||
|
.build());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sent signal to docker container " + containerIdStr
|
||||||
|
+ " as user " + user + ", result=" + (result ? "success" : "failed"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.exceptions.ConfigurationException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
@ -85,7 +84,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
|
||||||
|
@ -115,7 +113,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
|
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
|
||||||
|
|
||||||
private static final String PID_FILE_NAME_FMT = "%s.pid";
|
private static final String PID_FILE_NAME_FMT = "%s.pid";
|
||||||
private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
|
static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
|
||||||
|
|
||||||
protected final Dispatcher dispatcher;
|
protected final Dispatcher dispatcher;
|
||||||
protected final ContainerExecutor exec;
|
protected final ContainerExecutor exec;
|
||||||
|
@ -131,7 +129,6 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
protected AtomicBoolean completed = new AtomicBoolean(false);
|
protected AtomicBoolean completed = new AtomicBoolean(false);
|
||||||
|
|
||||||
private volatile boolean killedBeforeStart = false;
|
private volatile boolean killedBeforeStart = false;
|
||||||
private long sleepDelayBeforeSigKill = 250;
|
|
||||||
private long maxKillWaitTime = 2000;
|
private long maxKillWaitTime = 2000;
|
||||||
|
|
||||||
protected Path pidFilePath = null;
|
protected Path pidFilePath = null;
|
||||||
|
@ -152,9 +149,6 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
this.dispatcher = dispatcher;
|
this.dispatcher = dispatcher;
|
||||||
this.dirsHandler = dirsHandler;
|
this.dirsHandler = dirsHandler;
|
||||||
this.containerManager = containerManager;
|
this.containerManager = containerManager;
|
||||||
this.sleepDelayBeforeSigKill =
|
|
||||||
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
|
|
||||||
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
|
|
||||||
this.maxKillWaitTime =
|
this.maxKillWaitTime =
|
||||||
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
|
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
|
||||||
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
|
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
|
||||||
|
@ -515,6 +509,25 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
return launchPrep;
|
return launchPrep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void reapContainer() throws IOException {
|
||||||
|
containerExecLock.lock();
|
||||||
|
try {
|
||||||
|
// Reap the container
|
||||||
|
boolean result = exec.reapContainer(
|
||||||
|
new ContainerReapContext.Builder()
|
||||||
|
.setContainer(container)
|
||||||
|
.setUser(container.getUser())
|
||||||
|
.build());
|
||||||
|
if (!result) {
|
||||||
|
throw new IOException("Reap container failed for container " +
|
||||||
|
container.getContainerId());
|
||||||
|
}
|
||||||
|
cleanupContainerFiles(getContainerWorkDir());
|
||||||
|
} finally {
|
||||||
|
containerExecLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
|
protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
|
||||||
ContainerId containerId = container.getContainerId();
|
ContainerId containerId = container.getContainerId();
|
||||||
if (container.isMarkedForKilling()) {
|
if (container.isMarkedForKilling()) {
|
||||||
|
@ -722,121 +735,6 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
+ String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
|
+ String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Cleanup the container.
|
|
||||||
* Cancels the launch if launch has not started yet or signals
|
|
||||||
* the executor to not execute the process if not already done so.
|
|
||||||
* Also, sends a SIGTERM followed by a SIGKILL to the process if
|
|
||||||
* the process id is available.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public void cleanupContainer() throws IOException {
|
|
||||||
ContainerId containerId = container.getContainerId();
|
|
||||||
String containerIdStr = containerId.toString();
|
|
||||||
LOG.info("Cleaning up container " + containerIdStr);
|
|
||||||
|
|
||||||
try {
|
|
||||||
context.getNMStateStore().storeContainerKilled(containerId);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Unable to mark container " + containerId
|
|
||||||
+ " killed in store", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// launch flag will be set to true if process already launched
|
|
||||||
boolean alreadyLaunched =
|
|
||||||
!containerAlreadyLaunched.compareAndSet(false, true);
|
|
||||||
if (!alreadyLaunched) {
|
|
||||||
LOG.info("Container " + containerIdStr + " not launched."
|
|
||||||
+ " No cleanup needed to be done");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Marking container " + containerIdStr + " as inactive");
|
|
||||||
}
|
|
||||||
// this should ensure that if the container process has not launched
|
|
||||||
// by this time, it will never be launched
|
|
||||||
exec.deactivateContainer(containerId);
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Getting pid for container " + containerIdStr + " to kill"
|
|
||||||
+ " from pid file "
|
|
||||||
+ (pidFilePath != null ? pidFilePath.toString() : "null"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// however the container process may have already started
|
|
||||||
try {
|
|
||||||
|
|
||||||
// get process id from pid file if available
|
|
||||||
// else if shell is still active, get it from the shell
|
|
||||||
String processId = null;
|
|
||||||
if (pidFilePath != null) {
|
|
||||||
processId = getContainerPid(pidFilePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
// kill process
|
|
||||||
String user = container.getUser();
|
|
||||||
if (processId != null) {
|
|
||||||
signalProcess(processId, user, containerIdStr);
|
|
||||||
} else {
|
|
||||||
// Normally this means that the process was notified about
|
|
||||||
// deactivateContainer above and did not start.
|
|
||||||
// Since we already set the state to RUNNING or REINITIALIZING
|
|
||||||
// we have to send a killed event to continue.
|
|
||||||
if (!completed.get()) {
|
|
||||||
LOG.warn("Container clean up before pid file created "
|
|
||||||
+ containerIdStr);
|
|
||||||
dispatcher.getEventHandler().handle(
|
|
||||||
new ContainerExitEvent(container.getContainerId(),
|
|
||||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
|
||||||
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
|
|
||||||
ExitCode.TERMINATED.getExitCode(),
|
|
||||||
"Container terminated before pid file created."));
|
|
||||||
// There is a possibility that the launch grabbed the file name before
|
|
||||||
// the deactivateContainer above but it was slow enough to avoid
|
|
||||||
// getContainerPid.
|
|
||||||
// Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
|
|
||||||
// reduces the likelihood of this race condition and process leak.
|
|
||||||
}
|
|
||||||
// The Docker container may not have fully started, reap the container.
|
|
||||||
if (DockerLinuxContainerRuntime.isDockerContainerRequested(
|
|
||||||
conf,
|
|
||||||
container.getLaunchContext().getEnvironment())) {
|
|
||||||
reapDockerContainerNoPid(user);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
String message =
|
|
||||||
"Exception when trying to cleanup container " + containerIdStr
|
|
||||||
+ ": " + StringUtils.stringifyException(e);
|
|
||||||
LOG.warn(message);
|
|
||||||
dispatcher.getEventHandler().handle(
|
|
||||||
new ContainerDiagnosticsUpdateEvent(containerId, message));
|
|
||||||
} finally {
|
|
||||||
// cleanup pid file if present
|
|
||||||
if (pidFilePath != null) {
|
|
||||||
FileContext lfs = FileContext.getLocalFSFileContext();
|
|
||||||
lfs.delete(pidFilePath, false);
|
|
||||||
lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
containerExecLock.lock();
|
|
||||||
try {
|
|
||||||
// Reap the container
|
|
||||||
boolean result = exec.reapContainer(
|
|
||||||
new ContainerReapContext.Builder()
|
|
||||||
.setContainer(container)
|
|
||||||
.setUser(container.getUser())
|
|
||||||
.build());
|
|
||||||
if (!result) {
|
|
||||||
throw new IOException("Reap container failed for container "
|
|
||||||
+ containerIdStr);
|
|
||||||
}
|
|
||||||
cleanupContainerFiles(getContainerWorkDir());
|
|
||||||
} finally {
|
|
||||||
containerExecLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a signal to the container.
|
* Send a signal to the container.
|
||||||
*
|
*
|
||||||
|
@ -874,11 +772,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
try {
|
try {
|
||||||
// get process id from pid file if available
|
// get process id from pid file if available
|
||||||
// else if shell is still active, get it from the shell
|
// else if shell is still active, get it from the shell
|
||||||
String processId = null;
|
String processId = getContainerPid();
|
||||||
if (pidFilePath != null) {
|
|
||||||
processId = getContainerPid(pidFilePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (processId != null) {
|
if (processId != null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Sending signal to pid " + processId
|
LOG.debug("Sending signal to pid " + processId
|
||||||
|
@ -912,50 +806,6 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean sendSignal(String user, String processId, Signal signal)
|
|
||||||
throws IOException {
|
|
||||||
return exec.signalContainer(
|
|
||||||
new ContainerSignalContext.Builder().setContainer(container)
|
|
||||||
.setUser(user).setPid(processId).setSignal(signal).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void signalProcess(String processId, String user,
|
|
||||||
String containerIdStr) throws IOException {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Sending signal to pid " + processId + " as user " + user
|
|
||||||
+ " for container " + containerIdStr);
|
|
||||||
}
|
|
||||||
final Signal signal =
|
|
||||||
sleepDelayBeforeSigKill > 0 ? Signal.TERM : Signal.KILL;
|
|
||||||
|
|
||||||
boolean result = sendSignal(user, processId, signal);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Sent signal " + signal + " to pid " + processId + " as user "
|
|
||||||
+ user + " for container " + containerIdStr + ", result="
|
|
||||||
+ (result ? "success" : "failed"));
|
|
||||||
}
|
|
||||||
if (sleepDelayBeforeSigKill > 0) {
|
|
||||||
new DelayedProcessKiller(container, user, processId,
|
|
||||||
sleepDelayBeforeSigKill, Signal.KILL, exec).start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void reapDockerContainerNoPid(String user) throws IOException {
|
|
||||||
String containerIdStr =
|
|
||||||
container.getContainerTokenIdentifier().getContainerID().toString();
|
|
||||||
LOG.info("Unable to obtain pid, but docker container request detected. "
|
|
||||||
+ "Attempting to reap container " + containerIdStr);
|
|
||||||
boolean result = exec.reapContainer(
|
|
||||||
new ContainerReapContext.Builder()
|
|
||||||
.setContainer(container)
|
|
||||||
.setUser(container.getUser())
|
|
||||||
.build());
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Sent signal to docker container " + containerIdStr
|
|
||||||
+ " as user " + user + ", result=" + (result ? "success" : "failed"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static Signal translateCommandToSignal(
|
public static Signal translateCommandToSignal(
|
||||||
SignalContainerCommand command) {
|
SignalContainerCommand command) {
|
||||||
|
@ -1076,14 +926,16 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
/**
|
/**
|
||||||
* Loop through for a time-bounded interval waiting to
|
* Loop through for a time-bounded interval waiting to
|
||||||
* read the process id from a file generated by a running process.
|
* read the process id from a file generated by a running process.
|
||||||
* @param pidFilePath File from which to read the process id
|
* @return Process ID; null when pidFilePath is null
|
||||||
* @return Process ID
|
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
private String getContainerPid(Path pidFilePath) throws Exception {
|
String getContainerPid() throws Exception {
|
||||||
|
if (pidFilePath == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
String containerIdStr =
|
String containerIdStr =
|
||||||
container.getContainerId().toString();
|
container.getContainerId().toString();
|
||||||
String processId = null;
|
String processId;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Accessing pid for container " + containerIdStr
|
LOG.debug("Accessing pid for container " + containerIdStr
|
||||||
+ " from pid file " + pidFilePath);
|
+ " from pid file " + pidFilePath);
|
||||||
|
@ -1889,4 +1741,28 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
LOG.warn("Failed to delete " + path, e);
|
LOG.warn("Failed to delete " + path, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the PID File Path.
|
||||||
|
*/
|
||||||
|
Path getPidFilePath() {
|
||||||
|
return pidFilePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the container to be launched only if it was not launched.
|
||||||
|
*
|
||||||
|
* @return true if successful; false otherwise.
|
||||||
|
*/
|
||||||
|
boolean markLaunched() {
|
||||||
|
return containerAlreadyLaunched.compareAndSet(false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns if the launch is completed or not.
|
||||||
|
*/
|
||||||
|
boolean isLaunchCompleted() {
|
||||||
|
return completed.get();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -154,8 +153,8 @@ public class ContainersLauncher extends AbstractService
|
||||||
break;
|
break;
|
||||||
case CLEANUP_CONTAINER:
|
case CLEANUP_CONTAINER:
|
||||||
case CLEANUP_CONTAINER_FOR_REINIT:
|
case CLEANUP_CONTAINER_FOR_REINIT:
|
||||||
ContainerLaunch launcher = running.remove(containerId);
|
ContainerLaunch existingLaunch = running.remove(containerId);
|
||||||
if (launcher == null) {
|
if (existingLaunch == null) {
|
||||||
// Container not launched.
|
// Container not launched.
|
||||||
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
|
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
|
@ -169,12 +168,9 @@ public class ContainersLauncher extends AbstractService
|
||||||
|
|
||||||
// Cleanup a container whether it is running/killed/completed, so that
|
// Cleanup a container whether it is running/killed/completed, so that
|
||||||
// no sub-processes are alive.
|
// no sub-processes are alive.
|
||||||
try {
|
ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(),
|
||||||
launcher.cleanupContainer();
|
dispatcher, exec, event.getContainer(), existingLaunch);
|
||||||
} catch (IOException e) {
|
containerLauncher.submit(cleanup);
|
||||||
LOG.warn("Got exception while cleaning container " + containerId
|
|
||||||
+ ". Ignoring.");
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case SIGNAL_CONTAINER:
|
case SIGNAL_CONTAINER:
|
||||||
SignalContainersLauncherEvent signalEvent =
|
SignalContainersLauncherEvent signalEvent =
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link ContainerCleanup}.
|
||||||
|
*/
|
||||||
|
public class TestContainerCleanup {
|
||||||
|
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
private ContainerId containerId;
|
||||||
|
private ContainerExecutor executor;
|
||||||
|
private ContainerLaunch launch;
|
||||||
|
private ContainerCleanup cleanup;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
conf.setLong(NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 60000);
|
||||||
|
Context context = mock(Context.class);
|
||||||
|
NMStateStoreService storeService = mock(NMStateStoreService.class);
|
||||||
|
when(context.getNMStateStore()).thenReturn(storeService);
|
||||||
|
|
||||||
|
Dispatcher dispatcher = new InlineDispatcher();
|
||||||
|
executor = mock(ContainerExecutor.class);
|
||||||
|
when(executor.signalContainer(Mockito.any(
|
||||||
|
ContainerSignalContext.class))).thenReturn(true);
|
||||||
|
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
|
||||||
|
1);
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
containerId = ContainerId.newContainerId(attemptId, 1);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
|
||||||
|
when(container.getContainerId()).thenReturn(containerId);
|
||||||
|
|
||||||
|
launch = mock(ContainerLaunch.class);
|
||||||
|
launch.containerAlreadyLaunched = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
launch.pidFilePath = new Path("target/" + containerId.toString() + ".pid");
|
||||||
|
when(launch.getContainerPid()).thenReturn(containerId.toString());
|
||||||
|
|
||||||
|
cleanup = new ContainerCleanup(context, conf, dispatcher, executor,
|
||||||
|
container, launch);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoCleanupWhenContainerNotLaunched() throws IOException {
|
||||||
|
cleanup.run();
|
||||||
|
verify(launch, Mockito.times(0)).signalContainer(
|
||||||
|
Mockito.any(SignalContainerCommand.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanup() throws Exception {
|
||||||
|
launch.containerAlreadyLaunched.set(true);
|
||||||
|
cleanup.run();
|
||||||
|
ArgumentCaptor<ContainerSignalContext> captor =
|
||||||
|
ArgumentCaptor.forClass(ContainerSignalContext.class);
|
||||||
|
|
||||||
|
verify(executor, Mockito.times(1)).signalContainer(captor.capture());
|
||||||
|
Assert.assertEquals("signal", ContainerExecutor.Signal.TERM,
|
||||||
|
captor.getValue().getSignal());
|
||||||
|
}
|
||||||
|
}
|
|
@ -192,12 +192,14 @@ public class TestContainersLauncher {
|
||||||
.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
|
.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
|
||||||
dummyMap.put(containerId, containerLaunch);
|
dummyMap.put(containerId, containerLaunch);
|
||||||
Whitebox.setInternalState(spy, "running", dummyMap);
|
Whitebox.setInternalState(spy, "running", dummyMap);
|
||||||
|
|
||||||
when(event.getType())
|
when(event.getType())
|
||||||
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER);
|
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER);
|
||||||
doNothing().when(containerLaunch).cleanupContainer();
|
assertEquals(1, dummyMap.size());
|
||||||
spy.handle(event);
|
spy.handle(event);
|
||||||
assertEquals(0, dummyMap.size());
|
assertEquals(0, dummyMap.size());
|
||||||
Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer();
|
Mockito.verify(containerLauncher, Mockito.times(1))
|
||||||
|
.submit(Mockito.any(ContainerCleanup.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -207,12 +209,14 @@ public class TestContainersLauncher {
|
||||||
.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
|
.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
|
||||||
dummyMap.put(containerId, containerLaunch);
|
dummyMap.put(containerId, containerLaunch);
|
||||||
Whitebox.setInternalState(spy, "running", dummyMap);
|
Whitebox.setInternalState(spy, "running", dummyMap);
|
||||||
|
|
||||||
when(event.getType())
|
when(event.getType())
|
||||||
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT);
|
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT);
|
||||||
doNothing().when(containerLaunch).cleanupContainer();
|
assertEquals(1, dummyMap.size());
|
||||||
spy.handle(event);
|
spy.handle(event);
|
||||||
assertEquals(0, dummyMap.size());
|
assertEquals(0, dummyMap.size());
|
||||||
Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer();
|
Mockito.verify(containerLauncher, Mockito.times(1))
|
||||||
|
.submit(Mockito.any(ContainerCleanup.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue