YARN-8473. Containers being launched as app tears down can leave containers in NEW state. Contributed by Jason Lowe.

(cherry picked from commit 705e2c1f7c)
This commit is contained in:
Sunil G 2018-07-10 20:11:47 +05:30
parent aab9bfc13c
commit 6cc5d49fa3
2 changed files with 71 additions and 18 deletions

View File

@ -211,6 +211,9 @@ public class ApplicationImpl implements Application {
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
private static final InitContainerTransition INIT_CONTAINER_TRANSITION =
new InitContainerTransition();
private static StateMachineFactory<ApplicationImpl, ApplicationState,
ApplicationEventType, ApplicationEvent> stateMachineFactory =
new StateMachineFactory<ApplicationImpl, ApplicationState,
@ -221,12 +224,12 @@ public class ApplicationImpl implements Application {
ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
.addTransition(ApplicationState.NEW, ApplicationState.NEW,
ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition())
INIT_CONTAINER_TRANSITION)
// Transitions from INITING state
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition())
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.INITING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
@ -249,7 +252,7 @@ public class ApplicationImpl implements Application {
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition())
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
@ -268,6 +271,10 @@ public class ApplicationImpl implements Application {
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
new AppFinishTransition())
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
EnumSet.of(
@ -284,6 +291,10 @@ public class ApplicationImpl implements Application {
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
EnumSet.of(
@ -300,9 +311,14 @@ public class ApplicationImpl implements Application {
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
new AppLogsAggregatedTransition())
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
ApplicationEventType.FINISH_APPLICATION))
// create the topology tables
.installTopology();
@ -445,8 +461,9 @@ public class ApplicationImpl implements Application {
app.containers.put(container.getContainerId(), container);
LOG.info("Adding " + container.getContainerId()
+ " to application " + app.toString());
switch (app.getApplicationState()) {
ApplicationState appState = app.getApplicationState();
switch (appState) {
case RUNNING:
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerId()));
@ -456,8 +473,13 @@ public class ApplicationImpl implements Application {
// these get queued up and sent out in AppInitDoneTransition
break;
default:
assert false : "Invalid state for InitContainerTransition: " +
app.getApplicationState();
LOG.warn("Killing {} because {} is in state {}",
container.getContainerId(), app, appState);
app.dispatcher.getEventHandler().handle(new ContainerKillEvent(
container.getContainerId(),
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
"Application no longer running.\n"));
break;
}
}
}

View File

@ -360,35 +360,66 @@ public class TestApplication {
}
}
//TODO Re-work after Application transitions are changed.
// @Test
@Test
@SuppressWarnings("unchecked")
public void testStartContainerAfterAppFinished() {
public void testStartContainerAfterAppRunning() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
wa = new WrappedApplication(5, 314159265358979L, "yak", 4);
wa.initApplication();
wa.initContainer(-1);
wa.initContainer(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
reset(wa.localizerBus);
wa.containerFinished(0);
wa.containerFinished(1);
wa.containerFinished(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
assertEquals(1, wa.app.getContainers().size());
wa.appFinished();
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(0)
.getContainerId())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.initContainer(1);
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(1)
.getContainerId())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.containerFinished(1);
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
verify(wa.localizerBus).handle(
refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
LocalizationEventType.DESTROY_APPLICATION_RESOURCES,
wa.app), "timestamp"));
wa.initContainer(2);
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(2)
.getContainerId())));
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
wa.containerFinished(2);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
wa.initContainer(3);
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(3)
.getContainerId())));
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
wa.containerFinished(3);
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally {
if (wa != null)
wa.finished();