YARN-8331. Race condition in NM container launched after done. Contributed by Pradeep Ambati
(cherry picked from commit cd04e954d2
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
This commit is contained in:
parent
dc41462af4
commit
b0a364171d
|
@ -370,7 +370,7 @@ public class ContainerImpl implements Container {
|
|||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER,
|
||||
new KillBeforeRunningTransition())
|
||||
new KillTransition())
|
||||
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
|
||||
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||
new NotifyContainerSchedulerOfUpdateTransition())
|
||||
|
@ -604,6 +604,9 @@ public class ContainerImpl implements Container {
|
|||
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||
|
||||
// From EXITED_WITH_FAILURE State
|
||||
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
|
||||
|
@ -621,6 +624,9 @@ public class ContainerImpl implements Container {
|
|||
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||
|
||||
// From KILLING State.
|
||||
.addTransition(ContainerState.KILLING,
|
||||
|
@ -680,6 +686,9 @@ public class ContainerImpl implements Container {
|
|||
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||
|
||||
// From DONE
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
|
@ -700,6 +709,8 @@ public class ContainerImpl implements Container {
|
|||
// No transition - assuming container is on its way to completion
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||
|
||||
// create the topology tables
|
||||
.installTopology();
|
||||
|
|
|
@ -495,14 +495,10 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
|| exitCode == ExitCode.TERMINATED.getExitCode()) {
|
||||
// If the process was killed, Send container_cleanedup_after_kill and
|
||||
// just break out of this method.
|
||||
|
||||
// If Container was killed before starting... NO need to do this.
|
||||
if (!killedBeforeStart) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(containerId,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
|
||||
diagnosticInfo.toString()));
|
||||
}
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(containerId,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
|
||||
diagnosticInfo.toString()));
|
||||
} else if (exitCode != 0) {
|
||||
handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
|
||||
diagnosticInfo);
|
||||
|
|
|
@ -23,6 +23,11 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -151,7 +156,14 @@ public class ContainersLauncher extends AbstractService
|
|||
case CLEANUP_CONTAINER_FOR_REINIT:
|
||||
ContainerLaunch launcher = running.remove(containerId);
|
||||
if (launcher == null) {
|
||||
// Container not launched. So nothing needs to be done.
|
||||
// Container not launched.
|
||||
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(containerId,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
Shell.WINDOWS ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
|
||||
ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
|
||||
"Container terminated before launch."));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,8 +23,10 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.refEq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
|
@ -518,6 +520,17 @@ public class TestContainer {
|
|||
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
|
||||
wc.killContainer();
|
||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||
|
||||
// check that container cleanup hasn't started at this point.
|
||||
LocalizationCleanupMatcher cleanupResources =
|
||||
new LocalizationCleanupMatcher(wc.c);
|
||||
verify(wc.localizerBus, times(0)).handle(argThat(cleanupResources));
|
||||
|
||||
// check if containerlauncher cleans up the container launch.
|
||||
verify(wc.launcherBus)
|
||||
.handle(refEq(new ContainersLauncherEvent(wc.c,
|
||||
ContainersLauncherEventType.CLEANUP_CONTAINER), "timestamp"));
|
||||
|
||||
launcher.call();
|
||||
wc.drainDispatcherEvents();
|
||||
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
|
@ -530,6 +543,7 @@ public class TestContainer {
|
|||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
assertEquals(killed + 1, metrics.getKilledContainers());
|
||||
assertEquals(0, metrics.getRunningContainers());
|
||||
assertEquals(0, wc.launcher.running.size());
|
||||
} finally {
|
||||
if (wc != null) {
|
||||
wc.finished();
|
||||
|
@ -856,7 +870,7 @@ public class TestContainer {
|
|||
ResourcesReleasedMatcher matchesReq =
|
||||
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
|
||||
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
|
||||
LocalResourceVisibility.APPLICATION));
|
||||
LocalResourceVisibility.APPLICATION), wc.c);
|
||||
verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
|
||||
}
|
||||
|
||||
|
@ -864,13 +878,35 @@ public class TestContainer {
|
|||
verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat();
|
||||
}
|
||||
|
||||
private static class ResourcesReleasedMatcher extends
|
||||
// Argument matcher for matching container localization cleanup event.
|
||||
private static class LocalizationCleanupMatcher extends
|
||||
ArgumentMatcher<LocalizationEvent> {
|
||||
Container c;
|
||||
|
||||
LocalizationCleanupMatcher(Container c){
|
||||
this.c = c;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
|
||||
return false;
|
||||
}
|
||||
ContainerLocalizationCleanupEvent evt =
|
||||
(ContainerLocalizationCleanupEvent) o;
|
||||
|
||||
return (evt.getContainer() == c);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ResourcesReleasedMatcher extends
|
||||
LocalizationCleanupMatcher {
|
||||
final HashSet<LocalResourceRequest> resources =
|
||||
new HashSet<LocalResourceRequest>();
|
||||
|
||||
ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
|
||||
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
|
||||
EnumSet<LocalResourceVisibility> vis, Container c) throws URISyntaxException {
|
||||
super(c);
|
||||
for (Entry<String, LocalResource> e : allResources.entrySet()) {
|
||||
if (vis.contains(e.getValue().getVisibility())) {
|
||||
resources.add(new LocalResourceRequest(e.getValue()));
|
||||
|
@ -880,9 +916,12 @@ public class TestContainer {
|
|||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
|
||||
// match event type and container.
|
||||
if(!super.matches(o)){
|
||||
return false;
|
||||
}
|
||||
|
||||
// match resources.
|
||||
ContainerLocalizationCleanupEvent evt =
|
||||
(ContainerLocalizationCleanupEvent) o;
|
||||
final HashSet<LocalResourceRequest> expected =
|
||||
|
|
Loading…
Reference in New Issue