Merge -c 1361813 from trunk to branch-2 to fix MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease development of new applications. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1361815 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-07-15 21:47:53 +00:00
parent 5685eb30bb
commit 4902c6bdc9
18 changed files with 363 additions and 233 deletions

View File

@ -24,6 +24,9 @@ Release 2.0.1-alpha - UNRELEASED
MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu)
MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease
development of new applications. (Bikas Saha via acmurthy)
OPTIMIZATIONS
BUG FIXES

View File

@ -383,6 +383,7 @@ public static State fromYarn(YarnApplicationState yarnApplicationState,
switch (yarnApplicationState) {
case NEW:
case SUBMITTED:
case ACCEPTED:
return State.PREP;
case RUNNING:
return State.RUNNING;

View File

@ -232,8 +232,9 @@ public MRClientProtocol run() throws IOException {
if (user == null) {
throw RPCUtil.getRemoteException("User is not set in the application report");
}
if (application.getYarnApplicationState() == YarnApplicationState.NEW ||
application.getYarnApplicationState() == YarnApplicationState.SUBMITTED) {
if (application.getYarnApplicationState() == YarnApplicationState.NEW
|| application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
|| application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
realProxy = null;
return getNotRunningJob(application, JobState.NEW);
}

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -75,13 +76,17 @@ public class NotRunningJob implements MRClientProtocol {
private ApplicationReport getUnknownApplicationReport() {
ApplicationId unknownAppId = recordFactory.newRecordInstance(ApplicationId.class);
ApplicationId unknownAppId = recordFactory
.newRecordInstance(ApplicationId.class);
ApplicationAttemptId unknownAttemptId = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never used
// Setting AppState to NEW and finalStatus to UNDEFINED as they are never
// used
// for a non running job
return BuilderUtils.newApplicationReport(unknownAppId, "N/A", "N/A", "N/A", "N/A", 0, "",
YarnApplicationState.NEW, "N/A", "N/A", 0, 0,
FinalApplicationStatus.UNDEFINED, null, "N/A");
return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, "", YarnApplicationState.NEW, "N/A",
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -404,17 +405,23 @@ private GetCountersRequest getCountersRequest() {
}
private ApplicationReport getFinishedApplicationReport() {
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
1234, 5), "user", "queue", "appname", "host", 124, null,
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, null, "N/A");
ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
appId, 0);
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
"appname", "host", 124, null, YarnApplicationState.FINISHED,
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
"N/A");
}
private ApplicationReport getRunningApplicationReport(String host, int port) {
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
1234, 5), "user", "queue", "appname", host, port, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
FinalApplicationStatus.UNDEFINED, null, "N/A");
ApplicationId appId = BuilderUtils.newApplicationId(1234, 5);
ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
appId, 0);
return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue",
"appname", host, port, null, YarnApplicationState.RUNNING,
"diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null,
"N/A");
}
private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {

View File

@ -41,6 +41,12 @@ public interface ApplicationConstants {
* only
*/
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
/**
* The environment variable for APPLICATION_ATTEMPT_ID. Set in AppMaster
* environment only
*/
public static final String AM_APP_ATTEMPT_ID_ENV = "AM_APP_ATTEMPT_ID";
/**
* The environment variable for the NM_HOST. Set in the AppMaster environment

View File

@ -60,6 +60,19 @@ public interface ApplicationReport {
@Private
@Unstable
void setApplicationId(ApplicationId applicationId);
/**
* Get the <code>ApplicationAttemptId</code> of the current
* attempt of the application
* @return <code>ApplicationAttemptId</code> of the attempt
*/
@Private
@Unstable
ApplicationAttemptId getCurrentApplicationAttemptId();
@Private
@Unstable
void setCurrentApplicationAttemptId(ApplicationAttemptId applicationAttemptId);
/**
* Get the <em>user</em> who submitted the application.

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -151,6 +150,28 @@ public interface ApplicationSubmissionContext {
@Public
@Stable
public void setAMContainerSpec(ContainerLaunchContext amContainer);
/**
* Get if the RM should manage the execution of the AM.
* If true, then the RM
* will not allocate a container for the AM and start it. It will expect the
* AM to be launched and connect to the RM within the AM liveliness period and
* fail the app otherwise. The client should launch the AM only after the RM
* has ACCEPTED the application and changed the <code>YarnApplicationState</code>.
* Such apps will not be retried by the RM on app attempt failure.
* The default value is false.
* @return true if the AM is not managed by the RM
*/
@Public
@Unstable
public boolean getUnmanagedAM();
/**
* @param value true if RM should not manage the AM
*/
@Public
@Unstable
public void setUnmanagedAM(boolean value);
/**
* @return true if tokens should be canceled when the app completes.

View File

@ -32,7 +32,7 @@ public enum YarnApplicationState {
/** Application which has been submitted. */
SUBMITTED,
/** Application which is currently running. */
RUNNING,
@ -43,5 +43,8 @@ public enum YarnApplicationState {
FAILED,
/** Application which was terminated by a user or admin. */
KILLED
KILLED,
/** Application has been accepted by the scheduler */
ACCEPTED
}

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
@ -39,6 +41,7 @@ public class ApplicationReportPBImpl extends ProtoBase<ApplicationReportProto>
boolean viaProto = false;
ApplicationId applicationId;
ApplicationAttemptId currentApplicationAttemptId;
public ApplicationReportPBImpl() {
builder = ApplicationReportProto.newBuilder();
@ -71,6 +74,20 @@ public void setApplicationResourceUsageReport(ApplicationResourceUsageReport app
}
builder.setAppResourceUsage(convertToProtoFormat(appInfo));
}
@Override
public ApplicationAttemptId getCurrentApplicationAttemptId() {
if (this.currentApplicationAttemptId != null) {
return this.currentApplicationAttemptId;
}
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasCurrentApplicationAttemptId()) {
return null;
}
this.currentApplicationAttemptId = convertFromProtoFormat(p.getCurrentApplicationAttemptId());
return this.currentApplicationAttemptId;
}
@Override
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
@ -198,6 +215,14 @@ public void setApplicationId(ApplicationId applicationId) {
this.applicationId = applicationId;
}
@Override
public void setCurrentApplicationAttemptId(ApplicationAttemptId applicationAttemptId) {
maybeInitBuilder();
if (applicationId == null)
builder.clearStatus();
this.currentApplicationAttemptId = applicationAttemptId;
}
@Override
public void setTrackingUrl(String url) {
maybeInitBuilder();
@ -330,6 +355,11 @@ private void mergeLocalToBuilder() {
builder.getApplicationId())) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
if (this.currentApplicationAttemptId != null
&& !((ApplicationAttemptIdPBImpl) this.currentApplicationAttemptId).getProto().equals(
builder.getCurrentApplicationAttemptId())) {
builder.setCurrentApplicationAttemptId(convertToProtoFormat(this.currentApplicationAttemptId));
}
}
private void mergeLocalToProto() {
@ -350,6 +380,10 @@ private void maybeInitBuilder() {
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
private ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) {
return ((ApplicationAttemptIdPBImpl) t).getProto();
}
private ApplicationResourceUsageReport convertFromProtoFormat(ApplicationResourceUsageReportProto s) {
return ProtoUtils.convertFromProtoFormat(s);
@ -363,6 +397,11 @@ private ApplicationIdPBImpl convertFromProtoFormat(
ApplicationIdProto applicationId) {
return new ApplicationIdPBImpl(applicationId);
}
private ApplicationAttemptIdPBImpl convertFromProtoFormat(
ApplicationAttemptIdProto applicationAttemptId) {
return new ApplicationAttemptIdPBImpl(applicationAttemptId);
}
private YarnApplicationState convertFromProtoFormat(YarnApplicationStateProto s) {
return ProtoUtils.convertFromProtoFormat(s);

View File

@ -207,13 +207,26 @@ public void setAMContainerSpec(ContainerLaunchContext amContainer) {
this.amContainer = amContainer;
}
@Override
public boolean getUnmanagedAM() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
//There is a default so cancelTokens should never be null
return p.getUnmanagedAm();
}
@Override
public void setUnmanagedAM(boolean value) {
maybeInitBuilder();
builder.setUnmanagedAm(value);
}
@Override
public boolean getCancelTokensWhenComplete() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
//There is a default so cancelTokens should never be null
return p.getCancelTokensWhenComplete();
}
@Override
public void setCancelTokensWhenComplete(boolean cancel) {
maybeInitBuilder();

View File

@ -90,6 +90,7 @@ enum YarnApplicationStateProto {
FINISHED = 4;
FAILED = 5;
KILLED = 6;
ACCEPTED = 7;
}
enum FinalApplicationStatusProto {
@ -170,6 +171,7 @@ message ApplicationReportProto {
optional FinalApplicationStatusProto final_application_status = 15;
optional ApplicationResourceUsageReportProto app_resource_Usage = 16;
optional string originalTrackingUrl = 17;
optional ApplicationAttemptIdProto currentApplicationAttemptId = 18;
}
enum NodeStateProto {
@ -235,6 +237,7 @@ message ApplicationSubmissionContextProto {
optional PriorityProto priority = 5;
optional ContainerLaunchContextProto am_container_spec = 6;
optional bool cancel_tokens_when_complete = 7 [default = true];
optional bool unmanaged_am = 8 [default = false];
}
enum ApplicationAccessTypeProto {

View File

@ -331,14 +331,16 @@ public static ResourceRequest newResourceRequest(ResourceRequest r) {
}
public static ApplicationReport newApplicationReport(
ApplicationId applicationId, String user, String queue, String name,
String host, int rpcPort, String clientToken, YarnApplicationState state,
String diagnostics, String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources,
String origTrackingUrl) {
ApplicationId applicationId, ApplicationAttemptId applicationAttemptId,
String user, String queue, String name, String host, int rpcPort,
String clientToken, YarnApplicationState state, String diagnostics,
String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
report.setCurrentApplicationAttemptId(applicationAttemptId);
report.setUser(user);
report.setQueue(queue);
report.setName(name);

View File

@ -19,18 +19,13 @@
package org.apache.hadoop.yarn;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
/**
* Utilities to generate fake test apps
@ -66,137 +61,6 @@ public static String newQueue() {
}
}
public static List<ApplicationReport> genApps(int n) {
List<ApplicationReport> list = Lists.newArrayList();
for (int i = 0; i < n; ++i) {
list.add(newApp(i));
}
return list;
}
public static ApplicationReport newApp(int i) {
final ApplicationId id = newAppID(i);
final YarnApplicationState state = newAppState();
final String user = newUserName();
final String name = newAppName();
final String queue = newQueue();
final FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
return new ApplicationReport() {
private ApplicationResourceUsageReport appUsageReport;
@Override public ApplicationId getApplicationId() { return id; }
@Override public String getUser() { return user; }
@Override public String getName() { return name; }
@Override public YarnApplicationState getYarnApplicationState() { return state; }
@Override public String getQueue() { return queue; }
@Override public String getTrackingUrl() { return ""; }
@Override public String getOriginalTrackingUrl() { return ""; }
@Override public FinalApplicationStatus getFinalApplicationStatus() { return finishState; }
@Override
public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
return this.appUsageReport;
}
public void setApplicationId(ApplicationId applicationId) {
// TODO Auto-generated method stub
}
@Override
public void setTrackingUrl(String url) {
// TODO Auto-generated method stub
}
@Override public void setOriginalTrackingUrl(String url) { }
@Override
public void setApplicationResourceUsageReport(ApplicationResourceUsageReport appResources) {
this.appUsageReport = appResources;
}
@Override
public void setName(String name) {
// TODO Auto-generated method stub
}
@Override
public void setQueue(String queue) {
// TODO Auto-generated method stub
}
@Override
public void setYarnApplicationState(YarnApplicationState state) {
// TODO Auto-generated method stub
}
@Override
public void setUser(String user) {
// TODO Auto-generated method stub
}
@Override
public String getDiagnostics() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setDiagnostics(String diagnostics) {
// TODO Auto-generated method stub
}
@Override
public String getHost() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setHost(String host) {
// TODO Auto-generated method stub
}
@Override
public int getRpcPort() {
// TODO Auto-generated method stub
return 0;
}
@Override
public void setRpcPort(int rpcPort) {
// TODO Auto-generated method stub
}
@Override
public String getClientToken() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setClientToken(String clientToken) {
// TODO Auto-generated method stub
}
@Override
public long getStartTime() {
// TODO Auto-generated method stub
return 0;
}
@Override
public void setStartTime(long startTime) {
// TODO Auto-generated method stub
}
@Override
public long getFinishTime() {
// TODO Auto-generated method stub
return 0;
}
@Override
public void setFinishTime(long finishTime) {
// TODO Auto-generated method stub
}
@Override
public void setFinalApplicationStatus(FinalApplicationStatus finishState) {
// TODO Auto-generated method stub
}
};
}
public static ApplicationId newAppID(int i) {
ApplicationId id = Records.newRecord(ApplicationId.class);
id.setClusterTimestamp(TS);

View File

@ -333,8 +333,9 @@ private YarnApplicationState createApplicationState(RMAppState rmAppState) {
case NEW:
return YarnApplicationState.NEW;
case SUBMITTED:
case ACCEPTED:
return YarnApplicationState.SUBMITTED;
case ACCEPTED:
return YarnApplicationState.ACCEPTED;
case RUNNING:
return YarnApplicationState.RUNNING;
case FINISHED:
@ -403,12 +404,12 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
} else {
appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
}
return BuilderUtils.newApplicationReport(this.applicationId, this.user,
this.queue, this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()),
diags, trackingUrl,
this.startTime, this.finishTime, finishState, appUsageReport,
origTrackingUrl);
return BuilderUtils.newApplicationReport(this.applicationId,
this.currentAttempt.getAppAttemptId(), this.user, this.queue,
this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
appUsageReport, origTrackingUrl);
} finally {
this.readLock.unlock();
}
@ -599,21 +600,32 @@ public AttemptFailedTransition(RMAppState initialState) {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent)event);
if (app.attempts.size() == app.maxRetries) {
String msg = "Application " + app.getApplicationId()
+ " failed " + app.maxRetries
+ " times due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event);
boolean retryApp = true;
String msg = null;
if (app.submissionContext.getUnmanagedAM()) {
// RM does not manage the AM. Do not retry
retryApp = false;
msg = "Unmanaged application " + app.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
} else if (app.attempts.size() == app.maxRetries) {
retryApp = false;
msg = "Application " + app.getApplicationId() + " failed "
+ app.maxRetries + " times due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
}
if (retryApp) {
app.createNewAttempt();
return initialState;
} else {
LOG.info(msg);
app.diagnostics.append(msg);
// Inform the node for app-finish
FINAL_TRANSITION.transition(app, event);
return RMAppState.FAILED;
}
app.createNewAttempt();
return initialState;
}
}

View File

@ -143,16 +143,24 @@ RMAppAttemptEventType.START, new AttemptStartedTransition())
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
RMAppAttemptEventType.REGISTERED,
new UnexpectedAMRegisteredTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.SCHEDULED,
RMAppAttemptEventType.APP_ACCEPTED, new ScheduleTransition())
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.APP_ACCEPTED,
new ScheduleTransition())
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.REGISTERED,
new UnexpectedAMRegisteredTransition())
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.ALLOCATED,
@ -173,7 +181,7 @@ RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL, new KillAllocatedAMTransition())
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
@ -583,9 +591,11 @@ public void transition(RMAppAttemptImpl appAttempt,
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
new ArrayList<ResourceRequest>();
private static final class ScheduleTransition extends BaseTransition {
private static final class ScheduleTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public void transition(RMAppAttemptImpl appAttempt,
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
// Send the acceptance to the app
@ -593,17 +603,27 @@ public void transition(RMAppAttemptImpl appAttempt,
.getApplicationAttemptId().getApplicationId(),
RMAppEventType.APP_ACCEPTED));
// Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
.getAMContainerSpec().getResource(), 1);
if (!appAttempt.submissionContext.getUnmanagedAM()) {
// Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
.getAMContainerSpec().getResource(), 1);
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert(amContainerAllocation.getContainers().size() == 0);
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
}
return RMAppAttemptState.SCHEDULED;
} else {
// RM not allocating container. AM is self launched.
// Directly go to LAUNCHED state
// Register with AMLivelinessMonitor
appAttempt.rmContext.getAMLivelinessMonitor().register(
appAttempt.applicationAttemptId);
return RMAppAttemptState.LAUNCHED;
}
}
}
@ -811,11 +831,30 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.rmContext.getAMLivelinessMonitor().unregister(
appAttempt.getAppAttemptId());
// Tell the launcher to cleanup.
appAttempt.eventHandler.handle(new AMLauncherEvent(
AMLauncherEventType.CLEANUP, appAttempt));
if(!appAttempt.submissionContext.getUnmanagedAM()) {
// Tell the launcher to cleanup.
appAttempt.eventHandler.handle(new AMLauncherEvent(
AMLauncherEventType.CLEANUP, appAttempt));
}
}
}
private static class UnexpectedAMRegisteredTransition extends
BaseFinalTransition {
public UnexpectedAMRegisteredTransition() {
super(RMAppAttemptState.FAILED);
}
@Override
public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
assert appAttempt.submissionContext.getUnmanagedAM();
appAttempt
.setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
super.transition(appAttempt, event);
}
}
private static final class StatusUpdateTransition extends
BaseTransition {
@ -884,8 +923,11 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
// Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
if (appAttempt.masterContainer != null
&& appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
// container associated with AM. must not be unmanaged
assert appAttempt.submissionContext.getUnmanagedAM() == false;
// Setup diagnostic message
appAttempt.diagnostics.append("AM Container for " +
appAttempt.getAppAttemptId() + " exited with " +

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -131,7 +132,7 @@ public void setUp() throws Exception {
rmDispatcher.start();
}
protected RMApp createNewTestApp() {
protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) {
ApplicationId applicationId = MockApps.newAppID(appId++);
String user = MockApps.newUserName();
String name = MockApps.newAppName();
@ -139,12 +140,15 @@ protected RMApp createNewTestApp() {
Configuration conf = new YarnConfiguration();
// ensure max retries set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries);
ApplicationSubmissionContext submissionContext = null;
String clientTokenStr = "bogusstring";
ApplicationStore appStore = mock(ApplicationStore.class);
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
if(submissionContext == null) {
submissionContext = new ApplicationSubmissionContextPBImpl();
}
RMApp application = new RMAppImpl(applicationId, rmContext,
conf, name, user,
@ -235,8 +239,9 @@ private static void assertFailed(RMApp application, String regex) {
diag.toString().matches(regex));
}
protected RMApp testCreateAppSubmitted() throws IOException {
RMApp application = createNewTestApp();
protected RMApp testCreateAppSubmitted(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.START
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
@ -246,9 +251,10 @@ protected RMApp testCreateAppSubmitted() throws IOException {
return application;
}
protected RMApp testCreateAppAccepted() throws IOException {
RMApp application = testCreateAppSubmitted();
// SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
protected RMApp testCreateAppAccepted(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = testCreateAppSubmitted(submissionContext);
// SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
RMAppEvent event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_ACCEPTED);
@ -258,8 +264,9 @@ protected RMApp testCreateAppAccepted() throws IOException {
return application;
}
protected RMApp testCreateAppRunning() throws IOException {
RMApp application = testCreateAppAccepted();
protected RMApp testCreateAppRunning(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = testCreateAppAccepted(submissionContext);
// ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED
RMAppEvent event =
new RMAppEvent(application.getApplicationId(),
@ -271,8 +278,9 @@ protected RMApp testCreateAppRunning() throws IOException {
return application;
}
protected RMApp testCreateAppFinished() throws IOException {
RMApp application = testCreateAppRunning();
protected RMApp testCreateAppFinished(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = testCreateAppRunning(submissionContext);
// RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
RMAppEvent event =
new RMAppEvent(application.getApplicationId(),
@ -285,17 +293,38 @@ protected RMApp testCreateAppFinished() throws IOException {
return application;
}
@Test
public void testUnmanagedApp() throws IOException {
ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl();
subContext.setUnmanagedAM(true);
// test success path
LOG.info("--- START: testUnmanagedAppSuccessPath ---");
testCreateAppFinished(subContext);
// test app fails after 1 app attempt failure
LOG.info("--- START: testUnmanagedAppFailPath ---");
RMApp application = testCreateAppRunning(subContext);
RMAppEvent event = new RMAppFailedAttemptEvent(
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId());
assertFailed(application,
".*Unmanaged application.*Failing the application.*");
}
@Test
public void testAppSuccessPath() throws IOException {
LOG.info("--- START: testAppSuccessPath ---");
testCreateAppFinished();
testCreateAppFinished(null);
}
@Test
public void testAppNewKill() throws IOException {
LOG.info("--- START: testAppNewKill ---");
RMApp application = createNewTestApp();
RMApp application = createNewTestApp(null);
// NEW => KILLED event RMAppEventType.KILL
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
@ -307,7 +336,7 @@ public void testAppNewKill() throws IOException {
public void testAppNewReject() throws IOException {
LOG.info("--- START: testAppNewReject ---");
RMApp application = createNewTestApp();
RMApp application = createNewTestApp(null);
// NEW => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "Test Application Rejected";
RMAppEvent event =
@ -320,7 +349,7 @@ public void testAppNewReject() throws IOException {
public void testAppSubmittedRejected() throws IOException {
LOG.info("--- START: testAppSubmittedRejected ---");
RMApp application = testCreateAppSubmitted();
RMApp application = testCreateAppSubmitted(null);
// SUBMITTED => FAILED event RMAppEventType.APP_REJECTED
String rejectedText = "app rejected";
RMAppEvent event =
@ -333,7 +362,7 @@ public void testAppSubmittedRejected() throws IOException {
public void testAppSubmittedKill() throws IOException {
LOG.info("--- START: testAppSubmittedKill---");
RMApp application = testCreateAppAccepted();
RMApp application = testCreateAppAccepted(null);
// SUBMITTED => KILLED event RMAppEventType.KILL
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application);
@ -345,7 +374,7 @@ public void testAppSubmittedKill() throws IOException {
public void testAppAcceptedFailed() throws IOException {
LOG.info("--- START: testAppAcceptedFailed ---");
RMApp application = testCreateAppAccepted();
RMApp application = testCreateAppAccepted(null);
// ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
RMAppEvent event =
@ -374,7 +403,7 @@ public void testAppAcceptedFailed() throws IOException {
public void testAppAcceptedKill() throws IOException {
LOG.info("--- START: testAppAcceptedKill ---");
RMApp application = testCreateAppAccepted();
RMApp application = testCreateAppAccepted(null);
// ACCEPTED => KILLED event RMAppEventType.KILL
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
@ -386,7 +415,7 @@ public void testAppAcceptedKill() throws IOException {
public void testAppRunningKill() throws IOException {
LOG.info("--- START: testAppRunningKill ---");
RMApp application = testCreateAppRunning();
RMApp application = testCreateAppRunning(null);
// RUNNING => KILLED event RMAppEventType.KILL
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
@ -398,7 +427,7 @@ public void testAppRunningKill() throws IOException {
public void testAppRunningFailed() throws IOException {
LOG.info("--- START: testAppRunningFailed ---");
RMApp application = testCreateAppRunning();
RMApp application = testCreateAppRunning(null);
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
int expectedAttemptId = 1;
Assert.assertEquals(expectedAttemptId,
@ -444,7 +473,7 @@ public void testAppRunningFailed() throws IOException {
public void testAppFinishedFinished() throws IOException {
LOG.info("--- START: testAppFinishedFinished ---");
RMApp application = testCreateAppFinished();
RMApp application = testCreateAppFinished(null);
// FINISHED => FINISHED event RMAppEventType.KILL
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
@ -460,7 +489,7 @@ public void testAppFinishedFinished() throws IOException {
public void testAppKilledKilled() throws IOException {
LOG.info("--- START: testAppKilledKilled ---");
RMApp application = testCreateAppRunning();
RMApp application = testCreateAppRunning(null);
// RUNNING => KILLED event RMAppEventType.KILL
RMAppEvent event =

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -56,7 +57,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
@ -83,6 +86,7 @@ public class TestRMAppAttemptTransitions {
private YarnScheduler scheduler;
private ApplicationMasterService masterService;
private ApplicationMasterLauncher applicationMasterLauncher;
private AMLivelinessMonitor amLivelinessMonitor;
private RMApp application;
private RMAppAttempt applicationAttempt;
@ -135,6 +139,9 @@ public void handle(AMLauncherEvent event) {
}
private static int appId = 1;
private ApplicationSubmissionContext submissionContext = null;
private boolean unmanagedAM;
@Before
public void setUp() throws Exception {
@ -142,7 +149,7 @@ public void setUp() throws Exception {
ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
amLivelinessMonitor = mock(AMLivelinessMonitor.class);
rmContext =
new RMContextImpl(new MemStore(), rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, null,
@ -174,8 +181,7 @@ public void setUp() throws Exception {
final String user = MockApps.newUserName();
final String queue = MockApps.newQueue();
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
submissionContext = mock(ApplicationSubmissionContext.class);
when(submissionContext.getUser()).thenReturn(user);
when(submissionContext.getQueue()).thenReturn(queue);
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
@ -183,6 +189,8 @@ public void setUp() throws Exception {
when(amContainerSpec.getResource()).thenReturn(resource);
when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
unmanagedAM = false;
application = mock(RMApp.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
@ -247,7 +255,8 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) {
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
// Check events
// this works for unmanaged and managed AM's because this is actually doing
// verify(application).handle(anyObject());
verify(application).handle(any(RMAppRejectedEvent.class));
}
@ -269,9 +278,24 @@ private void testAppAttemptKilledState(Container amContainer,
/**
* {@link RMAppAttemptState#SCHEDULED}
*/
@SuppressWarnings("unchecked")
private void testAppAttemptScheduledState() {
assertEquals(RMAppAttemptState.SCHEDULED,
RMAppAttemptState expectedState;
int expectedAllocateCount;
if(unmanagedAM) {
expectedState = RMAppAttemptState.LAUNCHED;
expectedAllocateCount = 0;
} else {
expectedState = RMAppAttemptState.SCHEDULED;
expectedAllocateCount = 1;
}
assertEquals(expectedState,
applicationAttempt.getAppAttemptState());
verify(scheduler, times(expectedAllocateCount)).
allocate(any(ApplicationAttemptId.class),
any(List.class), any(List.class));
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
@ -280,9 +304,6 @@ private void testAppAttemptScheduledState() {
// Check events
verify(application).handle(any(RMAppEvent.class));
verify(scheduler).
allocate(any(ApplicationAttemptId.class),
any(List.class), any(List.class));
}
/**
@ -351,14 +372,16 @@ private void testAppAttemptRunningState(Container container,
private void testAppAttemptFinishedState(Container container,
FinalApplicationStatus finalStatus,
String trackingUrl,
String diagnostics) {
String diagnostics,
int finishedContainerCount) {
assertEquals(RMAppAttemptState.FINISHED,
applicationAttempt.getAppAttemptState());
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId().
getApplicationId()+"/", applicationAttempt.getTrackingUrl());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(finishedContainerCount, applicationAttempt
.getJustFinishedContainers().size());
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
}
@ -424,7 +447,50 @@ private void runApplicationAttempt(Container container,
testAppAttemptRunningState(container, host, rpcPort, trackingUrl);
}
@Test
public void testUnmanagedAMSuccess() {
unmanagedAM = true;
when(submissionContext.getUnmanagedAM()).thenReturn(true);
// submit AM and check it goes to LAUNCHED state
scheduleApplicationAttempt();
testAppAttemptLaunchedState(null);
verify(amLivelinessMonitor, times(1)).register(
applicationAttempt.getAppAttemptId());
// launch AM
runApplicationAttempt(null, "host", 8042, "oldtrackingurl");
// complete a container
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
applicationAttempt.getAppAttemptId(), mock(Container.class)));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
// complete AM
String trackingUrl = "mytrackingurl";
String diagnostics = "Successful";
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
diagnostics));
testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1);
}
@Test
public void testUnmanagedAMUnexpectedRegistration() {
unmanagedAM = true;
when(submissionContext.getUnmanagedAM()).thenReturn(true);
// submit AM and check it goes to SUBMITTED state
submitApplicationAttempt();
assertEquals(RMAppAttemptState.SUBMITTED,
applicationAttempt.getAppAttemptState());
// launch AM and verify attempt failed
applicationAttempt.handle(new RMAppAttemptRegistrationEvent(
applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl"));
testAppAttemptSubmittedToFailedState("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
}
@Test
public void testNewToKilled() {
@ -499,7 +565,7 @@ public void testUnregisterToKilledFinish() {
applicationAttempt.getAppAttemptId(),
trackingUrl, finalStatus, diagnostics));
testAppAttemptFinishedState(amContainer, finalStatus,
trackingUrl, diagnostics);
trackingUrl, diagnostics, 0);
}
@ -516,7 +582,7 @@ public void testUnregisterToSuccessfulFinish() {
applicationAttempt.getAppAttemptId(),
trackingUrl, finalStatus, diagnostics));
testAppAttemptFinishedState(amContainer, finalStatus,
trackingUrl, diagnostics);
trackingUrl, diagnostics, 0);
}
}