MAPREDUCE-4689. JobClient.getMapTaskReports on failed job results in NPE (jlowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1391679 13f79535-47bb-0310-9956-ffa450edef68
parent 03b7ad04fa
commit b897d6c35a
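
For context only (not part of this commit): a rough sketch of the client-side call path named in the summary, fetching map task reports for a job that has already failed. The class name and the job id argument are placeholders, and a working cluster configuration is assumed to be on the classpath.

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.TaskReport;

    // Hypothetical driver, not from the Hadoop source tree.
    public class PrintFailedJobMapReports {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();             // assumes cluster config on the classpath
        JobClient client = new JobClient(conf);
        JobID jobId = JobID.forName(args[0]);     // a job that has already failed
        TaskReport[] maps = client.getMapTaskReports(jobId);
        for (TaskReport report : maps) {
          // Before this fix, the history server could hand back null counters for a
          // failed task, which surfaced as an NPE on this call path.
          System.out.println(report.getTaskID() + ": " + report.getCounters());
        }
      }
    }

With the change below, the reports returned for a failed job carry empty rather than null counters, so this call no longer hits the NPE.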
@@ -558,6 +558,9 @@ Release 0.23.4 - UNRELEASED
     MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job
     has completed (Robert Joseph Evans via jlowe)
 
+    MAPREDUCE-4689. JobClient.getMapTaskReports on failed job results in NPE
+    (jlowe via bobby)
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.util.Records;
 
 public class CompletedTask implements Task {
 
+  private static final Counters EMPTY_COUNTERS = new Counters();
+
   private final TaskId taskId;
   private final TaskInfo taskInfo;
   private TaskReport report;
@@ -124,7 +126,11 @@ public class CompletedTask implements Task {
     report.setFinishTime(taskInfo.getFinishTime());
     report.setTaskState(getState());
     report.setProgress(getProgress());
-    report.setCounters(TypeConverter.toYarn(getCounters()));
+    Counters counters = getCounters();
+    if (counters == null) {
+      counters = EMPTY_COUNTERS;
+    }
+    report.setCounters(TypeConverter.toYarn(counters));
     if (successfulAttempt != null) {
       report.setSuccessfulAttempt(successfulAttempt);
     }
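
The hunk above is a default-value guard: when the parsed task history carries no counters, the report is populated with a shared empty Counters instance instead of null. A minimal, self-contained sketch of the same pattern, using stand-in types rather than Hadoop's classes:

    // Stand-in types, illustrative only; the real code uses Hadoop's Counters/TaskReport.
    final class Counters { }

    final class Report {
      private Counters counters;
      void setCounters(Counters c) { counters = c; }
      Counters getCounters() { return counters; }
    }

    final class ReportBuilder {
      // Shared default, mirroring EMPTY_COUNTERS in CompletedTask.
      private static final Counters EMPTY_COUNTERS = new Counters();

      static Report build(Counters fromHistory) {
        Counters counters = (fromHistory == null) ? EMPTY_COUNTERS : fromHistory;
        Report report = new Report();
        report.setCounters(counters);  // callers can rely on a non-null value
        return report;
      }
    }

Reusing a single static empty instance keeps the guard allocation-free and makes the "no counters recorded" case explicit to callers.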
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -402,6 +403,63 @@ public class TestJobHistoryParsing {
     }
   }
 
+  @Test
+  public void testCountersForFailedTask() throws Exception {
+    LOG.info("STARTING testCountersForFailedTask");
+    try {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
+        this.getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    app.waitForState(job, JobState.FAILED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    FSDataInputStream in = null;
+    FileContext fc = null;
+    try {
+      fc = FileContext.getFileContext(conf);
+      in = fc.open(fc.makeQualified(historyFilePath));
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file: " + historyFilePath, ioe);
+      throw (new Exception("Can not open History File"));
+    }
+
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    Assert.assertNull("Caught an expected exception " + parseException,
+        parseException);
+    for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+      CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
+      Assert.assertNotNull("completed task report has null counters",
+          ct.getReport().getCounters());
+    }
+    } finally {
+      LOG.info("FINISHED testCountersForFailedTask");
+    }
+  }
+
   static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
 
     public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
@@ -422,6 +480,26 @@ public class TestJobHistoryParsing {
     }
   }
 
+  static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobHistoryParsing t = new TestJobHistoryParsing();
     t.testHistoryParsing();
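
A hypothetical usage note, not part of this change: the trailing context above shows the file's existing main method, which drives tests directly without a JUnit runner. The new test could be invoked the same way, for example:

    public static void main(String[] args) throws Exception {
      TestJobHistoryParsing t = new TestJobHistoryParsing();
      t.testHistoryParsing();
      t.testCountersForFailedTask();  // hypothetical call; this commit does not modify main
    }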