diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9a750af999e..9e98ac96e1d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -558,6 +558,9 @@ Release 0.23.4 - UNRELEASED MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job has completed (Robert Joseph Evans via jlowe) + MAPREDUCE-4689. JobClient.getMapTaskReports on failed job results in NPE + (jlowe via bobby) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java index 669eaa4d6f7..830b64f1ad3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java @@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.util.Records; public class CompletedTask implements Task { + private static final Counters EMPTY_COUNTERS = new Counters(); + private final TaskId taskId; private final TaskInfo taskInfo; private TaskReport report; @@ -124,7 +126,11 @@ public class CompletedTask implements Task { report.setFinishTime(taskInfo.getFinishTime()); report.setTaskState(getState()); report.setProgress(getProgress()); - report.setCounters(TypeConverter.toYarn(getCounters())); + Counters counters = getCounters(); + if (counters == null) { + counters = EMPTY_COUNTERS; + } + report.setCounters(TypeConverter.toYarn(counters)); if (successfulAttempt != null) { report.setSuccessfulAttempt(successfulAttempt); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index b596a2123a7..f9acb1a3821 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -402,6 +403,63 @@ public class TestJobHistoryParsing { } } + @Test + public void testCountersForFailedTask() throws Exception { + LOG.info("STARTING testCountersForFailedTask"); + try { + Configuration conf = new Configuration(); + conf + .setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, + this.getClass().getName(), true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + app.waitForState(job, JobState.FAILED); + + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); + + String jobhistoryDir = JobHistoryUtils + .getHistoryIntermediateDoneDirForUser(conf); + JobHistory jobHistory = new JobHistory(); + jobHistory.init(conf); + + JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) + .getJobIndexInfo(); + String jobhistoryFileName = FileNameIndexUtils + .getDoneFileName(jobIndexInfo); + + Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); + FSDataInputStream in = null; + FileContext fc = null; + try { + fc = FileContext.getFileContext(conf); + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } + + JobHistoryParser parser = new JobHistoryParser(in); + JobInfo jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + Assert.assertNull("Caught an expected exception " + parseException, + parseException); + for (Map.Entry entry : jobInfo.getAllTasks().entrySet()) { + TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey()); + CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); + Assert.assertNotNull("completed task report has null counters", + ct.getReport().getCounters()); + } + } finally { + LOG.info("FINISHED testCountersForFailedTask"); + } + } + static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, @@ -422,6 +480,26 @@ public class TestJobHistoryParsing { } } + static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory { + + public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + } + + @SuppressWarnings("unchecked") + @Override + protected void attemptLaunched(TaskAttemptId attemptID) { + if (attemptID.getTaskId().getId() == 0) { + getContext().getEventHandler().handle( + new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + } else { + getContext().getEventHandler().handle( + new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); + } + } + } + public static void main(String[] args) throws Exception { TestJobHistoryParsing t = new TestJobHistoryParsing(); t.testHistoryParsing();