YARN-7644. NM gets backed up deleting docker containers. Contributed by Chandni Singh

This commit is contained in:
Jason Lowe 2018-10-10 09:52:19 -05:00
parent cd280514b8
commit 5ce70e1211
5 changed files with 401 additions and 188 deletions

View File

@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.EXIT_CODE_FILE_SUFFIX;
/**
* Cleanup the container.
* Cancels the launch if launch has not started yet or signals
* the executor to not execute the process if not already done so.
* Also, sends a SIGTERM followed by a SIGKILL to the process if
* the process id is available.
*/
public class ContainerCleanup implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerCleanup.class);
private final Context context;
private final Configuration conf;
private final Dispatcher dispatcher;
private final ContainerExecutor exec;
private final Container container;
private final ContainerLaunch launch;
private final long sleepDelayBeforeSigKill;
public ContainerCleanup(Context context, Configuration configuration,
Dispatcher dispatcher, ContainerExecutor exec,
Container container,
ContainerLaunch containerLaunch) {
this.context = Preconditions.checkNotNull(context, "context");
this.conf = Preconditions.checkNotNull(configuration, "config");
this.dispatcher = Preconditions.checkNotNull(dispatcher, "dispatcher");
this.exec = Preconditions.checkNotNull(exec, "exec");
this.container = Preconditions.checkNotNull(container, "container");
this.launch = Preconditions.checkNotNull(containerLaunch, "launch");
this.sleepDelayBeforeSigKill = conf.getLong(
YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
}
/**
 * Performs the cleanup in this order:
 * <ol>
 *   <li>records the container as killed in the NM state store
 *       (a failure is logged and ignored);</li>
 *   <li>claims the launch flag; if the container had not been launched
 *       yet there is nothing more to do;</li>
 *   <li>deactivates the container in the executor;</li>
 *   <li>signals the process if a pid is available, otherwise publishes a
 *       killed-on-request exit event (when the launch has not completed)
 *       and reaps a requested Docker container;</li>
 *   <li>deletes the pid file and its exit-code sibling;</li>
 *   <li>asks the launch to reap the container.</li>
 * </ol>
 * Exceptions from step 4 are reported as a container diagnostics update;
 * I/O failures in steps 5 and 6 are logged and ignored.
 */
@Override
public void run() {
  final ContainerId containerId = container.getContainerId();
  final String containerIdStr = containerId.toString();
  LOG.info("Cleaning up container {}", containerIdStr);

  recordKilledInStateStore(containerId);

  // markLaunched() returns true only when this call flipped the launch flag,
  // i.e. the container process had not been launched before now.
  if (launch.markLaunched()) {
    LOG.info("Container {} not launched. No cleanup needed to be done",
        containerIdStr);
    return;
  }

  LOG.debug("Marking container {} as inactive", containerIdStr);
  // this should ensure that if the container process has not launched
  // by this time, it will never be launched
  exec.deactivateContainer(containerId);

  final Path pidFilePath = launch.getPidFilePath();
  LOG.debug("Getting pid for container {} to kill from pid file {}",
      containerIdStr, pidFilePath);

  // however the container process may have already started
  try {
    terminateContainerProcess(containerId, containerIdStr);
  } catch (Exception e) {
    // Broad catch is deliberate: getContainerPid() declares Exception and
    // cleanup must continue with pid-file removal and reaping regardless.
    String message =
        "Exception when trying to cleanup container " + containerIdStr
            + ": " + StringUtils.stringifyException(e);
    LOG.warn(message);
    dispatcher.getEventHandler().handle(
        new ContainerDiagnosticsUpdateEvent(containerId, message));
  } finally {
    deletePidFiles(containerId, pidFilePath);
  }

  try {
    // Reap the container
    launch.reapContainer();
  } catch (IOException ioe) {
    LOG.warn("{} exception trying to reap container. Ignoring.", containerId,
        ioe);
  }
}

/** Marks the container as killed in the NM state store; failures are only logged. */
private void recordKilledInStateStore(ContainerId containerId) {
  try {
    context.getNMStateStore().storeContainerKilled(containerId);
  } catch (IOException e) {
    LOG.error("Unable to mark container {} killed in store", containerId, e);
  }
}

/**
 * Signals the container process when its pid can be obtained; otherwise
 * publishes the exit event (if the launch has not completed) and reaps a
 * requested Docker container.
 */
private void terminateContainerProcess(ContainerId containerId,
    String containerIdStr) throws Exception {
  // get process id from pid file if available
  // else if shell is still active, get it from the shell
  final String processId = launch.getContainerPid();
  final String user = container.getUser();

  if (processId != null) {
    // kill process
    signalProcess(processId, user, containerIdStr);
    return;
  }

  // Normally this means that the process was notified about
  // deactivateContainer above and did not start.
  // Since we already set the state to RUNNING or REINITIALIZING
  // we have to send a killed event to continue.
  if (!launch.isLaunchCompleted()) {
    LOG.warn("Container clean up before pid file created {}",
        containerIdStr);
    final int exitCode = Shell.WINDOWS
        ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode()
        : ContainerExecutor.ExitCode.TERMINATED.getExitCode();
    dispatcher.getEventHandler().handle(
        new ContainerExitEvent(containerId,
            ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
            exitCode,
            "Container terminated before pid file created."));
    // There is a possibility that the launch grabbed the file name before
    // the deactivateContainer above but it was slow enough to avoid
    // getContainerPid.
    // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
    // reduces the likelihood of this race condition and process leak.
  }

  // The Docker container may not have fully started, reap the container.
  if (DockerLinuxContainerRuntime.isDockerContainerRequested(conf,
      container.getLaunchContext().getEnvironment())) {
    reapDockerContainerNoPid(user);
  }
}

/** Deletes the pid file and its exit-code file, if a pid file path is known. */
private void deletePidFiles(ContainerId containerId, Path pidFilePath) {
  if (pidFilePath == null) {
    return;
  }
  try {
    FileContext lfs = FileContext.getLocalFSFileContext();
    lfs.delete(pidFilePath, false);
    lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
  } catch (IOException ioe) {
    LOG.warn("{} exception trying to delete pid file {}. Ignoring.",
        containerId, pidFilePath, ioe);
  }
}
/**
 * Sends the first termination signal to the container process.
 *
 * With a positive SIGKILL delay the process first receives TERM and a
 * {@code DelayedProcessKiller} is started to deliver KILL after the delay;
 * otherwise KILL is sent straight away and no delayed killer is started.
 *
 * @param processId pid of the container process
 * @param user user the signal is sent as
 * @param containerIdStr container id, used for logging only
 * @throws IOException if the executor fails to deliver the signal
 */
private void signalProcess(String processId, String user,
    String containerIdStr) throws IOException {
  LOG.debug("Sending signal to pid {} as user {} for container {}",
      processId, user, containerIdStr);

  final boolean gracefulFirst = sleepDelayBeforeSigKill > 0;
  final ContainerExecutor.Signal firstSignal;
  if (gracefulFirst) {
    firstSignal = ContainerExecutor.Signal.TERM;
  } else {
    firstSignal = ContainerExecutor.Signal.KILL;
  }

  final boolean delivered = sendSignal(user, processId, firstSignal);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Sent signal {} to pid {} as user {} for container {}, result={}",
        firstSignal, processId, user, containerIdStr,
        delivered ? "success" : "failed");
  }

  if (gracefulFirst) {
    new ContainerExecutor.DelayedProcessKiller(container, user, processId,
        sleepDelayBeforeSigKill, ContainerExecutor.Signal.KILL, exec).start();
  }
}
/**
 * Asks the executor to deliver {@code signal} to the given pid of this
 * task's container.
 *
 * @return whatever {@code ContainerExecutor.signalContainer} reports
 * @throws IOException if the executor call fails
 */
