svn merge -c 1391679 FIXES: MAPREDUCE-4689. JobClient.getMapTaskReports on failed job results in NPE (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1391680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-09-28 22:18:32 +00:00
parent f40997309e
commit 81b14fae80
3 changed files with 88 additions and 1 deletions

View File

@ -425,6 +425,9 @@ Release 0.23.4 - UNRELEASED
MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job
has completed (Robert Joseph Evans via jlowe) has completed (Robert Joseph Evans via jlowe)
MAPREDUCE-4689. JobClient.getMapTaskReports on failed job results in NPE
(jlowe via bobby)
Release 0.23.3 - UNRELEASED Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.util.Records;
public class CompletedTask implements Task { public class CompletedTask implements Task {
private static final Counters EMPTY_COUNTERS = new Counters();
private final TaskId taskId; private final TaskId taskId;
private final TaskInfo taskInfo; private final TaskInfo taskInfo;
private TaskReport report; private TaskReport report;
@ -124,7 +126,11 @@ public class CompletedTask implements Task {
report.setFinishTime(taskInfo.getFinishTime()); report.setFinishTime(taskInfo.getFinishTime());
report.setTaskState(getState()); report.setTaskState(getState());
report.setProgress(getProgress()); report.setProgress(getProgress());
report.setCounters(TypeConverter.toYarn(getCounters())); Counters counters = getCounters();
if (counters == null) {
counters = EMPTY_COUNTERS;
}
report.setCounters(TypeConverter.toYarn(counters));
if (successfulAttempt != null) { if (successfulAttempt != null) {
report.setSuccessfulAttempt(successfulAttempt); report.setSuccessfulAttempt(successfulAttempt);
} }

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -402,6 +403,63 @@ public class TestJobHistoryParsing {
} }
} }
@Test
public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask");
try {
Configuration conf = new Configuration();
conf
.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.FAILED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
.getJobIndexInfo();
String jobhistoryFileName = FileNameIndexUtils
.getDoneFileName(jobIndexInfo);
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
JobHistoryParser parser = new JobHistoryParser(in);
JobInfo jobInfo = parser.parse();
Exception parseException = parser.getParseException();
Assert.assertNull("Caught an expected exception " + parseException,
parseException);
for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
Assert.assertNotNull("completed task report has null counters",
ct.getReport().getCounters());
}
} finally {
LOG.info("FINISHED testCountersForFailedTask");
}
}
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
@ -422,6 +480,26 @@ public class TestJobHistoryParsing {
} }
} }
static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestJobHistoryParsing t = new TestJobHistoryParsing(); TestJobHistoryParsing t = new TestJobHistoryParsing();
t.testHistoryParsing(); t.testHistoryParsing();