MAPREDUCE-3031. Proper handling of killed containers to prevent stuck containers/AMs on an external kill signal. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1175960 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-26 17:27:15 +00:00
parent 1e6dfa7472
commit eff931a1b1
3 changed files with 75 additions and 11 deletions

View File

@ -1429,6 +1429,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2646. Fixed AMRMProtocol to return containers based on MAPREDUCE-2646. Fixed AMRMProtocol to return containers based on
priority. (Sharad Agarwal and Arun C Murthy via vinodkv) priority. (Sharad Agarwal and Arun C Murthy via vinodkv)
MAPREDUCE-3031. Proper handling of killed containers to prevent stuck
containers/AMs on an external kill signal. (Siddharth Seth via vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -158,10 +158,12 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE, .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition()) new ExitedWithFailureTransition(true))
.addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED, .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
// TODO race: Can lead to a CONTAINER_LAUNCHED event at state KILLING,
// and a container which will never be killed by the NM.
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING, .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.KILL_CONTAINER, new KillTransition())
@ -169,16 +171,19 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition()) new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition()) new ExitedWithFailureTransition(true))
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING, .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING, .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State // From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@ -220,10 +225,10 @@ public class ContainerImpl implements Container {
ContainerEventType.KILL_CONTAINER) ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS, .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition()) new ExitedWithSuccessTransition(false))
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE, .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition()) new ExitedWithFailureTransition(false))
.addTransition(ContainerState.KILLING, .addTransition(ContainerState.KILLING,
ContainerState.DONE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
@ -551,18 +556,38 @@ public class ContainerImpl implements Container {
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition { static class ExitedWithSuccessTransition extends ContainerTransition {
boolean clCleanupRequired;
public ExitedWithSuccessTransition(boolean clCleanupRequired) {
this.clCleanupRequired = clCleanupRequired;
}
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
// TODO: Add containerWorkDir to the deletion service. // TODO: Add containerWorkDir to the deletion service.
// Inform the localizer to decrement reference counts and cleanup if (clCleanupRequired) {
// resources. container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
}
container.cleanup(); container.cleanup();
} }
} }
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition { static class ExitedWithFailureTransition extends ContainerTransition {
boolean clCleanupRequired;
public ExitedWithFailureTransition(boolean clCleanupRequired) {
this.clCleanupRequired = clCleanupRequired;
}
@Override @Override
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event; ContainerExitEvent exitEvent = (ContainerExitEvent) event;
@ -571,12 +596,28 @@ public class ContainerImpl implements Container {
// TODO: Add containerWorkDir to the deletion service. // TODO: Add containerWorkDir to the deletion service.
// TODO: Add containerOuputDir to the deletion service. // TODO: Add containerOuputDir to the deletion service.
// Inform the localizer to decrement reference counts and cleanup if (clCleanupRequired) {
// resources. container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
}
container.cleanup(); container.cleanup();
} }
} }
static class KilledExternallyTransition extends ExitedWithFailureTransition {
KilledExternallyTransition() {
super(true);
}
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
super.transition(container, event);
container.diagnostics.append("Killed by external signal\n");
}
}
static class ResourceFailedTransition implements static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> { SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override @Override

View File

@ -38,8 +38,6 @@ import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
@ -135,6 +133,28 @@ public class TestContainer {
} }
} }
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testExternalKill() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.containerKilledOnRequest();
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
verifyCleanupCall(wc);
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test @Test
@SuppressWarnings("unchecked") // mocked generic @SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnFailure() throws Exception { public void testCleanupOnFailure() throws Exception {