From cf8e553ab3d8321700337dba609d2b3032e2c466 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 26 Sep 2011 17:31:47 +0000 Subject: [PATCH] MAPREDUCE-3031. svn merge -c r1175960 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1175964 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../container/ContainerImpl.java | 59 ++++++++++++++++--- .../container/TestContainer.java | 24 +++++++- 3 files changed, 75 insertions(+), 11 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index df2b390227f..452531182d5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1397,6 +1397,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2646. Fixed AMRMProtocol to return containers based on priority. (Sharad Agarwal and Arun C Murthy via vinodkv) + MAPREDUCE-3031. Proper handling of killed containers to prevent stuck + containers/AMs on an external kill signal. (Siddharth Seth via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 4e02c3adede..8d673fbf9c9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -158,10 +158,12 @@ public class ContainerImpl implements Container { ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition()) + new ExitedWithFailureTransition(true)) .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) + // TODO race: Can lead to a CONTAINER_LAUNCHED event at state KILLING, + // and a container which will never be killed by the NM. .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) @@ -169,16 +171,19 @@ public class ContainerImpl implements Container { .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - new ExitedWithSuccessTransition()) + new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition()) + new ExitedWithFailureTransition(true)) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, @@ -220,10 +225,10 @@ public class ContainerImpl implements Container { ContainerEventType.KILL_CONTAINER) .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - new ExitedWithSuccessTransition()) + new ExitedWithSuccessTransition(false)) .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition()) + new ExitedWithFailureTransition(false)) .addTransition(ContainerState.KILLING, ContainerState.DONE, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, @@ -551,18 +556,38 @@ public class ContainerImpl implements Container { } } + @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithSuccessTransition extends ContainerTransition { + + boolean clCleanupRequired; + + public ExitedWithSuccessTransition(boolean clCleanupRequired) { + this.clCleanupRequired = clCleanupRequired; + } + @Override public void transition(ContainerImpl container, ContainerEvent event) { // TODO: Add containerWorkDir to the deletion service. - // Inform the localizer to decrement reference counts and cleanup - // resources. + if (clCleanupRequired) { + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + container.cleanup(); } } + @SuppressWarnings("unchecked") // dispatcher not typed static class ExitedWithFailureTransition extends ContainerTransition { + + boolean clCleanupRequired; + + public ExitedWithFailureTransition(boolean clCleanupRequired) { + this.clCleanupRequired = clCleanupRequired; + } + @Override public void transition(ContainerImpl container, ContainerEvent event) { ContainerExitEvent exitEvent = (ContainerExitEvent) event; @@ -571,12 +596,28 @@ public class ContainerImpl implements Container { // TODO: Add containerWorkDir to the deletion service. // TODO: Add containerOuputDir to the deletion service. - // Inform the localizer to decrement reference counts and cleanup - // resources. + if (clCleanupRequired) { + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + container.cleanup(); } } + static class KilledExternallyTransition extends ExitedWithFailureTransition { + KilledExternallyTransition() { + super(true); + } + + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + super.transition(container, event); + container.diagnostics.append("Killed by external signal\n"); + } + } + static class ResourceFailedTransition implements SingleArcTransition { @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 04d400ad18d..48c745457a7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -38,8 +38,6 @@ import java.util.Map.Entry; import java.util.Random; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -135,6 +133,28 @@ public class TestContainer { } } + @Test + @SuppressWarnings("unchecked") // mocked generic + public void testExternalKill() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + reset(wc.localizerBus); + wc.containerKilledOnRequest(); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + verifyCleanupCall(wc); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } + @Test @SuppressWarnings("unchecked") // mocked generic public void testCleanupOnFailure() throws Exception {