diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 6fe36e7b668..b56c5a529e7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -24,6 +24,9 @@ Release 2.0.1-alpha - UNRELEASED MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu) + MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease + development of new applications. (Bikas Saha via acmurthy) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index 12dce4b9011..173807a6180 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -383,6 +383,7 @@ public class TypeConverter { switch (yarnApplicationState) { case NEW: case SUBMITTED: + case ACCEPTED: return State.PREP; case RUNNING: return State.RUNNING; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 0143cb73913..19d83a8190f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -232,8 +232,9 @@ public class ClientServiceDelegate { if (user == null) { throw RPCUtil.getRemoteException("User is not set in the application report"); } - if (application.getYarnApplicationState() == YarnApplicationState.NEW || - application.getYarnApplicationState() == YarnApplicationState.SUBMITTED) { + if (application.getYarnApplicationState() == YarnApplicationState.NEW + || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED + || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { realProxy = null; return getNotRunningJob(application, JobState.NEW); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index 3d00e8af8c9..f9b25b02131 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -75,13 +76,17 @@ public class NotRunningJob implements MRClientProtocol { private ApplicationReport getUnknownApplicationReport() { - ApplicationId unknownAppId = recordFactory.newRecordInstance(ApplicationId.class); + ApplicationId unknownAppId = recordFactory + .newRecordInstance(ApplicationId.class); + ApplicationAttemptId unknownAttemptId = recordFactory + .newRecordInstance(ApplicationAttemptId.class); - // Setting AppState to NEW and finalStatus to UNDEFINED as they are never used + // Setting AppState to NEW and finalStatus to UNDEFINED as they are never + // used // for a non running job - return BuilderUtils.newApplicationReport(unknownAppId, "N/A", "N/A", "N/A", "N/A", 0, "", - YarnApplicationState.NEW, "N/A", "N/A", 0, 0, - FinalApplicationStatus.UNDEFINED, null, "N/A"); + return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId, + "N/A", "N/A", "N/A", "N/A", 0, "", YarnApplicationState.NEW, "N/A", + "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A"); } NotRunningJob(ApplicationReport applicationReport, JobState jobState) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index a3940054c59..ecb51960223 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -404,17 +405,23 @@ public class TestClientServiceDelegate { } private ApplicationReport getFinishedApplicationReport() { - return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId( - 1234, 5), "user", "queue", "appname", "host", 124, null, - YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, - FinalApplicationStatus.SUCCEEDED, null, "N/A"); + ApplicationId appId = BuilderUtils.newApplicationId(1234, 5); + ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue", + "appname", "host", 124, null, YarnApplicationState.FINISHED, + "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null, + "N/A"); } private ApplicationReport getRunningApplicationReport(String host, int port) { - return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId( - 1234, 5), "user", "queue", "appname", host, port, null, - YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, - FinalApplicationStatus.UNDEFINED, null, "N/A"); + ApplicationId appId = BuilderUtils.newApplicationId(1234, 5); + ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId( + appId, 0); + return BuilderUtils.newApplicationReport(appId, attemptId, "user", "queue", + "appname", host, port, null, YarnApplicationState.RUNNING, + "diagnostics", "url", 0, 0, FinalApplicationStatus.UNDEFINED, null, + "N/A"); } private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java index 53020472013..ec2798315c5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java @@ -41,6 +41,12 @@ public interface ApplicationConstants { * only */ public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID"; + + /** + * The environment variable for APPLICATION_ATTEMPT_ID. Set in AppMaster + * environment only + */ + public static final String AM_APP_ATTEMPT_ID_ENV = "AM_APP_ATTEMPT_ID"; /** * The environment variable for the NM_HOST. Set in the AppMaster environment diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index 11740e849ad..99cbcca0496 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -60,6 +60,19 @@ public interface ApplicationReport { @Private @Unstable void setApplicationId(ApplicationId applicationId); + + /** + * Get the ApplicationAttemptId of the current + * attempt of the application + * @return ApplicationAttemptId of the attempt + */ + @Private + @Unstable + ApplicationAttemptId getCurrentApplicationAttemptId(); + + @Private + @Unstable + void setCurrentApplicationAttemptId(ApplicationAttemptId applicationAttemptId); /** * Get the user who submitted the application. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 67e76019711..a9e3d5477a4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.api.records; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -151,6 +150,28 @@ public interface ApplicationSubmissionContext { @Public @Stable public void setAMContainerSpec(ContainerLaunchContext amContainer); + + /** + * Get if the RM should manage the execution of the AM. + * If true, then the RM + * will not allocate a container for the AM and start it. It will expect the + * AM to be launched and connect to the RM within the AM liveliness period and + * fail the app otherwise. The client should launch the AM only after the RM + * has ACCEPTED the application and changed the YarnApplicationState. + * Such apps will not be retried by the RM on app attempt failure. + * The default value is false. + * @return true if the AM is not managed by the RM + */ + @Public + @Unstable + public boolean getUnmanagedAM(); + + /** + * @param value true if RM should not manage the AM + */ + @Public + @Unstable + public void setUnmanagedAM(boolean value); /** * @return true if tokens should be canceled when the app completes. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java index c45b62d54ac..7b809da17ba 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java @@ -32,7 +32,7 @@ public enum YarnApplicationState { /** Application which has been submitted. */ SUBMITTED, - + /** Application which is currently running. */ RUNNING, @@ -43,5 +43,8 @@ public enum YarnApplicationState { FAILED, /** Application which was terminated by a user or admin. */ - KILLED + KILLED, + + /** Application has been accepted by the scheduler */ + ACCEPTED } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index ebaa3ae31f9..8def3956785 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.api.records.impl.pb; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder; @@ -39,6 +41,7 @@ implements ApplicationReport { boolean viaProto = false; ApplicationId applicationId; + ApplicationAttemptId currentApplicationAttemptId; public ApplicationReportPBImpl() { builder = ApplicationReportProto.newBuilder(); @@ -71,6 +74,20 @@ implements ApplicationReport { } builder.setAppResourceUsage(convertToProtoFormat(appInfo)); } + + @Override + public ApplicationAttemptId getCurrentApplicationAttemptId() { + if (this.currentApplicationAttemptId != null) { + return this.currentApplicationAttemptId; + } + + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasCurrentApplicationAttemptId()) { + return null; + } + this.currentApplicationAttemptId = convertFromProtoFormat(p.getCurrentApplicationAttemptId()); + return this.currentApplicationAttemptId; + } @Override public ApplicationResourceUsageReport getApplicationResourceUsageReport() { @@ -198,6 +215,14 @@ implements ApplicationReport { this.applicationId = applicationId; } + @Override + public void setCurrentApplicationAttemptId(ApplicationAttemptId applicationAttemptId) { + maybeInitBuilder(); + if (applicationId == null) + builder.clearStatus(); + this.currentApplicationAttemptId = applicationAttemptId; + } + @Override public void setTrackingUrl(String url) { maybeInitBuilder(); @@ -330,6 +355,11 @@ implements ApplicationReport { builder.getApplicationId())) { builder.setApplicationId(convertToProtoFormat(this.applicationId)); } + if (this.currentApplicationAttemptId != null + && !((ApplicationAttemptIdPBImpl) this.currentApplicationAttemptId).getProto().equals( + builder.getCurrentApplicationAttemptId())) { + builder.setCurrentApplicationAttemptId(convertToProtoFormat(this.currentApplicationAttemptId)); + } } private void mergeLocalToProto() { @@ -350,6 +380,10 @@ implements ApplicationReport { private ApplicationIdProto convertToProtoFormat(ApplicationId t) { return ((ApplicationIdPBImpl) t).getProto(); } + + private ApplicationAttemptIdProto convertToProtoFormat(ApplicationAttemptId t) { + return ((ApplicationAttemptIdPBImpl) t).getProto(); + } private ApplicationResourceUsageReport convertFromProtoFormat(ApplicationResourceUsageReportProto s) { return ProtoUtils.convertFromProtoFormat(s); @@ -363,6 +397,11 @@ implements ApplicationReport { ApplicationIdProto applicationId) { return new ApplicationIdPBImpl(applicationId); } + + private ApplicationAttemptIdPBImpl convertFromProtoFormat( + ApplicationAttemptIdProto applicationAttemptId) { + return new ApplicationAttemptIdPBImpl(applicationAttemptId); + } private YarnApplicationState convertFromProtoFormat(YarnApplicationStateProto s) { return ProtoUtils.convertFromProtoFormat(s); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 9d66a167b45..fa3763a7968 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -207,13 +207,26 @@ implements ApplicationSubmissionContext { this.amContainer = amContainer; } + @Override + public boolean getUnmanagedAM() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + //There is a default so cancelTokens should never be null + return p.getUnmanagedAm(); + } + + @Override + public void setUnmanagedAM(boolean value) { + maybeInitBuilder(); + builder.setUnmanagedAm(value); + } + @Override public boolean getCancelTokensWhenComplete() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; //There is a default so cancelTokens should never be null return p.getCancelTokensWhenComplete(); } - + @Override public void setCancelTokensWhenComplete(boolean cancel) { maybeInitBuilder(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 2603dd2d306..8daeddd345b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -90,6 +90,7 @@ enum YarnApplicationStateProto { FINISHED = 4; FAILED = 5; KILLED = 6; + ACCEPTED = 7; } enum FinalApplicationStatusProto { @@ -170,6 +171,7 @@ message ApplicationReportProto { optional FinalApplicationStatusProto final_application_status = 15; optional ApplicationResourceUsageReportProto app_resource_Usage = 16; optional string originalTrackingUrl = 17; + optional ApplicationAttemptIdProto currentApplicationAttemptId = 18; } enum NodeStateProto { @@ -235,6 +237,7 @@ message ApplicationSubmissionContextProto { optional PriorityProto priority = 5; optional ContainerLaunchContextProto am_container_spec = 6; optional bool cancel_tokens_when_complete = 7 [default = true]; + optional bool unmanaged_am = 8 [default = false]; } enum ApplicationAccessTypeProto { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 9fadd09fc16..f79ee6377a7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -331,14 +331,16 @@ public class BuilderUtils { } public static ApplicationReport newApplicationReport( - ApplicationId applicationId, String user, String queue, String name, - String host, int rpcPort, String clientToken, YarnApplicationState state, - String diagnostics, String url, long startTime, long finishTime, - FinalApplicationStatus finalStatus, ApplicationResourceUsageReport appResources, - String origTrackingUrl) { + ApplicationId applicationId, ApplicationAttemptId applicationAttemptId, + String user, String queue, String name, String host, int rpcPort, + String clientToken, YarnApplicationState state, String diagnostics, + String url, long startTime, long finishTime, + FinalApplicationStatus finalStatus, + ApplicationResourceUsageReport appResources, String origTrackingUrl) { ApplicationReport report = recordFactory .newRecordInstance(ApplicationReport.class); report.setApplicationId(applicationId); + report.setCurrentApplicationAttemptId(applicationAttemptId); report.setUser(user); report.setQueue(queue); report.setName(name); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java index a27ae7552e2..cc67ff77780 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java @@ -19,18 +19,13 @@ package org.apache.hadoop.yarn; import java.util.Iterator; -import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.util.Records; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; /** * Utilities to generate fake test apps @@ -66,137 +61,6 @@ public class MockApps { } } - public static List genApps(int n) { - List list = Lists.newArrayList(); - for (int i = 0; i < n; ++i) { - list.add(newApp(i)); - } - return list; - } - - public static ApplicationReport newApp(int i) { - final ApplicationId id = newAppID(i); - final YarnApplicationState state = newAppState(); - final String user = newUserName(); - final String name = newAppName(); - final String queue = newQueue(); - final FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; - return new ApplicationReport() { - private ApplicationResourceUsageReport appUsageReport; - @Override public ApplicationId getApplicationId() { return id; } - @Override public String getUser() { return user; } - @Override public String getName() { return name; } - @Override public YarnApplicationState getYarnApplicationState() { return state; } - @Override public String getQueue() { return queue; } - @Override public String getTrackingUrl() { return ""; } - @Override public String getOriginalTrackingUrl() { return ""; } - @Override public FinalApplicationStatus getFinalApplicationStatus() { return finishState; } - @Override - public ApplicationResourceUsageReport getApplicationResourceUsageReport() { - return this.appUsageReport; - } - public void setApplicationId(ApplicationId applicationId) { - // TODO Auto-generated method stub - - } - @Override - public void setTrackingUrl(String url) { - // TODO Auto-generated method stub - - } - @Override public void setOriginalTrackingUrl(String url) { } - @Override - public void setApplicationResourceUsageReport(ApplicationResourceUsageReport appResources) { - this.appUsageReport = appResources; - } - @Override - public void setName(String name) { - // TODO Auto-generated method stub - - } - @Override - public void setQueue(String queue) { - // TODO Auto-generated method stub - - } - @Override - public void setYarnApplicationState(YarnApplicationState state) { - // TODO Auto-generated method stub - - } - @Override - public void setUser(String user) { - // TODO Auto-generated method stub - - } - @Override - public String getDiagnostics() { - // TODO Auto-generated method stub - return null; - } - @Override - public void setDiagnostics(String diagnostics) { - // TODO Auto-generated method stub - - } - @Override - public String getHost() { - // TODO Auto-generated method stub - return null; - } - @Override - public void setHost(String host) { - // TODO Auto-generated method stub - - } - @Override - public int getRpcPort() { - // TODO Auto-generated method stub - return 0; - } - @Override - public void setRpcPort(int rpcPort) { - // TODO Auto-generated method stub - - } - @Override - public String getClientToken() { - // TODO Auto-generated method stub - return null; - } - @Override - public void setClientToken(String clientToken) { - // TODO Auto-generated method stub - - } - @Override - public long getStartTime() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setStartTime(long startTime) { - // TODO Auto-generated method stub - - } - @Override - public long getFinishTime() { - // TODO Auto-generated method stub - return 0; - } - @Override - public void setFinishTime(long finishTime) { - // TODO Auto-generated method stub - - } - @Override - public void setFinalApplicationStatus(FinalApplicationStatus finishState) { - // TODO Auto-generated method stub - } - }; - } - public static ApplicationId newAppID(int i) { ApplicationId id = Records.newRecord(ApplicationId.class); id.setClusterTimestamp(TS); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 1cf1ca293cc..cf3e8616b73 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -333,8 +333,9 @@ public class RMAppImpl implements RMApp { case NEW: return YarnApplicationState.NEW; case SUBMITTED: - case ACCEPTED: return YarnApplicationState.SUBMITTED; + case ACCEPTED: + return YarnApplicationState.ACCEPTED; case RUNNING: return YarnApplicationState.RUNNING; case FINISHED: @@ -403,12 +404,12 @@ public class RMAppImpl implements RMApp { } else { appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; } - return BuilderUtils.newApplicationReport(this.applicationId, this.user, - this.queue, this.name, host, rpcPort, clientToken, - createApplicationState(this.stateMachine.getCurrentState()), - diags, trackingUrl, - this.startTime, this.finishTime, finishState, appUsageReport, - origTrackingUrl); + return BuilderUtils.newApplicationReport(this.applicationId, + this.currentAttempt.getAppAttemptId(), this.user, this.queue, + this.name, host, rpcPort, clientToken, + createApplicationState(this.stateMachine.getCurrentState()), diags, + trackingUrl, this.startTime, this.finishTime, finishState, + appUsageReport, origTrackingUrl); } finally { this.readLock.unlock(); } @@ -599,21 +600,32 @@ public class RMAppImpl implements RMApp { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent)event); - if (app.attempts.size() == app.maxRetries) { - String msg = "Application " + app.getApplicationId() - + " failed " + app.maxRetries - + " times due to " + failedEvent.getDiagnostics() - + ". Failing the application."; + RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event); + boolean retryApp = true; + String msg = null; + if (app.submissionContext.getUnmanagedAM()) { + // RM does not manage the AM. Do not retry + retryApp = false; + msg = "Unmanaged application " + app.getApplicationId() + + " failed due to " + failedEvent.getDiagnostics() + + ". Failing the application."; + } else if (app.attempts.size() == app.maxRetries) { + retryApp = false; + msg = "Application " + app.getApplicationId() + " failed " + + app.maxRetries + " times due to " + failedEvent.getDiagnostics() + + ". Failing the application."; + } + + if (retryApp) { + app.createNewAttempt(); + return initialState; + } else { LOG.info(msg); app.diagnostics.append(msg); // Inform the node for app-finish FINAL_TRANSITION.transition(app, event); return RMAppState.FAILED; } - - app.createNewAttempt(); - return initialState; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 151c8156f7c..f315d6aaf96 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -143,16 +143,24 @@ public class RMAppAttemptImpl implements RMAppAttempt { .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED, RMAppAttemptEventType.KILL, new BaseFinalTransition(RMAppAttemptState.KILLED)) - + .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED, + RMAppAttemptEventType.REGISTERED, + new UnexpectedAMRegisteredTransition()) + // Transitions from SUBMITTED state .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED, RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition()) - .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.SCHEDULED, - RMAppAttemptEventType.APP_ACCEPTED, new ScheduleTransition()) + .addTransition(RMAppAttemptState.SUBMITTED, + EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED), + RMAppAttemptEventType.APP_ACCEPTED, + new ScheduleTransition()) .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED, RMAppAttemptEventType.KILL, new BaseFinalTransition(RMAppAttemptState.KILLED)) - + .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED, + RMAppAttemptEventType.REGISTERED, + new UnexpectedAMRegisteredTransition()) + // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.ALLOCATED, @@ -173,7 +181,7 @@ public class RMAppAttemptImpl implements RMAppAttempt { RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED, RMAppAttemptEventType.KILL, new KillAllocatedAMTransition()) - + // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) @@ -583,9 +591,11 @@ public class RMAppAttemptImpl implements RMAppAttempt { private static final List EMPTY_CONTAINER_REQUEST_LIST = new ArrayList(); - private static final class ScheduleTransition extends BaseTransition { + private static final class ScheduleTransition + implements + MultipleArcTransition { @Override - public void transition(RMAppAttemptImpl appAttempt, + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { // Send the acceptance to the app @@ -593,17 +603,27 @@ public class RMAppAttemptImpl implements RMAppAttempt { .getApplicationAttemptId().getApplicationId(), RMAppEventType.APP_ACCEPTED)); - // Request a container for the AM. - ResourceRequest request = BuilderUtils.newResourceRequest( - AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext - .getAMContainerSpec().getResource(), 1); + if (!appAttempt.submissionContext.getUnmanagedAM()) { + // Request a container for the AM. + ResourceRequest request = BuilderUtils.newResourceRequest( + AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext + .getAMContainerSpec().getResource(), 1); - Allocation amContainerAllocation = - appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, - Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST); - if (amContainerAllocation != null - && amContainerAllocation.getContainers() != null) { - assert(amContainerAllocation.getContainers().size() == 0); + Allocation amContainerAllocation = appAttempt.scheduler.allocate( + appAttempt.applicationAttemptId, + Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST); + if (amContainerAllocation != null + && amContainerAllocation.getContainers() != null) { + assert (amContainerAllocation.getContainers().size() == 0); + } + return RMAppAttemptState.SCHEDULED; + } else { + // RM not allocating container. AM is self launched. + // Directly go to LAUNCHED state + // Register with AMLivelinessMonitor + appAttempt.rmContext.getAMLivelinessMonitor().register( + appAttempt.applicationAttemptId); + return RMAppAttemptState.LAUNCHED; } } } @@ -811,11 +831,30 @@ public class RMAppAttemptImpl implements RMAppAttempt { appAttempt.rmContext.getAMLivelinessMonitor().unregister( appAttempt.getAppAttemptId()); - // Tell the launcher to cleanup. - appAttempt.eventHandler.handle(new AMLauncherEvent( - AMLauncherEventType.CLEANUP, appAttempt)); + if(!appAttempt.submissionContext.getUnmanagedAM()) { + // Tell the launcher to cleanup. + appAttempt.eventHandler.handle(new AMLauncherEvent( + AMLauncherEventType.CLEANUP, appAttempt)); + } } } + + private static class UnexpectedAMRegisteredTransition extends + BaseFinalTransition { + + public UnexpectedAMRegisteredTransition() { + super(RMAppAttemptState.FAILED); + } + + @Override + public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + assert appAttempt.submissionContext.getUnmanagedAM(); + appAttempt + .setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state."); + super.transition(appAttempt, event); + } + + } private static final class StatusUpdateTransition extends BaseTransition { @@ -884,8 +923,11 @@ public class RMAppAttemptImpl implements RMAppAttempt { // Is this container the AmContainer? If the finished container is same as // the AMContainer, AppAttempt fails - if (appAttempt.masterContainer.getId().equals( - containerStatus.getContainerId())) { + if (appAttempt.masterContainer != null + && appAttempt.masterContainer.getId().equals( + containerStatus.getContainerId())) { + // container associated with AM. must not be unmanaged + assert appAttempt.submissionContext.getUnmanagedAM() == false; // Setup diagnostic message appAttempt.diagnostics.append("AM Container for " + appAttempt.getAppAttemptId() + " exited with " + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 5782d9189a0..676700d46b3 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -131,7 +132,7 @@ public class TestRMAppTransitions { rmDispatcher.start(); } - protected RMApp createNewTestApp() { + protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) { ApplicationId applicationId = MockApps.newAppID(appId++); String user = MockApps.newUserName(); String name = MockApps.newAppName(); @@ -139,12 +140,15 @@ public class TestRMAppTransitions { Configuration conf = new YarnConfiguration(); // ensure max retries set to known value conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries); - ApplicationSubmissionContext submissionContext = null; String clientTokenStr = "bogusstring"; ApplicationStore appStore = mock(ApplicationStore.class); YarnScheduler scheduler = mock(YarnScheduler.class); ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); + + if(submissionContext == null) { + submissionContext = new ApplicationSubmissionContextPBImpl(); + } RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, @@ -235,8 +239,9 @@ public class TestRMAppTransitions { diag.toString().matches(regex)); } - protected RMApp testCreateAppSubmitted() throws IOException { - RMApp application = createNewTestApp(); + protected RMApp testCreateAppSubmitted( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = createNewTestApp(submissionContext); // NEW => SUBMITTED event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); @@ -246,9 +251,10 @@ public class TestRMAppTransitions { return application; } - protected RMApp testCreateAppAccepted() throws IOException { - RMApp application = testCreateAppSubmitted(); - // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED + protected RMApp testCreateAppAccepted( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppSubmitted(submissionContext); + // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); @@ -258,8 +264,9 @@ public class TestRMAppTransitions { return application; } - protected RMApp testCreateAppRunning() throws IOException { - RMApp application = testCreateAppAccepted(); + protected RMApp testCreateAppRunning( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppAccepted(submissionContext); // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -271,8 +278,9 @@ public class TestRMAppTransitions { return application; } - protected RMApp testCreateAppFinished() throws IOException { - RMApp application = testCreateAppRunning(); + protected RMApp testCreateAppFinished( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppRunning(submissionContext); // RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED RMAppEvent event = new RMAppEvent(application.getApplicationId(), @@ -285,17 +293,38 @@ public class TestRMAppTransitions { return application; } + @Test + public void testUnmanagedApp() throws IOException { + ApplicationSubmissionContext subContext = new ApplicationSubmissionContextPBImpl(); + subContext.setUnmanagedAM(true); + + // test success path + LOG.info("--- START: testUnmanagedAppSuccessPath ---"); + testCreateAppFinished(subContext); + + // test app fails after 1 app attempt failure + LOG.info("--- START: testUnmanagedAppFailPath ---"); + RMApp application = testCreateAppRunning(subContext); + RMAppEvent event = new RMAppFailedAttemptEvent( + application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, ""); + application.handle(event); + RMAppAttempt appAttempt = application.getCurrentAppAttempt(); + Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId()); + assertFailed(application, + ".*Unmanaged application.*Failing the application.*"); + } + @Test public void testAppSuccessPath() throws IOException { LOG.info("--- START: testAppSuccessPath ---"); - testCreateAppFinished(); + testCreateAppFinished(null); } @Test public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); - RMApp application = createNewTestApp(); + RMApp application = createNewTestApp(null); // NEW => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -307,7 +336,7 @@ public class TestRMAppTransitions { public void testAppNewReject() throws IOException { LOG.info("--- START: testAppNewReject ---"); - RMApp application = createNewTestApp(); + RMApp application = createNewTestApp(null); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; RMAppEvent event = @@ -320,7 +349,7 @@ public class TestRMAppTransitions { public void testAppSubmittedRejected() throws IOException { LOG.info("--- START: testAppSubmittedRejected ---"); - RMApp application = testCreateAppSubmitted(); + RMApp application = testCreateAppSubmitted(null); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; RMAppEvent event = @@ -333,7 +362,7 @@ public class TestRMAppTransitions { public void testAppSubmittedKill() throws IOException { LOG.info("--- START: testAppSubmittedKill---"); - RMApp application = testCreateAppAccepted(); + RMApp application = testCreateAppAccepted(null); // SUBMITTED => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application); @@ -345,7 +374,7 @@ public class TestRMAppTransitions { public void testAppAcceptedFailed() throws IOException { LOG.info("--- START: testAppAcceptedFailed ---"); - RMApp application = testCreateAppAccepted(); + RMApp application = testCreateAppAccepted(null); // ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED for (int i=1; i KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -386,7 +415,7 @@ public class TestRMAppTransitions { public void testAppRunningKill() throws IOException { LOG.info("--- START: testAppRunningKill ---"); - RMApp application = testCreateAppRunning(); + RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -398,7 +427,7 @@ public class TestRMAppTransitions { public void testAppRunningFailed() throws IOException { LOG.info("--- START: testAppRunningFailed ---"); - RMApp application = testCreateAppRunning(); + RMApp application = testCreateAppRunning(null); RMAppAttempt appAttempt = application.getCurrentAppAttempt(); int expectedAttemptId = 1; Assert.assertEquals(expectedAttemptId, @@ -444,7 +473,7 @@ public class TestRMAppTransitions { public void testAppFinishedFinished() throws IOException { LOG.info("--- START: testAppFinishedFinished ---"); - RMApp application = testCreateAppFinished(); + RMApp application = testCreateAppFinished(null); // FINISHED => FINISHED event RMAppEventType.KILL RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); @@ -460,7 +489,7 @@ public class TestRMAppTransitions { public void testAppKilledKilled() throws IOException { LOG.info("--- START: testAppKilledKilled ---"); - RMApp application = testCreateAppRunning(); + RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL RMAppEvent event = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 44f47e474f8..a183fddba79 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -56,7 +57,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; @@ -83,6 +86,7 @@ public class TestRMAppAttemptTransitions { private YarnScheduler scheduler; private ApplicationMasterService masterService; private ApplicationMasterLauncher applicationMasterLauncher; + private AMLivelinessMonitor amLivelinessMonitor; private RMApp application; private RMAppAttempt applicationAttempt; @@ -135,6 +139,9 @@ public class TestRMAppAttemptTransitions { } private static int appId = 1; + + private ApplicationSubmissionContext submissionContext = null; + private boolean unmanagedAM; @Before public void setUp() throws Exception { @@ -142,7 +149,7 @@ public class TestRMAppAttemptTransitions { ContainerAllocationExpirer containerAllocationExpirer = mock(ContainerAllocationExpirer.class); - AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); + amLivelinessMonitor = mock(AMLivelinessMonitor.class); rmContext = new RMContextImpl(new MemStore(), rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, null, @@ -174,8 +181,7 @@ public class TestRMAppAttemptTransitions { final String user = MockApps.newUserName(); final String queue = MockApps.newQueue(); - ApplicationSubmissionContext submissionContext = - mock(ApplicationSubmissionContext.class); + submissionContext = mock(ApplicationSubmissionContext.class); when(submissionContext.getUser()).thenReturn(user); when(submissionContext.getQueue()).thenReturn(queue); ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); @@ -183,6 +189,8 @@ public class TestRMAppAttemptTransitions { when(amContainerSpec.getResource()).thenReturn(resource); when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec); + unmanagedAM = false; + application = mock(RMApp.class); applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler, @@ -247,7 +255,8 @@ public class TestRMAppAttemptTransitions { assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); - // Check events + // this works for unmanaged and managed AM's because this is actually doing + // verify(application).handle(anyObject()); verify(application).handle(any(RMAppRejectedEvent.class)); } @@ -269,9 +278,24 @@ public class TestRMAppAttemptTransitions { /** * {@link RMAppAttemptState#SCHEDULED} */ + @SuppressWarnings("unchecked") private void testAppAttemptScheduledState() { - assertEquals(RMAppAttemptState.SCHEDULED, + RMAppAttemptState expectedState; + int expectedAllocateCount; + if(unmanagedAM) { + expectedState = RMAppAttemptState.LAUNCHED; + expectedAllocateCount = 0; + } else { + expectedState = RMAppAttemptState.SCHEDULED; + expectedAllocateCount = 1; + } + + assertEquals(expectedState, applicationAttempt.getAppAttemptState()); + verify(scheduler, times(expectedAllocateCount)). + allocate(any(ApplicationAttemptId.class), + any(List.class), any(List.class)); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); @@ -280,9 +304,6 @@ public class TestRMAppAttemptTransitions { // Check events verify(application).handle(any(RMAppEvent.class)); - verify(scheduler). - allocate(any(ApplicationAttemptId.class), - any(List.class), any(List.class)); } /** @@ -351,14 +372,16 @@ public class TestRMAppAttemptTransitions { private void testAppAttemptFinishedState(Container container, FinalApplicationStatus finalStatus, String trackingUrl, - String diagnostics) { + String diagnostics, + int finishedContainerCount) { assertEquals(RMAppAttemptState.FINISHED, applicationAttempt.getAppAttemptState()); assertEquals(diagnostics, applicationAttempt.getDiagnostics()); assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals("null/proxy/"+applicationAttempt.getAppAttemptId(). getApplicationId()+"/", applicationAttempt.getTrackingUrl()); - assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(finishedContainerCount, applicationAttempt + .getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); } @@ -424,7 +447,50 @@ public class TestRMAppAttemptTransitions { testAppAttemptRunningState(container, host, rpcPort, trackingUrl); } - + + @Test + public void testUnmanagedAMSuccess() { + unmanagedAM = true; + when(submissionContext.getUnmanagedAM()).thenReturn(true); + // submit AM and check it goes to LAUNCHED state + scheduleApplicationAttempt(); + testAppAttemptLaunchedState(null); + verify(amLivelinessMonitor, times(1)).register( + applicationAttempt.getAppAttemptId()); + + // launch AM + runApplicationAttempt(null, "host", 8042, "oldtrackingurl"); + + // complete a container + applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( + applicationAttempt.getAppAttemptId(), mock(Container.class))); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + // complete AM + String trackingUrl = "mytrackingurl"; + String diagnostics = "Successful"; + FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; + applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( + applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, + diagnostics)); + testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1); + } + + @Test + public void testUnmanagedAMUnexpectedRegistration() { + unmanagedAM = true; + when(submissionContext.getUnmanagedAM()).thenReturn(true); + + // submit AM and check it goes to SUBMITTED state + submitApplicationAttempt(); + assertEquals(RMAppAttemptState.SUBMITTED, + applicationAttempt.getAppAttemptState()); + + // launch AM and verify attempt failed + applicationAttempt.handle(new RMAppAttemptRegistrationEvent( + applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl")); + testAppAttemptSubmittedToFailedState("Unmanaged AM must register after AM attempt reaches LAUNCHED state."); + } @Test public void testNewToKilled() { @@ -499,7 +565,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, diagnostics)); testAppAttemptFinishedState(amContainer, finalStatus, - trackingUrl, diagnostics); + trackingUrl, diagnostics, 0); } @@ -516,7 +582,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus, diagnostics)); testAppAttemptFinishedState(amContainer, finalStatus, - trackingUrl, diagnostics); + trackingUrl, diagnostics, 0); } }