MAPREDUCE-3162. Separated application-init and container-init event types in NodeManager's Application state machine. Contributed by Todd Lipcon.

svn merge -c r1185988 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1185990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-19 06:42:38 +00:00
parent e3b9d11da8
commit db1c89fd51
7 changed files with 116 additions and 52 deletions

View File

@ -1629,6 +1629,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3203. Fix some javac warnings in MRAppMaster. (mahadev) MAPREDUCE-3203. Fix some javac warnings in MRAppMaster. (mahadev)
MAPREDUCE-3162. Separated application-init and container-init event types
in NodeManager's Application state machine. (Todd Lipcon via vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
@ -225,6 +226,7 @@ public class ContainerManagerImpl extends CompositeService implements
/** /**
* Start a container on this NodeManager. * Start a container on this NodeManager.
*/ */
@SuppressWarnings("unchecked")
@Override @Override
public StartContainerResponse startContainer(StartContainerRequest request) public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException { throws YarnRemoteException {
@ -274,10 +276,13 @@ public class ContainerManagerImpl extends CompositeService implements
context.getApplications().putIfAbsent(applicationID, application)) { context.getApplications().putIfAbsent(applicationID, application)) {
LOG.info("Creating a new application reference for app " LOG.info("Creating a new application reference for app "
+ applicationID); + applicationID);
dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID));
} }
// TODO: Validate the request // TODO: Validate the request
dispatcher.getEventHandler().handle(new ApplicationInitEvent(container)); dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
NMAuditLogger.logSuccess(launchContext.getUser(), NMAuditLogger.logSuccess(launchContext.getUser(),
AuditConstants.START_CONTAINER, "ContainerManageImpl", AuditConstants.START_CONTAINER, "ContainerManageImpl",

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
/**
* Event sent from {@link ContainerManagerImpl} to {@link ApplicationImpl} to
* request the initialization of a container. This is funneled through
* the Application so that the application life-cycle can be checked, and container
* launches can be delayed until the application is fully initialized.
*
* Once the application is initialized,
* {@link ApplicationImpl.InitContainerTransition} simply passes this event on as a
* {@link ContainerInitEvent}.
*
*/
public class ApplicationContainerInitEvent extends ApplicationEvent {
final Container container;
public ApplicationContainerInitEvent(Container container) {
super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
ApplicationEventType.INIT_CONTAINER);
this.container = container;
}
Container getContainer() {
return container;
}
}

View File

@ -22,6 +22,7 @@ public enum ApplicationEventType {
// Source: ContainerManager // Source: ContainerManager
INIT_APPLICATION, INIT_APPLICATION,
INIT_CONTAINER,
FINISH_APPLICATION, FINISH_APPLICATION,
// Source: ResourceLocalizationService // Source: ResourceLocalizationService

View File

@ -104,11 +104,14 @@ public class ApplicationImpl implements Application {
// Transitions from NEW state // Transitions from NEW state
.addTransition(ApplicationState.NEW, ApplicationState.INITING, .addTransition(ApplicationState.NEW, ApplicationState.INITING,
ApplicationEventType.INIT_APPLICATION, new AppInitTransition()) ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
.addTransition(ApplicationState.NEW, ApplicationState.NEW,
ApplicationEventType.INIT_CONTAINER,
new InitContainerTransition())
// Transitions from INITING state // Transitions from INITING state
.addTransition(ApplicationState.INITING, ApplicationState.INITING, .addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.INIT_APPLICATION, ApplicationEventType.INIT_CONTAINER,
new AppIsInitingTransition()) new InitContainerTransition())
.addTransition(ApplicationState.INITING, .addTransition(ApplicationState.INITING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT, EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
@ -121,8 +124,8 @@ public class ApplicationImpl implements Application {
// Transitions from RUNNING state // Transitions from RUNNING state
.addTransition(ApplicationState.RUNNING, .addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING, ApplicationState.RUNNING,
ApplicationEventType.INIT_APPLICATION, ApplicationEventType.INIT_CONTAINER,
new DuplicateAppInitTransition()) new InitContainerTransition())
.addTransition(ApplicationState.RUNNING, .addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING, ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED, ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
@ -167,9 +170,6 @@ public class ApplicationImpl implements Application {
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override @Override
public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container);
app.dispatcher.getEventHandler().handle( app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent( new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
@ -177,17 +177,36 @@ public class ApplicationImpl implements Application {
} }
/** /**
* Absorb initialization events while the application initializes. * Handles INIT_CONTAINER events which request that we launch a new
* container. When we're still in the INITTING state, we simply
* queue these up. When we're in the RUNNING state, we pass along
* an ContainerInitEvent to the appropriate ContainerImpl.
*/ */
static class AppIsInitingTransition implements @SuppressWarnings("unchecked")
static class InitContainerTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override @Override
public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationInitEvent initEvent = (ApplicationInitEvent) event; ApplicationContainerInitEvent initEvent =
(ApplicationContainerInitEvent) event;
Container container = initEvent.getContainer(); Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container); app.containers.put(container.getContainerID(), container);
LOG.info("Adding " + container.getContainerID() LOG.info("Adding " + container.getContainerID()
+ " to application " + app.toString()); + " to application " + app.toString());
switch (app.getApplicationState()) {
case RUNNING:
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerID()));
break;
case INITING:
case NEW:
// these get queued up and sent out in AppInitDoneTransition
break;
default:
assert false : "Invalid state for InitContainerTransition: " +
app.getApplicationState();
}
} }
} }
@ -211,20 +230,6 @@ public class ApplicationImpl implements Application {
} }
} }
@SuppressWarnings("unchecked")
static class DuplicateAppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container);
LOG.info("Adding " + container.getContainerID()
+ " to application " + app.toString());
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerID()));
}
}
static final class ContainerDoneTransition implements static final class ContainerDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> { SingleArcTransition<ApplicationImpl, ApplicationEvent> {

View File

@ -18,20 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ApplicationInitEvent extends ApplicationEvent { public class ApplicationInitEvent extends ApplicationEvent {
private final Container container; public ApplicationInitEvent(ApplicationId appId) {
super(appId, ApplicationEventType.INIT_APPLICATION);
public ApplicationInitEvent(Container container) {
super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
ApplicationEventType.INIT_APPLICATION);
this.container = container;
} }
public Container getContainer() {
return this.container;
}
} }

View File

@ -41,11 +41,12 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(1, 314159265358979L, "yak", 3); wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
wa.initApplication(1); wa.initApplication();
wa.initContainer(1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size()); assertEquals(1, wa.app.getContainers().size());
wa.initApplication(0); wa.initContainer(0);
wa.initApplication(2); wa.initContainer(2);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(3, wa.app.getContainers().size()); assertEquals(3, wa.app.getContainers().size());
wa.applicationInited(); wa.applicationInited();
@ -70,7 +71,8 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(2, 314159265358979L, "yak", 3); wa = new WrappedApplication(2, 314159265358979L, "yak", 3);
wa.initApplication(0); wa.initApplication();
wa.initContainer(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size()); assertEquals(1, wa.app.getContainers().size());
@ -80,8 +82,8 @@ public class TestApplication {
argThat(new ContainerInitMatcher(wa.containers.get(0) argThat(new ContainerInitMatcher(wa.containers.get(0)
.getContainerID()))); .getContainerID())));
wa.initApplication(1); wa.initContainer(1);
wa.initApplication(2); wa.initContainer(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(3, wa.app.getContainers().size()); assertEquals(3, wa.app.getContainers().size());
@ -105,7 +107,8 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(3, 314159265358979L, "yak", 3); wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
wa.initApplication(-1); wa.initApplication();
wa.initContainer(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited(); wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
@ -130,7 +133,8 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(4, 314159265358979L, "yak", 3); wa = new WrappedApplication(4, 314159265358979L, "yak", 3);
wa.initApplication(-1); wa.initApplication();
wa.initContainer(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited(); wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
@ -185,7 +189,8 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(5, 314159265358979L, "yak", 3); wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
wa.initApplication(-1); wa.initApplication();
wa.initContainer(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited(); wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
@ -220,7 +225,8 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(5, 314159265358979L, "yak", 3); wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
wa.initApplication(-1); wa.initApplication();
wa.initContainer(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited(); wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
@ -256,7 +262,8 @@ public class TestApplication {
WrappedApplication wa = null; WrappedApplication wa = null;
try { try {
wa = new WrappedApplication(1, 314159265358979L, "yak", 3); wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
wa.initApplication(0); wa.initApplication();
wa.initContainer(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
assertEquals(1, wa.app.getContainers().size()); assertEquals(1, wa.app.getContainers().size());
@ -276,7 +283,7 @@ public class TestApplication {
refEq(new ApplicationLocalizationEvent( refEq(new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app))); LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
wa.initApplication(1); wa.initContainer(1);
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
wa.app.getApplicationState()); wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size()); assertEquals(0, wa.app.getContainers().size());
@ -376,13 +383,18 @@ public class TestApplication {
dispatcher.stop(); dispatcher.stop();
} }
public void initApplication(int containerNum) { public void initApplication() {
app.handle(new ApplicationInitEvent(appId));
}
public void initContainer(int containerNum) {
if (containerNum == -1) { if (containerNum == -1) {
for (int i = 0; i < containers.size(); i++) { for (int i = 0; i < containers.size(); i++) {
app.handle(new ApplicationInitEvent(containers.get(i))); app.handle(new ApplicationContainerInitEvent(containers.get(i)));
} }
} else { } else {
app.handle(new ApplicationInitEvent(containers.get(containerNum))); app.handle(new ApplicationContainerInitEvent(containers.get(containerNum)));
} }
drainDispatcherEvents(); drainDispatcherEvents();
} }