diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 85d073f7de2..14462a4a43f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1383,6 +1383,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3053. Better diagnostic message for unknown methods in ProtoBuf RPCs. (vinodkv via acmurthy) + MAPREDUCE-2952. Fixed ResourceManager/MR-client to consume diagnostics + for AM failures in a couple of corner cases. (Arun C Murthy via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index 9cbc9ad6d47..9f221e6354a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -281,14 +281,17 @@ public class TypeConverter { } public static org.apache.hadoop.mapred.JobStatus fromYarn( - JobReport jobreport, String jobFile, String trackingUrl) { + JobReport jobreport, String jobFile) { JobPriority jobPriority = JobPriority.NORMAL; - return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()), - jobreport.getSetupProgress(), jobreport.getMapProgress(), - jobreport.getReduceProgress(), jobreport.getCleanupProgress(), - fromYarn(jobreport.getJobState()), - jobPriority, jobreport.getUser(), jobreport.getJobName(), - jobFile, trackingUrl); + org.apache.hadoop.mapred.JobStatus jobStatus = + new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()), + jobreport.getSetupProgress(), jobreport.getMapProgress(), + jobreport.getReduceProgress(), jobreport.getCleanupProgress(), + fromYarn(jobreport.getJobState()), + jobPriority, jobreport.getUser(), jobreport.getJobName(), + jobFile, jobreport.getTrackingUrl()); + jobStatus.setFailureInfo(jobreport.getDiagnostics()); + return jobStatus; } public static org.apache.hadoop.mapreduce.QueueState fromYarn( @@ -422,6 +425,7 @@ public class TypeConverter { ); jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url jobStatus.setStartTime(application.getStartTime()); + jobStatus.setFailureInfo(application.getDiagnostics()); return jobStatus; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java index fb585e8dd27..0bfc9db3ed4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java @@ -29,6 +29,8 @@ public interface JobReport { public abstract long getFinishTime(); public abstract String getUser(); public abstract String getJobName(); + public abstract String getTrackingUrl(); + public abstract String getDiagnostics(); public abstract void setJobId(JobId jobId); public abstract void setJobState(JobState jobState); @@ -40,4 +42,6 @@ public interface JobReport { public abstract void setFinishTime(long finishTime); public abstract void setUser(String user); public abstract void setJobName(String jobName); + public abstract void setTrackingUrl(String trackingUrl); + public abstract void setDiagnostics(String diagnostics); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java index a4033e695f2..c5d2527a9da 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java @@ -206,6 +206,30 @@ public class JobReportPBImpl extends ProtoBase implements JobRep builder.setJobName((jobName)); } + @Override + public String getTrackingUrl() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.getTrackingUrl()); + } + + @Override + public void setTrackingUrl(String trackingUrl) { + maybeInitBuilder(); + builder.setTrackingUrl(trackingUrl); + } + + @Override + public String getDiagnostics() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + return p.getDiagnostics(); + } + + @Override + public void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + builder.setDiagnostics(diagnostics); + } + private JobIdPBImpl convertFromProtoFormat(JobIdProto p) { return new JobIdPBImpl(p); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto index 7d8d1b2e0b8..29184da4868 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto @@ -143,6 +143,8 @@ message JobReportProto { optional int64 finish_time = 8; optional string user = 9; optional string jobName = 10; + optional string trackingUrl = 11; + optional string diagnostics = 12; } enum TaskAttemptCompletionEventStatusProto { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java index 90b68872ff4..e5add2139f5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java @@ -321,6 +321,10 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus { super.setJobACLs(acls); } + public synchronized void setFailureInfo(String failureInfo) { + super.setFailureInfo(failureInfo); + } + /** * Set the priority of the job, defaulting to NORMAL. * @param jp new job priority diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index c30216e0669..f616df80b8f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -1239,7 +1239,8 @@ public class Job extends JobContextImpl implements JobContext { if (success) { LOG.info("Job " + jobId + " completed successfully"); } else { - LOG.info("Job " + jobId + " failed with state " + status.getState()); + LOG.info("Job " + jobId + " failed with state " + status.getState() + + " due to: " + status.getFailureInfo()); } Counters counters = getCounters(); if (counters != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java index 9e438989cfb..6f57f1733ad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java @@ -81,6 +81,7 @@ public class JobStatus implements Writable, Cloneable { private String queue; private JobPriority priority; private String schedulingInfo="NA"; + private String failureInfo = "NA"; private Map jobACLs = new HashMap(); @@ -278,6 +279,14 @@ public class JobStatus implements Writable, Cloneable { this.queue = queue; } + /** + * Set diagnostic information. + * @param failureInfo diagnostic information + */ + protected synchronized void setFailureInfo(String failureInfo) { + this.failureInfo = failureInfo; + } + /** * Get queue name * @return queue name @@ -359,6 +368,15 @@ public class JobStatus implements Writable, Cloneable { */ public synchronized JobPriority getPriority() { return priority; } + /** + * Gets any available info on the reason of failure of the job. + * @return diagnostic information on why a job might have failed. + */ + public synchronized String getFailureInfo() { + return this.failureInfo; + } + + /** * Returns true if the status is for a completed job. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 605c44e5ed9..429d350c5ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -101,16 +101,20 @@ class ClientServiceDelegate { // Get the instance of the NotRunningJob corresponding to the specified // user and state - private NotRunningJob getNotRunningJob(String user, JobState state) { + private NotRunningJob getNotRunningJob(ApplicationReport applicationReport, + JobState state) { synchronized (notRunningJobs) { HashMap map = notRunningJobs.get(state); if (map == null) { map = new HashMap(); notRunningJobs.put(state, map); } + String user = + (applicationReport == null) ? + UNKNOWN_USER : applicationReport.getUser(); NotRunningJob notRunningJob = map.get(user); if (notRunningJob == null) { - notRunningJob = new NotRunningJob(user, state); + notRunningJob = new NotRunningJob(applicationReport, state); map.put(user, notRunningJob); } return notRunningJob; @@ -130,7 +134,7 @@ class ClientServiceDelegate { if (application == null) { LOG.info("Could not get Job info from RM for job " + jobId + ". Redirecting to job history server."); - return checkAndGetHSProxy(UNKNOWN_USER, JobState.NEW); + return checkAndGetHSProxy(null, JobState.NEW); } try { if (application.getHost() == null || "".equals(application.getHost())) { @@ -171,7 +175,7 @@ class ClientServiceDelegate { if (application == null) { LOG.info("Could not get Job info from RM for job " + jobId + ". Redirecting to job history server."); - return checkAndGetHSProxy(UNKNOWN_USER, JobState.RUNNING); + return checkAndGetHSProxy(null, JobState.RUNNING); } } catch (InterruptedException e) { LOG.warn("getProxy() call interruped", e); @@ -191,17 +195,17 @@ class ClientServiceDelegate { if (application.getState() == ApplicationState.NEW || application.getState() == ApplicationState.SUBMITTED) { realProxy = null; - return getNotRunningJob(user, JobState.NEW); + return getNotRunningJob(application, JobState.NEW); } if (application.getState() == ApplicationState.FAILED) { realProxy = null; - return getNotRunningJob(user, JobState.FAILED); + return getNotRunningJob(application, JobState.FAILED); } if (application.getState() == ApplicationState.KILLED) { realProxy = null; - return getNotRunningJob(user, JobState.KILLED); + return getNotRunningJob(application, JobState.KILLED); } //History server can serve a job only if application @@ -209,15 +213,16 @@ class ClientServiceDelegate { if (application.getState() == ApplicationState.SUCCEEDED) { LOG.info("Application state is completed. " + "Redirecting to job history server"); - realProxy = checkAndGetHSProxy(user, JobState.SUCCEEDED); + realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED); } return realProxy; } - private MRClientProtocol checkAndGetHSProxy(String user, JobState state) { + private MRClientProtocol checkAndGetHSProxy( + ApplicationReport applicationReport, JobState state) { if (null == historyServerProxy) { LOG.warn("Job History Server is not configured."); - return getNotRunningJob(user, state); + return getNotRunningJob(applicationReport, state); } return historyServerProxy; } @@ -324,21 +329,22 @@ class ClientServiceDelegate { JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); - GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class); + GetJobReportRequest request = + recordFactory.newRecordInstance(GetJobReportRequest.class); request.setJobId(jobId); JobReport report = ((GetJobReportResponse) invoke("getJobReport", GetJobReportRequest.class, request)).getJobReport(); String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID); - //TODO: add tracking url in JobReport - return TypeConverter.fromYarn(report, jobFile, ""); + return TypeConverter.fromYarn(report, jobFile); } org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType) throws YarnRemoteException, YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); - GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class); + GetTaskReportsRequest request = + recordFactory.newRecordInstance(GetTaskReportsRequest.class); request.setJobId(jobId); request.setTaskType(TypeConverter.toYarn(taskType)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index a40fcedda39..17ad9f62aae 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.HashMap; import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse; @@ -53,20 +55,41 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class NotRunningJob implements MRClientProtocol { + private static final Log LOG = LogFactory.getLog(NotRunningJob.class); + private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private final JobState jobState; - private final String user; - - NotRunningJob(String username, JobState jobState) { - this.user = username; + private final ApplicationReport applicationReport; + + + private ApplicationReport getUnknownApplicationReport() { + ApplicationReport unknown = + recordFactory.newRecordInstance(ApplicationReport.class); + unknown.setUser("N/A"); + unknown.setHost("N/A"); + unknown.setName("N/A"); + unknown.setQueue("N/A"); + unknown.setStartTime(0); + unknown.setFinishTime(0); + unknown.setTrackingUrl("N/A"); + unknown.setDiagnostics("N/A"); + LOG.info("getUnknownApplicationReport"); + return unknown; + } + + NotRunningJob(ApplicationReport applicationReport, JobState jobState) { + this.applicationReport = + (applicationReport == null) ? + getUnknownApplicationReport() : applicationReport; this.jobState = jobState; } @@ -101,15 +124,19 @@ public class NotRunningJob implements MRClientProtocol { @Override public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException { - GetJobReportResponse resp = - recordFactory.newRecordInstance(GetJobReportResponse.class); JobReport jobReport = recordFactory.newRecordInstance(JobReport.class); jobReport.setJobId(request.getJobId()); - jobReport.setJobState(this.jobState); + jobReport.setJobState(jobState); + jobReport.setUser(applicationReport.getUser()); + jobReport.setStartTime(applicationReport.getStartTime()); + jobReport.setDiagnostics(applicationReport.getDiagnostics()); + jobReport.setJobName(applicationReport.getName()); + jobReport.setTrackingUrl(applicationReport.getTrackingUrl()); + jobReport.setFinishTime(applicationReport.getFinishTime()); - jobReport.setUser(this.user); - // TODO: Add jobName & other job information that is available + GetJobReportResponse resp = + recordFactory.newRecordInstance(GetJobReportResponse.class); resp.setJobReport(jobReport); return resp; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index e2cb1e05ea7..12b1c2cc9cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -267,6 +267,13 @@ public class TestClientRedirect { application.setHost(split[0]); application.setRpcPort(Integer.parseInt(split[1])); application.setUser("TestClientRedirect-user"); + application.setName("N/A"); + application.setQueue("N/A"); + application.setStartTime(0); + application.setFinishTime(0); + application.setTrackingUrl("N/A"); + application.setDiagnostics("N/A"); + GetApplicationReportResponse response = recordFactory .newRecordInstance(GetApplicationReportResponse.class); response.setApplicationReport(application); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index b7fd6c9475a..5b07d4997d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -109,7 +109,7 @@ public class TestClientServiceDelegate { ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( null, getRMDelegate()); JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); - Assert.assertEquals("Unknown User", jobStatus.getUsername()); + Assert.assertEquals("N/A", jobStatus.getUsername()); Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState()); //RM has app report and job History Server is not configured @@ -145,6 +145,13 @@ public class TestClientServiceDelegate { .newRecord(ApplicationReport.class); applicationReport.setState(ApplicationState.SUCCEEDED); applicationReport.setUser("root"); + applicationReport.setHost("N/A"); + applicationReport.setName("N/A"); + applicationReport.setQueue("N/A"); + applicationReport.setStartTime(0); + applicationReport.setFinishTime(0); + applicationReport.setTrackingUrl("N/A"); + applicationReport.setDiagnostics("N/A"); return applicationReport; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 219fd1eb579..7e34ff5487d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -49,6 +49,10 @@ + + + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index ca7a6f415a1..ffb920d5b90 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -186,4 +186,16 @@ public interface ApplicationReport { @Private @Unstable void setStartTime(long startTime); + + /** + * Get the finish time of the application. + * @return finish time of the application + */ + @Public + @Stable + long getFinishTime(); + + @Private + @Unstable + void setFinishTime(long finishTime); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index b1e80fc7598..2ea2ddbcdb2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -240,6 +240,30 @@ implements ApplicationReport { return proto; } + @Override + public long getStartTime() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + return p.getStartTime(); + } + + @Override + public void setStartTime(long startTime) { + maybeInitBuilder(); + builder.setStartTime(startTime); + } + + @Override + public long getFinishTime() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + return p.getFinishTime(); + } + + @Override + public void setFinishTime(long finishTime) { + maybeInitBuilder(); + builder.setFinishTime(finishTime); + } + private void mergeLocalToBuilder() { if (this.applicationId != null && !((ApplicationIdPBImpl) this.applicationId).getProto().equals( @@ -279,16 +303,4 @@ implements ApplicationReport { ApplicationIdProto applicationId) { return new ApplicationIdPBImpl(applicationId); } - - @Override - public long getStartTime() { - ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; - return p.getStartTime(); - } - - @Override - public void setStartTime(long startTime) { - maybeInitBuilder(); - builder.setStartTime(startTime); - } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index cdcd1a747b8..cd29a9431c3 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -140,6 +140,7 @@ message ApplicationReportProto { optional string trackingUrl = 11; optional string diagnostics = 12 [default = "N/A"]; optional int64 startTime = 13; + optional int64 finishTime = 14; } message NodeIdProto { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 4eb63c04470..2caafdc19a1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -242,7 +242,7 @@ public class BuilderUtils { public static ApplicationReport newApplicationReport( ApplicationId applicationId, String user, String queue, String name, String host, int rpcPort, String clientToken, ApplicationState state, - String diagnostics, String url, long startTime) { + String diagnostics, String url, long startTime, long finishTime) { ApplicationReport report = recordFactory .newRecordInstance(ApplicationReport.class); report.setApplicationId(applicationId); @@ -256,6 +256,7 @@ public class BuilderUtils { report.setDiagnostics(diagnostics); report.setTrackingUrl(url); report.setStartTime(startTime); + report.setFinishTime(finishTime); return report; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java index 65f6c548fbc..7d233e2d9fc 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java @@ -167,6 +167,16 @@ public class MockApps { // TODO Auto-generated method stub } + @Override + public long getFinishTime() { + // TODO Auto-generated method stub + return 0; + } + @Override + public void setFinishTime(long finishTime) { + // TODO Auto-generated method stub + + } }; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 85cd8825daa..997906a62e4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore; @@ -31,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class RMContextImpl implements RMContext { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 179b56a4af4..8bd45dff4d9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; @@ -186,6 +187,9 @@ public class ResourceManager extends CompositeService implements Recoverable { addService(adminService); this.applicationMasterLauncher = createAMLauncher(); + this.rmDispatcher.register(AMLauncherEventType.class, + this.applicationMasterLauncher); + addService(applicationMasterLauncher); super.init(conf); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java index 67f0c8a0161..a25a4312b17 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java @@ -46,13 +46,12 @@ public class ApplicationMasterLauncher extends AbstractService implements private ClientToAMSecretManager clientToAMSecretManager; protected final RMContext context; - public ApplicationMasterLauncher(ApplicationTokenSecretManager - applicationTokenSecretManager, ClientToAMSecretManager clientToAMSecretManager, + public ApplicationMasterLauncher( + ApplicationTokenSecretManager applicationTokenSecretManager, + ClientToAMSecretManager clientToAMSecretManager, RMContext context) { super(ApplicationMasterLauncher.class.getName()); this.context = context; - /* register to dispatcher */ - this.context.getDispatcher().register(AMLauncherEventType.class, this); this.launcherPool = new ThreadPoolExecutor(1, 10, 1, TimeUnit.HOURS, new LinkedBlockingQueue()); this.launcherHandlingThread = new LauncherThread(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 2e739a98b99..484a7a38ba4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java new file mode 100644 index 00000000000..111c6acc41b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class RMAppFailedAttemptEvent extends RMAppEvent { + + private final String diagnostics; + + public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event, + String diagnostics) { + super(appId, event); + this.diagnostics = diagnostics; + } + + public String getDiagnostics() { + return this.diagnostics; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9246d1838c7..838dfc0b087 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -310,7 +310,8 @@ public class RMAppImpl implements RMApp { return BuilderUtils.newApplicationReport(this.applicationId, this.user, this.queue, this.name, host, rpcPort, clientToken, createApplicationState(this.stateMachine.getCurrentState()), - this.diagnostics.toString(), trackingUrl, this.startTime); + this.diagnostics.toString(), trackingUrl, + this.startTime, this.finishTime); } finally { this.readLock.unlock(); } @@ -470,11 +471,13 @@ public class RMAppImpl implements RMApp { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - + + RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent)event); if (app.attempts.size() == app.maxRetries) { String msg = "Application " + app.getApplicationId() + " failed " + app.maxRetries - + " times. Failing the application."; + + " times due to " + failedEvent.getDiagnostics() + + ". Failing the application."; LOG.info(msg); app.diagnostics.append(msg); // Inform the node for app-finish diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 70747deacba..3164602f59f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -79,7 +79,7 @@ public interface RMAppAttempt extends EventHandler{ * Diagnostics information for the application attempt. * @return diagnostics information for the application attempt. */ - StringBuilder getDiagnostics(); + String getDiagnostics(); /** * Progress for the application attempt. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 7c6357defab..7f7f050bc4f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -104,10 +106,10 @@ public class RMAppAttemptImpl implements RMAppAttempt { private Container masterContainer; private float progress = 0; - private String host; + private String host = "N/A"; private int rpcPort; - private String trackingUrl; - private String finalState; + private String trackingUrl = "N/A"; + private String finalState = "N/A"; private final StringBuilder diagnostics = new StringBuilder(); private static final StateMachineFactory 0); Assert.assertTrue("application start time is before currentTime", @@ -202,7 +204,8 @@ public class TestRMAppTransitions { protected RMApp testCreateAppSubmitted() throws IOException { RMApp application = createNewTestApp(); // NEW => SUBMITTED event RMAppEventType.START - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.START); application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); @@ -212,7 +215,9 @@ public class TestRMAppTransitions { protected RMApp testCreateAppAccepted() throws IOException { RMApp application = testCreateAppSubmitted(); // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_ACCEPTED); application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.ACCEPTED, application); @@ -222,7 +227,9 @@ public class TestRMAppTransitions { protected RMApp testCreateAppRunning() throws IOException { RMApp application = testCreateAppAccepted(); // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_REGISTERED); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_REGISTERED); application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.RUNNING, application); @@ -232,7 +239,9 @@ public class TestRMAppTransitions { protected RMApp testCreateAppFinished() throws IOException { RMApp application = testCreateAppRunning(); // RUNNING => FINISHED event RMAppEventType.ATTEMPT_FINISHED - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FINISHED); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED); application.handle(event); assertAppState(RMAppState.FINISHED, application); assertTimesAtFinish(application); @@ -251,7 +260,8 @@ public class TestRMAppTransitions { RMApp application = createNewTestApp(); // NEW => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); assertKilled(application); } @@ -263,7 +273,8 @@ public class TestRMAppTransitions { RMApp application = createNewTestApp(); // NEW => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "Test Application Rejected"; - RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + RMAppEvent event = + new RMAppRejectedEvent(application.getApplicationId(), rejectedText); application.handle(event); assertFailed(application, rejectedText); } @@ -275,7 +286,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppSubmitted(); // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED String rejectedText = "app rejected"; - RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + RMAppEvent event = + new RMAppRejectedEvent(application.getApplicationId(), rejectedText); application.handle(event); assertFailed(application, rejectedText); } @@ -286,7 +298,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppAccepted(); // SUBMITTED => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); assertKilled(application); } @@ -298,18 +311,26 @@ public class TestRMAppTransitions { RMApp application = testCreateAppAccepted(); // ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED for (int i=1; i FAILED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED after max retries - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED); + // ACCEPTED => FAILED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED + // after max retries + String message = "Test fail"; + RMAppEvent event = + new RMAppFailedAttemptEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FAILED, message); application.handle(event); - assertFailed(application, ".*Failing the application.*"); + assertFailed(application, ".*" + message + ".*Failing the application.*"); } @Test @@ -318,7 +339,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppAccepted(); // ACCEPTED => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); assertKilled(application); } @@ -329,7 +351,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppRunning(); // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); assertKilled(application); } @@ -341,25 +364,35 @@ public class TestRMAppTransitions { RMApp application = testCreateAppRunning(); RMAppAttempt appAttempt = application.getCurrentAppAttempt(); int expectedAttemptId = 1; - Assert.assertEquals(expectedAttemptId, appAttempt.getAppAttemptId().getAttemptId()); + Assert.assertEquals(expectedAttemptId, + appAttempt.getAppAttemptId().getAttemptId()); // RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED for (int i=1; i FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED after max retries - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED); + // RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED + // after max retries + RMAppEvent event = + new RMAppFailedAttemptEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FAILED, ""); application.handle(event); assertFailed(application, ".*Failing the application.*"); @@ -376,7 +409,8 @@ public class TestRMAppTransitions { RMApp application = testCreateAppFinished(); // FINISHED => FINISHED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); assertTimesAtFinish(application); assertAppState(RMAppState.FINISHED, application); @@ -392,25 +426,32 @@ public class TestRMAppTransitions { RMApp application = testCreateAppRunning(); // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); application.handle(event); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); // KILLED => KILLED event RMAppEventType.ATTEMPT_FINISHED - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FINISHED); + event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FINISHED); application.handle(event); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); // KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED); + event = + new RMAppFailedAttemptEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_FAILED, ""); application.handle(event); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); // KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED - event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_KILLED); + event = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); application.handle(event); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java new file mode 100644 index 00000000000..03a4ba07441 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -0,0 +1,403 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestRMAppAttemptTransitions { + + private static final Log LOG = + LogFactory.getLog(TestRMAppAttemptTransitions.class); + + private static final String EMPTY_DIAGNOSTICS = ""; + + private RMContext rmContext; + private YarnScheduler scheduler; + private ApplicationMasterService masterService; + private ApplicationMasterLauncher applicationMasterLauncher; + + private RMApp application; + private RMAppAttempt applicationAttempt; + + private final class TestApplicationAttemptEventDispatcher implements + EventHandler { + + @Override + public void handle(RMAppAttemptEvent event) { + ApplicationAttemptId appID = event.getApplicationAttemptId(); + assertEquals(applicationAttempt.getAppAttemptId(), appID); + try { + applicationAttempt.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + appID, t); + } + } + } + + // handle all the RM application events - same as in ResourceManager.java + private final class TestApplicationEventDispatcher implements + EventHandler { + @Override + public void handle(RMAppEvent event) { + assertEquals(application.getApplicationId(), event.getApplicationId()); + try { + application.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + application.getApplicationId(), t); + } + } + } + + private final class TestSchedulerEventDispatcher implements + EventHandler { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + } + + private final class TestAMLauncherEventDispatcher implements + EventHandler { + @Override + public void handle(AMLauncherEvent event) { + applicationMasterLauncher.handle(event); + } + } + + private static int appId = 1; + + @Before + public void setUp() throws Exception { + InlineDispatcher rmDispatcher = new InlineDispatcher(); + + ContainerAllocationExpirer containerAllocationExpirer = + mock(ContainerAllocationExpirer.class); + AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); + rmContext = new RMContextImpl(new MemStore(), rmDispatcher, + containerAllocationExpirer, amLivelinessMonitor); + + scheduler = mock(YarnScheduler.class); + masterService = mock(ApplicationMasterService.class); + applicationMasterLauncher = mock(ApplicationMasterLauncher.class); + + rmDispatcher.register(RMAppAttemptEventType.class, + new TestApplicationAttemptEventDispatcher()); + + rmDispatcher.register(RMAppEventType.class, + new TestApplicationEventDispatcher()); + + rmDispatcher.register(SchedulerEventType.class, + new TestSchedulerEventDispatcher()); + + rmDispatcher.register(AMLauncherEventType.class, + new TestAMLauncherEventDispatcher()); + + rmDispatcher.init(new Configuration()); + rmDispatcher.start(); + + + ApplicationId applicationId = MockApps.newAppID(appId++); + ApplicationAttemptId applicationAttemptId = + MockApps.newAppAttemptID(applicationId, 0); + + final String user = MockApps.newUserName(); + final String queue = MockApps.newQueue(); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + when(submissionContext.getUser()).thenReturn(user); + when(submissionContext.getQueue()).thenReturn(queue); + ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); + Resource resource = mock(Resource.class); + when(amContainerSpec.getResource()).thenReturn(resource); + when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec); + + application = mock(RMApp.class); + applicationAttempt = + new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler, + masterService, submissionContext); + when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); + when(application.getApplicationId()).thenReturn(applicationId); + + testAppAttemptNewState(); + } + + @After + public void tearDown() throws Exception { + ((AsyncDispatcher)this.rmContext.getDispatcher()).stop(); + } + + + /** + * {@link RMAppAttemptState#NEW} + */ + private void testAppAttemptNewState() { + assertEquals(RMAppAttemptState.NEW, + applicationAttempt.getAppAttemptState()); + assertEquals(0, applicationAttempt.getDiagnostics().length()); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertNull(applicationAttempt.getMasterContainer()); + assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); + assertEquals(0, applicationAttempt.getRanNodes().size()); + } + + /** + * {@link RMAppAttemptState#SUBMITTED} + */ + private void testAppAttemptSubmittedState() { + assertEquals(RMAppAttemptState.SUBMITTED, + applicationAttempt.getAppAttemptState()); + assertEquals(0, applicationAttempt.getDiagnostics().length()); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertNull(applicationAttempt.getMasterContainer()); + assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); + assertEquals(0, applicationAttempt.getRanNodes().size()); + + // Check events + verify(masterService). + registerAppAttempt(applicationAttempt.getAppAttemptId()); + verify(scheduler).handle(any(AppAddedSchedulerEvent.class)); + } + + /** + * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED} + */ + private void testAppAttemptSubmittedToFailedState(String diagnostics) { + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + assertEquals(diagnostics, applicationAttempt.getDiagnostics()); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertNull(applicationAttempt.getMasterContainer()); + assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); + assertEquals(0, applicationAttempt.getRanNodes().size()); + + // Check events + verify(application).handle(any(RMAppRejectedEvent.class)); + } + + /** + * {@link RMAppAttemptState#KILLED} + */ + private void testAppAttemptKilledState(Container amContainer, + String diagnostics) { + assertEquals(RMAppAttemptState.KILLED, + applicationAttempt.getAppAttemptState()); + assertEquals(diagnostics, applicationAttempt.getDiagnostics()); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(amContainer, applicationAttempt.getMasterContainer()); + assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); + assertEquals(0, applicationAttempt.getRanNodes().size()); + } + + /** + * {@link RMAppAttemptState#SCHEDULED} + */ + private void testAppAttemptScheduledState() { + assertEquals(RMAppAttemptState.SCHEDULED, + applicationAttempt.getAppAttemptState()); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertNull(applicationAttempt.getMasterContainer()); + assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); + assertEquals(0, applicationAttempt.getRanNodes().size()); + + // Check events + verify(application).handle(any(RMAppEvent.class)); + verify(scheduler). + allocate(any(ApplicationAttemptId.class), + any(List.class), any(List.class)); + } + + /** + * {@link RMAppAttemptState#ALLOCATED} + */ + private void testAppAttemptAllocatedState(Container amContainer) { + assertEquals(RMAppAttemptState.ALLOCATED, + applicationAttempt.getAppAttemptState()); + assertEquals(amContainer, applicationAttempt.getMasterContainer()); + + // Check events + verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class)); + verify(scheduler, times(2)). + allocate( + any(ApplicationAttemptId.class), any(List.class), any(List.class)); + } + + /** + * {@link RMAppAttemptState#FAILED} + */ + private void testAppAttemptFailedState(Container container, + String diagnostics) { + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + assertEquals(diagnostics, applicationAttempt.getDiagnostics()); + assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(container, applicationAttempt.getMasterContainer()); + assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); + assertEquals(0, applicationAttempt.getRanNodes().size()); + + // Check events + verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); + } + + private void submitApplicationAttempt() { + ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + applicationAttempt.handle( + new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); + testAppAttemptSubmittedState(); + } + + private void scheduleApplicationAttempt() { + submitApplicationAttempt(); + applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.APP_ACCEPTED)); + testAppAttemptScheduledState(); + } + + private Container allocateApplicationAttempt() { + scheduleApplicationAttempt(); + + // Mock the allocation of AM container + Container container = mock(Container.class); + Allocation allocation = mock(Allocation.class); + when(allocation.getContainers()). + thenReturn(Collections.singletonList(container)); + when( + scheduler.allocate( + any(ApplicationAttemptId.class), + any(List.class), + any(List.class))). + thenReturn(allocation); + + applicationAttempt.handle( + new RMAppAttemptContainerAllocatedEvent( + applicationAttempt.getAppAttemptId(), + container)); + + testAppAttemptAllocatedState(container); + + return container; + } + + @Test + public void testNewToKilled() { + applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.KILL)); + testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS); + } + + @Test + public void testSubmittedToFailed() { + submitApplicationAttempt(); + String message = "Rejected"; + applicationAttempt.handle( + new RMAppAttemptRejectedEvent( + applicationAttempt.getAppAttemptId(), message)); + testAppAttemptSubmittedToFailedState(message); + } + + @Test + public void testSubmittedToKilled() { + submitApplicationAttempt(); + applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.KILL)); + testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS); + } + + @Test + public void testScheduledToKilled() { + scheduleApplicationAttempt(); + applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.KILL)); + testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS); + } + + @Test + public void testAllocatedToKilled() { + Container amContainer = allocateApplicationAttempt(); + applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.KILL)); + testAppAttemptKilledState(amContainer, EMPTY_DIAGNOSTICS); + } + + @Test + public void testAllocatedToFailed() { + Container amContainer = allocateApplicationAttempt(); + String diagnostics = "Launch Failed"; + applicationAttempt.handle( + new RMAppAttemptLaunchFailedEvent( + applicationAttempt.getAppAttemptId(), + diagnostics)); + testAppAttemptFailedState(amContainer, diagnostics); + } + +}