private boolean sendSignal(String user, String processId,
    ContainerExecutor.Signal signal)
    throws IOException {
  final ContainerSignalContext signalContext =
      new ContainerSignalContext.Builder()
          .setContainer(container)
          .setUser(user)
          .setPid(processId)
          .setSignal(signal)
          .build();
  return exec.signalContainer(signalContext);
}
/**
 * Reaps the container through the executor when no pid could be obtained
 * but a Docker container was requested.
 *
 * @param user user name, used in the debug log only; the reap context is
 *             built with {@code container.getUser()}
 * @throws IOException if the executor's reap call fails
 */
private void reapDockerContainerNoPid(String user) throws IOException {
  final String dockerContainerId =
      container.getContainerTokenIdentifier().getContainerID().toString();
  LOG.info("Unable to obtain pid, but docker container request detected. "
      + "Attempting to reap container {}", dockerContainerId);

  final ContainerReapContext reapContext =
      new ContainerReapContext.Builder()
          .setContainer(container)
          .setUser(container.getUser())
          .build();
  final boolean reaped = exec.reapContainer(reapContext);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Sent signal to docker container {} as user {}, result={}",
        dockerContainerId, user, reaped ? "success" : "failed");
  }
}
}

View File

@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -85,7 +84,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
@ -115,7 +113,7 @@ public class ContainerLaunch implements Callable<Integer> {
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
private static final String PID_FILE_NAME_FMT = "%s.pid"; private static final String PID_FILE_NAME_FMT = "%s.pid";
private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode"; static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
protected final Dispatcher dispatcher; protected final Dispatcher dispatcher;
protected final ContainerExecutor exec; protected final ContainerExecutor exec;
@ -131,7 +129,6 @@ public class ContainerLaunch implements Callable<Integer> {
protected AtomicBoolean completed = new AtomicBoolean(false); protected AtomicBoolean completed = new AtomicBoolean(false);
private volatile boolean killedBeforeStart = false; private volatile boolean killedBeforeStart = false;
private long sleepDelayBeforeSigKill = 250;
private long maxKillWaitTime = 2000; private long maxKillWaitTime = 2000;
protected Path pidFilePath = null; protected Path pidFilePath = null;
@ -152,9 +149,6 @@ public class ContainerLaunch implements Callable<Integer> {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.dirsHandler = dirsHandler; this.dirsHandler = dirsHandler;
this.containerManager = containerManager; this.containerManager = containerManager;
this.sleepDelayBeforeSigKill =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
this.maxKillWaitTime = this.maxKillWaitTime =
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS); YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
@ -515,6 +509,25 @@ public class ContainerLaunch implements Callable<Integer> {
return launchPrep; return launchPrep;
} }
/**
 * Reaps the container through the executor and then removes the files in
 * the container work directory, all while holding
 * {@code containerExecLock}.
 *
 * @throws IOException if the executor reports that reaping failed
 */
void reapContainer() throws IOException {
  containerExecLock.lock();
  try {
    final ContainerReapContext reapContext =
        new ContainerReapContext.Builder()
            .setContainer(container)
            .setUser(container.getUser())
            .build();
    // Reap the container
    final boolean reaped = exec.reapContainer(reapContext);
    if (!reaped) {
      throw new IOException("Reap container failed for container " +
          container.getContainerId());
    }
    cleanupContainerFiles(getContainerWorkDir());
  } finally {
    containerExecLock.unlock();
  }
}
protected int prepareForLaunch(ContainerStartContext ctx) throws IOException { protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
ContainerId containerId = container.getContainerId(); ContainerId containerId = container.getContainerId();
if (container.isMarkedForKilling()) { if (container.isMarkedForKilling()) {
@ -721,121 +734,6 @@ public class ContainerLaunch implements Callable<Integer> {
return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr); + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
} }
/**
* Cleanup the container.
* Cancels the launch if launch has not started yet or signals
* the executor to not execute the process if not already done so.
* Also, sends a SIGTERM followed by a SIGKILL to the process if
* the process id is available.
* @throws IOException
*/
public void cleanupContainer() throws IOException {
ContainerId containerId = container.getContainerId();
String containerIdStr = containerId.toString();
LOG.info("Cleaning up container " + containerIdStr);
try {
context.getNMStateStore().storeContainerKilled(containerId);
} catch (IOException e) {
LOG.error("Unable to mark container " + containerId
+ " killed in store", e);
}
// launch flag will be set to true if process already launched
boolean alreadyLaunched =
!containerAlreadyLaunched.compareAndSet(false, true);
if (!alreadyLaunched) {
LOG.info("Container " + containerIdStr + " not launched."
+ " No cleanup needed to be done");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Marking container " + containerIdStr + " as inactive");
}
// this should ensure that if the container process has not launched
// by this time, it will never be launched
exec.deactivateContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Getting pid for container " + containerIdStr + " to kill"
+ " from pid file "
+ (pidFilePath != null ? pidFilePath.toString() : "null"));
}
// however the container process may have already started
try {
// get process id from pid file if available
// else if shell is still active, get it from the shell
String processId = null;
if (pidFilePath != null) {
processId = getContainerPid(pidFilePath);
}
// kill process
String user = container.getUser();
if (processId != null) {
signalProcess(processId, user, containerIdStr);
} else {
// Normally this means that the process was notified about
// deactivateContainer above and did not start.
// Since we already set the state to RUNNING or REINITIALIZING
// we have to send a killed event to continue.
if (!completed.get()) {
LOG.warn("Container clean up before pid file created "
+ containerIdStr);
dispatcher.getEventHandler().handle(
new ContainerExitEvent(container.getContainerId(),
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode(),
"Container terminated before pid file created."));
// There is a possibility that the launch grabbed the file name before
// the deactivateContainer above but it was slow enough to avoid
// getContainerPid.
// Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
// reduces the likelihood of this race condition and process leak.
}
// The Docker container may not have fully started, reap the container.
if (DockerLinuxContainerRuntime.isDockerContainerRequested(
conf,
container.getLaunchContext().getEnvironment())) {
reapDockerContainerNoPid(user);
}
}
} catch (Exception e) {
String message =
"Exception when trying to cleanup container " + containerIdStr
+ ": " + StringUtils.stringifyException(e);
LOG.warn(message);
dispatcher.getEventHandler().handle(
new ContainerDiagnosticsUpdateEvent(containerId, message));
} finally {
// cleanup pid file if present
if (pidFilePath != null) {
FileContext lfs = FileContext.getLocalFSFileContext();
lfs.delete(pidFilePath, false);
lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
}
}
containerExecLock.lock();
try {
// Reap the container
boolean result = exec.reapContainer(
new ContainerReapContext.Builder()
.setContainer(container)
.setUser(container.getUser())
.build());
if (!result) {
throw new IOException("Reap container failed for container "
+ containerIdStr);
}
cleanupContainerFiles(getContainerWorkDir());
} finally {
containerExecLock.unlock();
}
}
/** /**
* Send a signal to the container. * Send a signal to the container.
@ -874,11 +772,7 @@ public class ContainerLaunch implements Callable<Integer> {
try { try {
// get process id from pid file if available // get process id from pid file if available
// else if shell is still active, get it from the shell // else if shell is still active, get it from the shell
String processId = null; String processId = getContainerPid();
if (pidFilePath != null) {
processId = getContainerPid(pidFilePath);
}
if (processId != null) { if (processId != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal to pid " + processId LOG.debug("Sending signal to pid " + processId
@ -912,50 +806,6 @@ public class ContainerLaunch implements Callable<Integer> {
} }
} }
private boolean sendSignal(String user, String processId, Signal signal)
throws IOException {
return exec.signalContainer(
new ContainerSignalContext.Builder().setContainer(container)
.setUser(user).setPid(processId).setSignal(signal).build());
}
private void signalProcess(String processId, String user,
String containerIdStr) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending signal to pid " + processId + " as user " + user
+ " for container " + containerIdStr);
}
final Signal signal =
sleepDelayBeforeSigKill > 0 ? Signal.TERM : Signal.KILL;
boolean result = sendSignal(user, processId, signal);
if (LOG.isDebugEnabled()) {
LOG.debug("Sent signal " + signal + " to pid " + processId + " as user "
+ user + " for container " + containerIdStr + ", result="
+ (result ? "success" : "failed"));
}
if (sleepDelayBeforeSigKill > 0) {
new DelayedProcessKiller(container, user, processId,
sleepDelayBeforeSigKill, Signal.KILL, exec).start();
}
}
private void reapDockerContainerNoPid(String user) throws IOException {
String containerIdStr =
container.getContainerTokenIdentifier().getContainerID().toString();
LOG.info("Unable to obtain pid, but docker container request detected. "
+ "Attempting to reap container " + containerIdStr);
boolean result = exec.reapContainer(
new ContainerReapContext.Builder()
.setContainer(container)
.setUser(container.getUser())
.build());
if (LOG.isDebugEnabled()) {
LOG.debug("Sent signal to docker container " + containerIdStr
+ " as user " + user + ", result=" + (result ? "success" : "failed"));
}
}
@VisibleForTesting @VisibleForTesting
public static Signal translateCommandToSignal( public static Signal translateCommandToSignal(
SignalContainerCommand command) { SignalContainerCommand command) {
@ -1076,14 +926,16 @@ public class ContainerLaunch implements Callable<Integer> {
/** /**
* Loop through for a time-bounded interval waiting to * Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process. * read the process id from a file generated by a running process.
* @param pidFilePath File from which to read the process id * @return Process ID; null when pidFilePath is null
* @return Process ID
* @throws Exception * @throws Exception
*/ */
private String getContainerPid(Path pidFilePath) throws Exception { String getContainerPid() throws Exception {
if (pidFilePath == null) {
return null;
}
String containerIdStr = String containerIdStr =
container.getContainerId().toString(); container.getContainerId().toString();
String processId = null; String processId;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Accessing pid for container " + containerIdStr LOG.debug("Accessing pid for container " + containerIdStr
+ " from pid file " + pidFilePath); + " from pid file " + pidFilePath);
@ -1889,4 +1741,28 @@ public class ContainerLaunch implements Callable<Integer> {
LOG.warn("Failed to delete " + path, e); LOG.warn("Failed to delete " + path, e);
} }
} }
/**
 * Gives access to the pid file path currently held by this launch.
 *
 * @return the value of the {@code pidFilePath} field as-is
 */
