From 145c7aa663bcc55e7f354fe4ae12110650eb4c42 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 10 Oct 2018 09:52:19 -0500 Subject: [PATCH] YARN-7644. NM gets backed up deleting docker containers. Contributed by Chandni Singh (cherry picked from commit 5ce70e1211e624d58e8bb1181aec00729ebdc085) --- .../launcher/ContainerCleanup.java | 229 ++++++++++++++++++ .../launcher/ContainerLaunch.java | 226 ++++------------- .../launcher/ContainersLauncher.java | 14 +- .../launcher/TestContainerCleanup.java | 108 +++++++++ .../launcher/TestContainersLauncher.java | 12 +- 5 files changed, 401 insertions(+), 188 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java new file mode 100644 index 00000000000..963d28b54f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.EXIT_CODE_FILE_SUFFIX; + +/** + * Cleanup the container. + * Cancels the launch if launch has not started yet or signals + * the executor to not execute the process if not already done so. + * Also, sends a SIGTERM followed by a SIGKILL to the process if + * the process id is available. + */ +public class ContainerCleanup implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCleanup.class); + + private final Context context; + private final Configuration conf; + private final Dispatcher dispatcher; + private final ContainerExecutor exec; + private final Container container; + private final ContainerLaunch launch; + private final long sleepDelayBeforeSigKill; + + + public ContainerCleanup(Context context, Configuration configuration, + Dispatcher dispatcher, ContainerExecutor exec, + Container container, + ContainerLaunch containerLaunch) { + + this.context = Preconditions.checkNotNull(context, "context"); + this.conf = Preconditions.checkNotNull(configuration, "config"); + this.dispatcher = Preconditions.checkNotNull(dispatcher, "dispatcher"); + this.exec = Preconditions.checkNotNull(exec, "exec"); + this.container = Preconditions.checkNotNull(container, "container"); + this.launch = Preconditions.checkNotNull(containerLaunch, "launch"); + this.sleepDelayBeforeSigKill = conf.getLong( + YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); + } + + @Override + public void run() { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Cleaning up container " + containerIdStr); + + try { + context.getNMStateStore().storeContainerKilled(containerId); + } catch (IOException e) { + LOG.error("Unable to mark container " + containerId + + " killed in store", e); + } + + // launch flag will be set to true if process already launched + boolean alreadyLaunched = !launch.markLaunched(); + if (!alreadyLaunched) { + LOG.info("Container " + containerIdStr + " not launched." + + " No cleanup needed to be done"); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Marking container " + containerIdStr + " as inactive"); + } + // this should ensure that if the container process has not launched + // by this time, it will never be launched + exec.deactivateContainer(containerId); + Path pidFilePath = launch.getPidFilePath(); + if (LOG.isDebugEnabled()) { + LOG.debug("Getting pid for container {} to kill" + + " from pid file {}", containerIdStr, pidFilePath != null ? + pidFilePath : "null"); + } + + // however the container process may have already started + try { + + // get process id from pid file if available + // else if shell is still active, get it from the shell + String processId = launch.getContainerPid(); + + // kill process + String user = container.getUser(); + if (processId != null) { + signalProcess(processId, user, containerIdStr); + } else { + // Normally this means that the process was notified about + // deactivateContainer above and did not start. + // Since we already set the state to RUNNING or REINITIALIZING + // we have to send a killed event to continue. + if (!launch.isLaunchCompleted()) { + LOG.warn("Container clean up before pid file created " + + containerIdStr); + dispatcher.getEventHandler().handle( + new ContainerExitEvent(container.getContainerId(), + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + Shell.WINDOWS ? + ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() : + ContainerExecutor.ExitCode.TERMINATED.getExitCode(), + "Container terminated before pid file created.")); + // There is a possibility that the launch grabbed the file name before + // the deactivateContainer above but it was slow enough to avoid + // getContainerPid. + // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS + // reduces the likelihood of this race condition and process leak. + } + // The Docker container may not have fully started, reap the container. + if (DockerLinuxContainerRuntime.isDockerContainerRequested(conf, + container.getLaunchContext().getEnvironment())) { + reapDockerContainerNoPid(user); + } + } + } catch (Exception e) { + String message = + "Exception when trying to cleanup container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.warn(message); + dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(containerId, message)); + } finally { + // cleanup pid file if present + if (pidFilePath != null) { + try { + FileContext lfs = FileContext.getLocalFSFileContext(); + lfs.delete(pidFilePath, false); + lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false); + } catch (IOException ioe) { + LOG.warn("{} exception trying to delete pid file {}. Ignoring.", + containerId, pidFilePath, ioe); + } + } + } + + try { + // Reap the container + launch.reapContainer(); + } catch (IOException ioe) { + LOG.warn("{} exception trying to reap container. Ignoring.", containerId, + ioe); + } + } + + private void signalProcess(String processId, String user, + String containerIdStr) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending signal to pid " + processId + " as user " + user + + " for container " + containerIdStr); + } + final ContainerExecutor.Signal signal = + sleepDelayBeforeSigKill > 0 ? ContainerExecutor.Signal.TERM : + ContainerExecutor.Signal.KILL; + + boolean result = sendSignal(user, processId, signal); + if (LOG.isDebugEnabled()) { + LOG.debug("Sent signal " + signal + " to pid " + processId + " as user " + + user + " for container " + containerIdStr + ", result=" + + (result ? "success" : "failed")); + } + if (sleepDelayBeforeSigKill > 0) { + new ContainerExecutor.DelayedProcessKiller(container, user, processId, + sleepDelayBeforeSigKill, ContainerExecutor.Signal.KILL, exec).start(); + } + } + + private boolean sendSignal(String user, String processId, + ContainerExecutor.Signal signal) + throws IOException { + return exec.signalContainer( + new ContainerSignalContext.Builder().setContainer(container) + .setUser(user).setPid(processId).setSignal(signal).build()); + } + + private void reapDockerContainerNoPid(String user) throws IOException { + String containerIdStr = + container.getContainerTokenIdentifier().getContainerID().toString(); + LOG.info("Unable to obtain pid, but docker container request detected. " + + "Attempting to reap container " + containerIdStr); + boolean result = exec.reapContainer( + new ContainerReapContext.Builder() + .setContainer(container) + .setUser(container.getUser()) + .build()); + if (LOG.isDebugEnabled()) { + LOG.debug("Sent signal to docker container " + containerIdStr + + " as user " + user + ", result=" + (result ? "success" : "failed")); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 3fa73ec5ab0..6776836d757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -85,7 +84,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext; @@ -115,7 +113,7 @@ public class ContainerLaunch implements Callable { public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; private static final String PID_FILE_NAME_FMT = "%s.pid"; - private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode"; + static final String EXIT_CODE_FILE_SUFFIX = ".exitcode"; protected final Dispatcher dispatcher; protected final ContainerExecutor exec; @@ -131,7 +129,6 @@ public class ContainerLaunch implements Callable { protected AtomicBoolean completed = new AtomicBoolean(false); private volatile boolean killedBeforeStart = false; - private long sleepDelayBeforeSigKill = 250; private long maxKillWaitTime = 2000; protected Path pidFilePath = null; @@ -152,9 +149,6 @@ public class ContainerLaunch implements Callable { this.dispatcher = dispatcher; this.dirsHandler = dirsHandler; this.containerManager = containerManager; - this.sleepDelayBeforeSigKill = - conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, - YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS); this.maxKillWaitTime = conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS); @@ -515,6 +509,25 @@ public class ContainerLaunch implements Callable { return launchPrep; } + void reapContainer() throws IOException { + containerExecLock.lock(); + try { + // Reap the container + boolean result = exec.reapContainer( + new ContainerReapContext.Builder() + .setContainer(container) + .setUser(container.getUser()) + .build()); + if (!result) { + throw new IOException("Reap container failed for container " + + container.getContainerId()); + } + cleanupContainerFiles(getContainerWorkDir()); + } finally { + containerExecLock.unlock(); + } + } + protected int prepareForLaunch(ContainerStartContext ctx) throws IOException { ContainerId containerId = container.getContainerId(); if (container.isMarkedForKilling()) { @@ -721,121 +734,6 @@ public class ContainerLaunch implements Callable { return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr); } - - /** - * Cleanup the container. - * Cancels the launch if launch has not started yet or signals - * the executor to not execute the process if not already done so. - * Also, sends a SIGTERM followed by a SIGKILL to the process if - * the process id is available. - * @throws IOException - */ - public void cleanupContainer() throws IOException { - ContainerId containerId = container.getContainerId(); - String containerIdStr = containerId.toString(); - LOG.info("Cleaning up container " + containerIdStr); - - try { - context.getNMStateStore().storeContainerKilled(containerId); - } catch (IOException e) { - LOG.error("Unable to mark container " + containerId - + " killed in store", e); - } - - // launch flag will be set to true if process already launched - boolean alreadyLaunched = - !containerAlreadyLaunched.compareAndSet(false, true); - if (!alreadyLaunched) { - LOG.info("Container " + containerIdStr + " not launched." - + " No cleanup needed to be done"); - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Marking container " + containerIdStr + " as inactive"); - } - // this should ensure that if the container process has not launched - // by this time, it will never be launched - exec.deactivateContainer(containerId); - - if (LOG.isDebugEnabled()) { - LOG.debug("Getting pid for container " + containerIdStr + " to kill" - + " from pid file " - + (pidFilePath != null ? pidFilePath.toString() : "null")); - } - - // however the container process may have already started - try { - - // get process id from pid file if available - // else if shell is still active, get it from the shell - String processId = null; - if (pidFilePath != null) { - processId = getContainerPid(pidFilePath); - } - - // kill process - String user = container.getUser(); - if (processId != null) { - signalProcess(processId, user, containerIdStr); - } else { - // Normally this means that the process was notified about - // deactivateContainer above and did not start. - // Since we already set the state to RUNNING or REINITIALIZING - // we have to send a killed event to continue. - if (!completed.get()) { - LOG.warn("Container clean up before pid file created " - + containerIdStr); - dispatcher.getEventHandler().handle( - new ContainerExitEvent(container.getContainerId(), - ContainerEventType.CONTAINER_KILLED_ON_REQUEST, - Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() : - ExitCode.TERMINATED.getExitCode(), - "Container terminated before pid file created.")); - // There is a possibility that the launch grabbed the file name before - // the deactivateContainer above but it was slow enough to avoid - // getContainerPid. - // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS - // reduces the likelihood of this race condition and process leak. - } - // The Docker container may not have fully started, reap the container. - if (DockerLinuxContainerRuntime.isDockerContainerRequested( - conf, - container.getLaunchContext().getEnvironment())) { - reapDockerContainerNoPid(user); - } - } - } catch (Exception e) { - String message = - "Exception when trying to cleanup container " + containerIdStr - + ": " + StringUtils.stringifyException(e); - LOG.warn(message); - dispatcher.getEventHandler().handle( - new ContainerDiagnosticsUpdateEvent(containerId, message)); - } finally { - // cleanup pid file if present - if (pidFilePath != null) { - FileContext lfs = FileContext.getLocalFSFileContext(); - lfs.delete(pidFilePath, false); - lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false); - } - } - containerExecLock.lock(); - try { - // Reap the container - boolean result = exec.reapContainer( - new ContainerReapContext.Builder() - .setContainer(container) - .setUser(container.getUser()) - .build()); - if (!result) { - throw new IOException("Reap container failed for container " - + containerIdStr); - } - cleanupContainerFiles(getContainerWorkDir()); - } finally { - containerExecLock.unlock(); - } - } /** * Send a signal to the container. @@ -874,11 +772,7 @@ public class ContainerLaunch implements Callable { try { // get process id from pid file if available // else if shell is still active, get it from the shell - String processId = null; - if (pidFilePath != null) { - processId = getContainerPid(pidFilePath); - } - + String processId = getContainerPid(); if (processId != null) { if (LOG.isDebugEnabled()) { LOG.debug("Sending signal to pid " + processId @@ -912,50 +806,6 @@ public class ContainerLaunch implements Callable { } } - private boolean sendSignal(String user, String processId, Signal signal) - throws IOException { - return exec.signalContainer( - new ContainerSignalContext.Builder().setContainer(container) - .setUser(user).setPid(processId).setSignal(signal).build()); - } - - private void signalProcess(String processId, String user, - String containerIdStr) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending signal to pid " + processId + " as user " + user - + " for container " + containerIdStr); - } - final Signal signal = - sleepDelayBeforeSigKill > 0 ? Signal.TERM : Signal.KILL; - - boolean result = sendSignal(user, processId, signal); - if (LOG.isDebugEnabled()) { - LOG.debug("Sent signal " + signal + " to pid " + processId + " as user " - + user + " for container " + containerIdStr + ", result=" - + (result ? "success" : "failed")); - } - if (sleepDelayBeforeSigKill > 0) { - new DelayedProcessKiller(container, user, processId, - sleepDelayBeforeSigKill, Signal.KILL, exec).start(); - } - } - - private void reapDockerContainerNoPid(String user) throws IOException { - String containerIdStr = - container.getContainerTokenIdentifier().getContainerID().toString(); - LOG.info("Unable to obtain pid, but docker container request detected. " - + "Attempting to reap container " + containerIdStr); - boolean result = exec.reapContainer( - new ContainerReapContext.Builder() - .setContainer(container) - .setUser(container.getUser()) - .build()); - if (LOG.isDebugEnabled()) { - LOG.debug("Sent signal to docker container " + containerIdStr - + " as user " + user + ", result=" + (result ? "success" : "failed")); - } - } - @VisibleForTesting public static Signal translateCommandToSignal( SignalContainerCommand command) { @@ -1076,14 +926,16 @@ public class ContainerLaunch implements Callable { /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. - * @param pidFilePath File from which to read the process id - * @return Process ID + * @return Process ID; null when pidFilePath is null * @throws Exception */ - private String getContainerPid(Path pidFilePath) throws Exception { + String getContainerPid() throws Exception { + if (pidFilePath == null) { + return null; + } String containerIdStr = container.getContainerId().toString(); - String processId = null; + String processId; if (LOG.isDebugEnabled()) { LOG.debug("Accessing pid for container " + containerIdStr + " from pid file " + pidFilePath); @@ -1889,4 +1741,28 @@ public class ContainerLaunch implements Callable { LOG.warn("Failed to delete " + path, e); } } + + /** + * Returns the PID File Path. + */ + Path getPidFilePath() { + return pidFilePath; + } + + /** + * Marks the container to be launched only if it was not launched. + * + * @return true if successful; false otherwise. + */ + boolean markLaunched() { + return containerAlreadyLaunched.compareAndSet(false, true); + } + + /** + * Returns if the launch is completed or not. + */ + boolean isLaunchCompleted() { + return completed.get(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 7870f86471f..fdfe5b17a9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.slf4j.Logger; @@ -154,8 +153,8 @@ public class ContainersLauncher extends AbstractService break; case CLEANUP_CONTAINER: case CLEANUP_CONTAINER_FOR_REINIT: - ContainerLaunch launcher = running.remove(containerId); - if (launcher == null) { + ContainerLaunch existingLaunch = running.remove(containerId); + if (existingLaunch == null) { // Container not launched. // triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition. dispatcher.getEventHandler().handle( @@ -169,12 +168,9 @@ public class ContainersLauncher extends AbstractService // Cleanup a container whether it is running/killed/completed, so that // no sub-processes are alive. - try { - launcher.cleanupContainer(); - } catch (IOException e) { - LOG.warn("Got exception while cleaning container " + containerId - + ". Ignoring."); - } + ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(), + dispatcher, exec, event.getContainer(), existingLaunch); + containerLauncher.submit(cleanup); break; case SIGNAL_CONTAINER: SignalContainersLauncherEvent signalEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java new file mode 100644 index 00000000000..6c99379655b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link ContainerCleanup}. + */ +public class TestContainerCleanup { + + private YarnConfiguration conf; + private ContainerId containerId; + private ContainerExecutor executor; + private ContainerLaunch launch; + private ContainerCleanup cleanup; + + @Before + public void setup() throws Exception { + conf = new YarnConfiguration(); + conf.setLong(NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 60000); + Context context = mock(Context.class); + NMStateStoreService storeService = mock(NMStateStoreService.class); + when(context.getNMStateStore()).thenReturn(storeService); + + Dispatcher dispatcher = new InlineDispatcher(); + executor = mock(ContainerExecutor.class); + when(executor.signalContainer(Mockito.any( + ContainerSignalContext.class))).thenReturn(true); + + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), + 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + containerId = ContainerId.newContainerId(attemptId, 1); + Container container = mock(Container.class); + + when(container.getContainerId()).thenReturn(containerId); + + launch = mock(ContainerLaunch.class); + launch.containerAlreadyLaunched = new AtomicBoolean(false); + + launch.pidFilePath = new Path("target/" + containerId.toString() + ".pid"); + when(launch.getContainerPid()).thenReturn(containerId.toString()); + + cleanup = new ContainerCleanup(context, conf, dispatcher, executor, + container, launch); + } + + @Test + public void testNoCleanupWhenContainerNotLaunched() throws IOException { + cleanup.run(); + verify(launch, Mockito.times(0)).signalContainer( + Mockito.any(SignalContainerCommand.class)); + } + + @Test + public void testCleanup() throws Exception { + launch.containerAlreadyLaunched.set(true); + cleanup.run(); + ArgumentCaptor captor = + ArgumentCaptor.forClass(ContainerSignalContext.class); + + verify(executor, Mockito.times(1)).signalContainer(captor.capture()); + Assert.assertEquals("signal", ContainerExecutor.Signal.TERM, + captor.getValue().getSignal()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java index 0234ac28581..f2fafd2170f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java @@ -192,12 +192,14 @@ public class TestContainersLauncher { .synchronizedMap(new HashMap()); dummyMap.put(containerId, containerLaunch); Whitebox.setInternalState(spy, "running", dummyMap); + when(event.getType()) .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER); - doNothing().when(containerLaunch).cleanupContainer(); + assertEquals(1, dummyMap.size()); spy.handle(event); assertEquals(0, dummyMap.size()); - Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer(); + Mockito.verify(containerLauncher, Mockito.times(1)) + .submit(Mockito.any(ContainerCleanup.class)); } @Test @@ -207,12 +209,14 @@ public class TestContainersLauncher { .synchronizedMap(new HashMap()); dummyMap.put(containerId, containerLaunch); Whitebox.setInternalState(spy, "running", dummyMap); + when(event.getType()) .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT); - doNothing().when(containerLaunch).cleanupContainer(); + assertEquals(1, dummyMap.size()); spy.handle(event); assertEquals(0, dummyMap.size()); - Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer(); + Mockito.verify(containerLauncher, Mockito.times(1)) + .submit(Mockito.any(ContainerCleanup.class)); } @Test