YARN-8331. Race condition in NM container launched after done. Contributed by Pradeep Ambati
(cherry picked from commit cd04e954d2
)
This commit is contained in:
parent
c6013b060d
commit
3dd299a770
|
@ -384,7 +384,7 @@ public class ContainerImpl implements Container {
|
||||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
|
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
|
||||||
ContainerEventType.KILL_CONTAINER,
|
ContainerEventType.KILL_CONTAINER,
|
||||||
new KillBeforeRunningTransition())
|
new KillTransition())
|
||||||
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
|
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
|
||||||
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
new NotifyContainerSchedulerOfUpdateTransition())
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
@ -618,6 +618,9 @@ public class ContainerImpl implements Container {
|
||||||
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
||||||
ContainerState.EXITED_WITH_SUCCESS,
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
||||||
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||||
|
|
||||||
// From EXITED_WITH_FAILURE State
|
// From EXITED_WITH_FAILURE State
|
||||||
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
|
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
|
||||||
|
@ -635,6 +638,9 @@ public class ContainerImpl implements Container {
|
||||||
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
||||||
ContainerState.EXITED_WITH_FAILURE,
|
ContainerState.EXITED_WITH_FAILURE,
|
||||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
||||||
|
ContainerState.EXITED_WITH_FAILURE,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||||
|
|
||||||
// From KILLING State.
|
// From KILLING State.
|
||||||
.addTransition(ContainerState.KILLING,
|
.addTransition(ContainerState.KILLING,
|
||||||
|
@ -694,6 +700,9 @@ public class ContainerImpl implements Container {
|
||||||
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||||
|
|
||||||
// From DONE
|
// From DONE
|
||||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
|
@ -714,6 +723,8 @@ public class ContainerImpl implements Container {
|
||||||
// No transition - assuming container is on its way to completion
|
// No transition - assuming container is on its way to completion
|
||||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
|
||||||
|
|
||||||
// create the topology tables
|
// create the topology tables
|
||||||
.installTopology();
|
.installTopology();
|
||||||
|
|
|
@ -563,14 +563,10 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
|| exitCode == ExitCode.TERMINATED.getExitCode()) {
|
|| exitCode == ExitCode.TERMINATED.getExitCode()) {
|
||||||
// If the process was killed, Send container_cleanedup_after_kill and
|
// If the process was killed, Send container_cleanedup_after_kill and
|
||||||
// just break out of this method.
|
// just break out of this method.
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
// If Container was killed before starting... NO need to do this.
|
new ContainerExitEvent(containerId,
|
||||||
if (!killedBeforeStart) {
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
|
||||||
dispatcher.getEventHandler().handle(
|
diagnosticInfo.toString()));
|
||||||
new ContainerExitEvent(containerId,
|
|
||||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
|
|
||||||
diagnosticInfo.toString()));
|
|
||||||
}
|
|
||||||
} else if (exitCode != 0) {
|
} else if (exitCode != 0) {
|
||||||
handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
|
handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
|
||||||
diagnosticInfo);
|
diagnosticInfo);
|
||||||
|
|
|
@ -23,6 +23,11 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -151,7 +156,14 @@ public class ContainersLauncher extends AbstractService
|
||||||
case CLEANUP_CONTAINER_FOR_REINIT:
|
case CLEANUP_CONTAINER_FOR_REINIT:
|
||||||
ContainerLaunch launcher = running.remove(containerId);
|
ContainerLaunch launcher = running.remove(containerId);
|
||||||
if (launcher == null) {
|
if (launcher == null) {
|
||||||
// Container not launched. So nothing needs to be done.
|
// Container not launched.
|
||||||
|
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new ContainerExitEvent(containerId,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
Shell.WINDOWS ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
|
||||||
|
ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
|
||||||
|
"Container terminated before launch."));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Matchers.refEq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -665,6 +666,17 @@ public class TestContainer {
|
||||||
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
|
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
|
||||||
wc.killContainer();
|
wc.killContainer();
|
||||||
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
|
||||||
|
|
||||||
|
// check that container cleanup hasn't started at this point.
|
||||||
|
LocalizationCleanupMatcher cleanupResources =
|
||||||
|
new LocalizationCleanupMatcher(wc.c);
|
||||||
|
verify(wc.localizerBus, times(0)).handle(argThat(cleanupResources));
|
||||||
|
|
||||||
|
// check if containerlauncher cleans up the container launch.
|
||||||
|
verify(wc.launcherBus)
|
||||||
|
.handle(refEq(new ContainersLauncherEvent(wc.c,
|
||||||
|
ContainersLauncherEventType.CLEANUP_CONTAINER), "timestamp"));
|
||||||
|
|
||||||
launcher.call();
|
launcher.call();
|
||||||
wc.drainDispatcherEvents();
|
wc.drainDispatcherEvents();
|
||||||
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
@ -677,6 +689,7 @@ public class TestContainer {
|
||||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
assertEquals(killed + 1, metrics.getKilledContainers());
|
assertEquals(killed + 1, metrics.getKilledContainers());
|
||||||
assertEquals(0, metrics.getRunningContainers());
|
assertEquals(0, metrics.getRunningContainers());
|
||||||
|
assertEquals(0, wc.launcher.running.size());
|
||||||
} finally {
|
} finally {
|
||||||
if (wc != null) {
|
if (wc != null) {
|
||||||
wc.finished();
|
wc.finished();
|
||||||
|
@ -1146,7 +1159,7 @@ public class TestContainer {
|
||||||
ResourcesReleasedMatcher matchesReq =
|
ResourcesReleasedMatcher matchesReq =
|
||||||
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
|
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
|
||||||
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
|
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
|
||||||
LocalResourceVisibility.APPLICATION));
|
LocalResourceVisibility.APPLICATION), wc.c);
|
||||||
verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
|
verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1162,13 +1175,35 @@ public class TestContainer {
|
||||||
wc.c.getContainerId().toString())));
|
wc.c.getContainerId().toString())));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ResourcesReleasedMatcher extends
|
// Argument matcher for matching container localization cleanup event.
|
||||||
|
private static class LocalizationCleanupMatcher extends
|
||||||
ArgumentMatcher<LocalizationEvent> {
|
ArgumentMatcher<LocalizationEvent> {
|
||||||
|
Container c;
|
||||||
|
|
||||||
|
LocalizationCleanupMatcher(Container c){
|
||||||
|
this.c = c;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean matches(Object o) {
|
||||||
|
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ContainerLocalizationCleanupEvent evt =
|
||||||
|
(ContainerLocalizationCleanupEvent) o;
|
||||||
|
|
||||||
|
return (evt.getContainer() == c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ResourcesReleasedMatcher extends
|
||||||
|
LocalizationCleanupMatcher {
|
||||||
final HashSet<LocalResourceRequest> resources =
|
final HashSet<LocalResourceRequest> resources =
|
||||||
new HashSet<LocalResourceRequest>();
|
new HashSet<LocalResourceRequest>();
|
||||||
|
|
||||||
ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
|
ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
|
||||||
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
|
EnumSet<LocalResourceVisibility> vis, Container c) throws URISyntaxException {
|
||||||
|
super(c);
|
||||||
for (Entry<String, LocalResource> e : allResources.entrySet()) {
|
for (Entry<String, LocalResource> e : allResources.entrySet()) {
|
||||||
if (vis.contains(e.getValue().getVisibility())) {
|
if (vis.contains(e.getValue().getVisibility())) {
|
||||||
resources.add(new LocalResourceRequest(e.getValue()));
|
resources.add(new LocalResourceRequest(e.getValue()));
|
||||||
|
@ -1178,9 +1213,12 @@ public class TestContainer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean matches(Object o) {
|
public boolean matches(Object o) {
|
||||||
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
|
// match event type and container.
|
||||||
|
if(!super.matches(o)){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// match resources.
|
||||||
ContainerLocalizationCleanupEvent evt =
|
ContainerLocalizationCleanupEvent evt =
|
||||||
(ContainerLocalizationCleanupEvent) o;
|
(ContainerLocalizationCleanupEvent) o;
|
||||||
final HashSet<LocalResourceRequest> expected =
|
final HashSet<LocalResourceRequest> expected =
|
||||||
|
|
Loading…
Reference in New Issue