YARN-8473. Containers being launched as app tears down can leave containers in NEW state. Contributed by Jason Lowe.
This commit is contained in:
parent
ca8b80bf59
commit
705e2c1f7c
|
@ -211,6 +211,9 @@ public class ApplicationImpl implements Application {
|
|||
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
|
||||
new ContainerDoneTransition();
|
||||
|
||||
private static final InitContainerTransition INIT_CONTAINER_TRANSITION =
|
||||
new InitContainerTransition();
|
||||
|
||||
private static StateMachineFactory<ApplicationImpl, ApplicationState,
|
||||
ApplicationEventType, ApplicationEvent> stateMachineFactory =
|
||||
new StateMachineFactory<ApplicationImpl, ApplicationState,
|
||||
|
@ -221,12 +224,12 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
|
||||
.addTransition(ApplicationState.NEW, ApplicationState.NEW,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
new InitContainerTransition())
|
||||
INIT_CONTAINER_TRANSITION)
|
||||
|
||||
// Transitions from INITING state
|
||||
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
new InitContainerTransition())
|
||||
INIT_CONTAINER_TRANSITION)
|
||||
.addTransition(ApplicationState.INITING,
|
||||
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
|
||||
|
@ -249,7 +252,7 @@ public class ApplicationImpl implements Application {
|
|||
.addTransition(ApplicationState.RUNNING,
|
||||
ApplicationState.RUNNING,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
new InitContainerTransition())
|
||||
INIT_CONTAINER_TRANSITION)
|
||||
.addTransition(ApplicationState.RUNNING,
|
||||
ApplicationState.RUNNING,
|
||||
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
|
||||
|
@ -268,6 +271,10 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
|
||||
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
|
||||
new AppFinishTransition())
|
||||
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
INIT_CONTAINER_TRANSITION)
|
||||
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
EnumSet.of(
|
||||
|
@ -284,6 +291,10 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationState.FINISHED,
|
||||
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
|
||||
new AppCompletelyDoneTransition())
|
||||
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
INIT_CONTAINER_TRANSITION)
|
||||
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
EnumSet.of(
|
||||
|
@ -300,9 +311,14 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
|
||||
new AppLogsAggregatedTransition())
|
||||
.addTransition(ApplicationState.FINISHED,
|
||||
ApplicationState.FINISHED,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
INIT_CONTAINER_TRANSITION)
|
||||
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
|
||||
EnumSet.of(
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
|
||||
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
|
||||
ApplicationEventType.FINISH_APPLICATION))
|
||||
// create the topology tables
|
||||
.installTopology();
|
||||
|
@ -445,8 +461,9 @@ public class ApplicationImpl implements Application {
|
|||
app.containers.put(container.getContainerId(), container);
|
||||
LOG.info("Adding " + container.getContainerId()
|
||||
+ " to application " + app.toString());
|
||||
|
||||
switch (app.getApplicationState()) {
|
||||
|
||||
ApplicationState appState = app.getApplicationState();
|
||||
switch (appState) {
|
||||
case RUNNING:
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainerId()));
|
||||
|
@ -456,8 +473,13 @@ public class ApplicationImpl implements Application {
|
|||
// these get queued up and sent out in AppInitDoneTransition
|
||||
break;
|
||||
default:
|
||||
assert false : "Invalid state for InitContainerTransition: " +
|
||||
app.getApplicationState();
|
||||
LOG.warn("Killing {} because {} is in state {}",
|
||||
container.getContainerId(), app, appState);
|
||||
app.dispatcher.getEventHandler().handle(new ContainerKillEvent(
|
||||
container.getContainerId(),
|
||||
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
|
||||
"Application no longer running.\n"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -360,35 +360,66 @@ public class TestApplication {
|
|||
}
|
||||
}
|
||||
|
||||
//TODO Re-work after Application transitions are changed.
|
||||
// @Test
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testStartContainerAfterAppFinished() {
|
||||
public void testStartContainerAfterAppRunning() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
|
||||
wa = new WrappedApplication(5, 314159265358979L, "yak", 4);
|
||||
wa.initApplication();
|
||||
wa.initContainer(-1);
|
||||
wa.initContainer(0);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
wa.applicationInited();
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
|
||||
reset(wa.localizerBus);
|
||||
wa.containerFinished(0);
|
||||
wa.containerFinished(1);
|
||||
wa.containerFinished(2);
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
assertEquals(0, wa.app.getContainers().size());
|
||||
assertEquals(1, wa.app.getContainers().size());
|
||||
|
||||
wa.appFinished();
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(0)
|
||||
.getContainerId())));
|
||||
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
wa.app.getApplicationState());
|
||||
|
||||
wa.initContainer(1);
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(1)
|
||||
.getContainerId())));
|
||||
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
wa.app.getApplicationState());
|
||||
wa.containerFinished(1);
|
||||
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
wa.app.getApplicationState());
|
||||
|
||||
wa.containerFinished(0);
|
||||
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
wa.app.getApplicationState());
|
||||
verify(wa.localizerBus).handle(
|
||||
refEq(new ApplicationLocalizationEvent(
|
||||
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
|
||||
LocalizationEventType.DESTROY_APPLICATION_RESOURCES,
|
||||
wa.app), "timestamp"));
|
||||
|
||||
wa.initContainer(2);
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(2)
|
||||
.getContainerId())));
|
||||
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
wa.app.getApplicationState());
|
||||
wa.containerFinished(2);
|
||||
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
wa.app.getApplicationState());
|
||||
|
||||
wa.appResourcesCleanedup();
|
||||
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
||||
|
||||
wa.initContainer(3);
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(3)
|
||||
.getContainerId())));
|
||||
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
||||
wa.containerFinished(3);
|
||||
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
||||
} finally {
|
||||
if (wa != null)
|
||||
wa.finished();
|
||||
|
|
Loading…
Reference in New Issue