YARN-8473. Containers being launched as app tears down can leave containers in NEW state. Contributed by Jason Lowe.

(cherry picked from commit 705e2c1f7c)
This commit is contained in:
Sunil G 2018-07-10 20:11:47 +05:30
parent d54241e9c9
commit 6f10491e64
2 changed files with 71 additions and 18 deletions

View File

@ -211,6 +211,9 @@ public class ApplicationImpl implements Application {
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION = private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition(); new ContainerDoneTransition();
private static final InitContainerTransition INIT_CONTAINER_TRANSITION =
new InitContainerTransition();
private static StateMachineFactory<ApplicationImpl, ApplicationState, private static StateMachineFactory<ApplicationImpl, ApplicationState,
ApplicationEventType, ApplicationEvent> stateMachineFactory = ApplicationEventType, ApplicationEvent> stateMachineFactory =
new StateMachineFactory<ApplicationImpl, ApplicationState, new StateMachineFactory<ApplicationImpl, ApplicationState,
@ -221,12 +224,12 @@ public class ApplicationImpl implements Application {
ApplicationEventType.INIT_APPLICATION, new AppInitTransition()) ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
.addTransition(ApplicationState.NEW, ApplicationState.NEW, .addTransition(ApplicationState.NEW, ApplicationState.NEW,
ApplicationEventType.INIT_CONTAINER, ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition()) INIT_CONTAINER_TRANSITION)
// Transitions from INITING state // Transitions from INITING state
.addTransition(ApplicationState.INITING, ApplicationState.INITING, .addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.INIT_CONTAINER, ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition()) INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.INITING, .addTransition(ApplicationState.INITING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT, EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
@ -249,7 +252,7 @@ public class ApplicationImpl implements Application {
.addTransition(ApplicationState.RUNNING, .addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING, ApplicationState.RUNNING,
ApplicationEventType.INIT_CONTAINER, ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition()) INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.RUNNING, .addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING, ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED, ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
@ -268,6 +271,10 @@ public class ApplicationImpl implements Application {
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED, ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
new AppFinishTransition()) new AppFinishTransition())
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT, .addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT, ApplicationState.FINISHING_CONTAINERS_WAIT,
EnumSet.of( EnumSet.of(
@ -284,6 +291,10 @@ public class ApplicationImpl implements Application {
ApplicationState.FINISHED, ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP, ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition()) new AppCompletelyDoneTransition())
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
EnumSet.of( EnumSet.of(
@ -300,9 +311,14 @@ public class ApplicationImpl implements Application {
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
new AppLogsAggregatedTransition()) new AppLogsAggregatedTransition())
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED, .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
EnumSet.of( EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
ApplicationEventType.FINISH_APPLICATION)) ApplicationEventType.FINISH_APPLICATION))
// create the topology tables // create the topology tables
.installTopology(); .installTopology();
@ -445,8 +461,9 @@ public class ApplicationImpl implements Application {
app.containers.put(container.getContainerId(), container); app.containers.put(container.getContainerId(), container);
LOG.info("Adding " + container.getContainerId() LOG.info("Adding " + container.getContainerId()
+ " to application " + app.toString()); + " to application " + app.toString());
switch (app.getApplicationState()) { ApplicationState appState = app.getApplicationState();
switch (appState) {
case RUNNING: case RUNNING:
app.dispatcher.getEventHandler().handle(new ContainerInitEvent( app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerId())); container.getContainerId()));
@ -456,8 +473,13 @@ public class ApplicationImpl implements Application {
// these get queued up and sent out in AppInitDoneTransition // these get queued up and sent out in AppInitDoneTransition
break; break;
default: default:
assert false : "Invalid state for InitContainerTransition: " + LOG.warn("Killing {} because {} is in state {}",
app.getApplicationState(); container.getContainerId(), app, appState);
app.dispatcher.getEventHandler().handle(new ContainerKillEvent(
container.getContainerId(),
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
"Application no longer running.\n"));
break;
} }
} }
} }

View File

@ -360,35 +360,66 @@ public class TestApplication {
} }
} }
//TODO Re-work after Application transitions are changed. @Test
// @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testStartContainerAfterAppFinished() { public void testStartContainerAfterAppRunning() {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(5, 314159265358979L, "yak", 3); wa = new WrappedApplication(5, 314159265358979L, "yak", 4);
wa.initApplication(); wa.initApplication();
wa.initContainer(-1); wa.initContainer(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited(); wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
reset(wa.localizerBus);
wa.containerFinished(0);
wa.containerFinished(1);
wa.containerFinished(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size()); assertEquals(1, wa.app.getContainers().size());
wa.appFinished(); wa.appFinished();
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(0)
.getContainerId())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.initContainer(1);
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(1)
.getContainerId())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.containerFinished(1);
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState()); wa.app.getApplicationState());
verify(wa.localizerBus).handle( verify(wa.localizerBus).handle(
refEq(new ApplicationLocalizationEvent( refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); LocalizationEventType.DESTROY_APPLICATION_RESOURCES,
wa.app), "timestamp"));
wa.initContainer(2);
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(2)
.getContainerId())));
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
wa.containerFinished(2);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState());
wa.appResourcesCleanedup(); wa.appResourcesCleanedup();
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
wa.initContainer(3);
verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(3)
.getContainerId())));
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
wa.containerFinished(3);
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally { } finally {
if (wa != null) if (wa != null)
wa.finished(); wa.finished();