Path getPidFilePath() {
  return this.pidFilePath;
}
/**
 * Atomically switches the "already launched" flag from false to true.
 *
 * @return true if this call performed the switch (the flag was false
 *         before); false if the flag was already true.
 */
boolean markLaunched() {
  final boolean flippedByThisCall =
      containerAlreadyLaunched.compareAndSet(false, true);
  return flippedByThisCall;
}
/**
 * Reports the current value of the {@code completed} flag.
 *
 * @return true if the flag is set; false otherwise
 */
boolean isLaunchCompleted() {
  final boolean done = completed.get();
  return done;
}
} }

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -154,8 +153,8 @@ public class ContainersLauncher extends AbstractService
break; break;
case CLEANUP_CONTAINER: case CLEANUP_CONTAINER:
case CLEANUP_CONTAINER_FOR_REINIT: case CLEANUP_CONTAINER_FOR_REINIT:
ContainerLaunch launcher = running.remove(containerId); ContainerLaunch existingLaunch = running.remove(containerId);
if (launcher == null) { if (existingLaunch == null) {
// Container not launched. // Container not launched.
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition. // triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
@ -169,12 +168,9 @@ public class ContainersLauncher extends AbstractService
// Cleanup a container whether it is running/killed/completed, so that // Cleanup a container whether it is running/killed/completed, so that
// no sub-processes are alive. // no sub-processes are alive.
try { ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(),
launcher.cleanupContainer(); dispatcher, exec, event.getContainer(), existingLaunch);
} catch (IOException e) { containerLauncher.submit(cleanup);
LOG.warn("Got exception while cleaning container " + containerId
+ ". Ignoring.");
}
break; break;
case SIGNAL_CONTAINER: case SIGNAL_CONTAINER:
SignalContainersLauncherEvent signalEvent = SignalContainersLauncherEvent signalEvent =

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for {@link ContainerCleanup}.
*/
public class TestContainerCleanup {

  private YarnConfiguration conf;
  private ContainerId containerId;
  private ContainerExecutor executor;
  private ContainerLaunch launch;
  private ContainerCleanup cleanup;

  /**
   * Wires a {@link ContainerCleanup} to mocked collaborators. The launch
   * starts in the "not launched" state; tests flip
   * {@code launch.containerAlreadyLaunched} to simulate a launched container.
   */
  @Before
  public void setup() throws Exception {
    conf = new YarnConfiguration();
    conf.setLong(NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 60000);
    Context context = mock(Context.class);
    NMStateStoreService storeService = mock(NMStateStoreService.class);
    when(context.getNMStateStore()).thenReturn(storeService);

    Dispatcher dispatcher = new InlineDispatcher();
    executor = mock(ContainerExecutor.class);
    when(executor.signalContainer(Mockito.any(
        ContainerSignalContext.class))).thenReturn(true);

    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
        1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
    containerId = ContainerId.newContainerId(attemptId, 1);
    Container container = mock(Container.class);

    when(container.getContainerId()).thenReturn(containerId);

    launch = mock(ContainerLaunch.class);
    launch.containerAlreadyLaunched = new AtomicBoolean(false);
    // An unstubbed mock answers markLaunched() with false, which
    // ContainerCleanup interprets as "already launched" no matter what the
    // flag above says. Delegate to the real method so the flag decides.
    when(launch.markLaunched()).thenCallRealMethod();

    // NOTE: getPidFilePath() is left unstubbed, so ContainerCleanup sees a
    // null pid file path and does not touch the file system.
    launch.pidFilePath = new Path("target/" + containerId.toString() + ".pid");
    when(launch.getContainerPid()).thenReturn(containerId.toString());

    cleanup = new ContainerCleanup(context, conf, dispatcher, executor,
        container, launch);
  }

  /**
   * A container that was never launched must be left alone: it is not
   * deactivated, signalled or reaped, but it is marked so that it can no
   * longer be launched.
   */
  @Test
  public void testNoCleanupWhenContainerNotLaunched() throws IOException {
    cleanup.run();

    verify(launch, Mockito.times(0)).signalContainer(
        Mockito.any(SignalContainerCommand.class));
    verify(executor, Mockito.times(0)).deactivateContainer(containerId);
    verify(executor, Mockito.times(0)).signalContainer(
        Mockito.any(ContainerSignalContext.class));
    verify(launch, Mockito.times(0)).reapContainer();
    Assert.assertTrue("launch flag claimed by cleanup",
        launch.containerAlreadyLaunched.get());
  }

  /**
   * A launched container is deactivated, sent TERM (a positive SIGKILL delay
   * is configured in {@link #setup()}) and finally reaped.
   */
  @Test
  public void testCleanup() throws Exception {
    launch.containerAlreadyLaunched.set(true);

    cleanup.run();

    verify(executor, Mockito.times(1)).deactivateContainer(containerId);
    ArgumentCaptor<ContainerSignalContext> captor =
        ArgumentCaptor.forClass(ContainerSignalContext.class);
    verify(executor, Mockito.times(1)).signalContainer(captor.capture());
    Assert.assertEquals("signal", ContainerExecutor.Signal.TERM,
        captor.getValue().getSignal());
    verify(launch, Mockito.times(1)).reapContainer();
  }
}

View File

@ -192,12 +192,14 @@ public class TestContainersLauncher {
.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>()); .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
dummyMap.put(containerId, containerLaunch); dummyMap.put(containerId, containerLaunch);
Whitebox.setInternalState(spy, "running", dummyMap); Whitebox.setInternalState(spy, "running", dummyMap);
when(event.getType()) when(event.getType())
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER); .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER);
doNothing().when(containerLaunch).cleanupContainer(); assertEquals(1, dummyMap.size());
spy.handle(event); spy.handle(event);
assertEquals(0, dummyMap.size()); assertEquals(0, dummyMap.size());
Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer(); Mockito.verify(containerLauncher, Mockito.times(1))
.submit(Mockito.any(ContainerCleanup.class));
} }
@Test @Test
@ -207,12 +209,14 @@ public class TestContainersLauncher {
.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>()); .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
dummyMap.put(containerId, containerLaunch); dummyMap.put(containerId, containerLaunch);
Whitebox.setInternalState(spy, "running", dummyMap); Whitebox.setInternalState(spy, "running", dummyMap);
when(event.getType()) when(event.getType())
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT); .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT);
doNothing().when(containerLaunch).cleanupContainer(); assertEquals(1, dummyMap.size());
spy.handle(event); spy.handle(event);
assertEquals(0, dummyMap.size()); assertEquals(0, dummyMap.size());
Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer(); Mockito.verify(containerLauncher, Mockito.times(1))
.submit(Mockito.any(ContainerCleanup.class));
} }
@Test @Test