MAPREDUCE-6892. Issues with the count of failed/killed tasks in the jhist file. (Peter Bacsko via Haibo Chen)

This commit is contained in:
Haibo Chen 2017-08-30 10:07:48 -07:00
parent a20e7105ea
commit d04f85f387
22 changed files with 573 additions and 136 deletions
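
A minimal sketch of the accounting rule behind this fix (the class below is illustrative, not part of the patch): a completed task is one that reached any terminal state, so the succeeded count is derived by subtracting the failed and killed counts, exactly as the new code in JobHistoryEventHandler and JobImpl does.

    // Illustrative only: mirrors succeeded = completed - failed - killed
    class TaskAccounting {
      int completed; // tasks that reached a terminal state
      int failed;    // terminal state FAILED
      int killed;    // terminal state KILLED

      int succeeded() {
        return completed - failed - killed;
      }
    }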

View File

@@ -431,10 +431,18 @@ protected void serviceStop() throws Exception {
+ " to have not been closed. Will close");
//Create a JobFinishEvent so that it is written to the job history
final Job job = context.getJob(toClose);
+ int successfulMaps = job.getCompletedMaps() - job.getFailedMaps()
+     - job.getKilledMaps();
+ int successfulReduces = job.getCompletedReduces()
+     - job.getFailedReduces() - job.getKilledReduces();
JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
- System.currentTimeMillis(), job.getCompletedMaps(),
- job.getCompletedReduces(),
+ System.currentTimeMillis(),
+ successfulMaps,
+ successfulReduces,
+ job.getFailedMaps(), job.getFailedReduces(),
+ job.getKilledMaps(), job.getKilledReduces(),
createJobStateForJobUnsuccessfulCompletionEvent(
mi.getForcedJobStateOnShutDown()),
job.getDiagnostics());
@@ -655,9 +663,9 @@ public void handleEvent(JobHistoryEvent event) {
JobFinishedEvent jFinishedEvent =
(JobFinishedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
- mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
+ mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getSucceededMaps());
mi.getJobIndexInfo().setNumReduces(
- jFinishedEvent.getFinishedReduces());
+ jFinishedEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
@@ -672,8 +680,8 @@ public void handleEvent(JobHistoryEvent event) {
JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
- mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
- mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+ mi.getJobIndexInfo().setNumMaps(jucEvent.getSucceededMaps());
+ mi.getJobIndexInfo().setNumReduces(jucEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
if(context.isLastAMRetry())
@@ -690,8 +698,8 @@ public void handleEvent(JobHistoryEvent event) {
(JobUnsuccessfulCompletionEvent) event
.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
- mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
- mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+ mi.getJobIndexInfo().setNumMaps(jucEvent.getSucceededMaps());
+ mi.getJobIndexInfo().setNumReduces(jucEvent.getSucceededReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
@@ -739,10 +747,12 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event;
summary.setJobFinishTime(jfe.getFinishTime());
- summary.setNumFinishedMaps(jfe.getFinishedMaps());
+ summary.setNumSucceededMaps(jfe.getSucceededMaps());
summary.setNumFailedMaps(jfe.getFailedMaps());
- summary.setNumFinishedReduces(jfe.getFinishedReduces());
+ summary.setNumSucceededReduces(jfe.getSucceededReduces());
summary.setNumFailedReduces(jfe.getFailedReduces());
+ summary.setNumKilledMaps(jfe.getKilledMaps());
+ summary.setNumKilledReduces(jfe.getKilledReduces());
if (summary.getJobStatus() == null)
summary
.setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
@@ -753,11 +763,21 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
break;
case JOB_FAILED:
case JOB_KILLED:
+ Job job = context.getJob(jobId);
JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
+ int successfulMaps = job.getCompletedMaps() - job.getFailedMaps()
+     - job.getKilledMaps();
+ int successfulReduces = job.getCompletedReduces()
+     - job.getFailedReduces() - job.getKilledReduces();
summary.setJobStatus(juce.getStatus());
- summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
- summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
+ summary.setNumSucceededMaps(successfulMaps);
+ summary.setNumSucceededReduces(successfulReduces);
+ summary.setNumFailedMaps(job.getFailedMaps());
+ summary.setNumFailedReduces(job.getFailedReduces());
summary.setJobFinishTime(juce.getFinishTime());
+ summary.setNumKilledMaps(juce.getKilledMaps());
+ summary.setNumKilledReduces(juce.getKilledReduces());
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break;
default:
@@ -840,12 +860,22 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
JobUnsuccessfulCompletionEvent juce =
(JobUnsuccessfulCompletionEvent) event;
tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
- tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps());
- tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces());
+ tEvent.addEventInfo("NUM_MAPS",
+     juce.getSucceededMaps() +
+     juce.getFailedMaps() +
+     juce.getKilledMaps());
+ tEvent.addEventInfo("NUM_REDUCES",
+     juce.getSucceededReduces() +
+     juce.getFailedReduces() +
+     juce.getKilledReduces());
tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
- tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps());
- tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces());
+ tEvent.addEventInfo("SUCCESSFUL_MAPS", juce.getSucceededMaps());
+ tEvent.addEventInfo("SUCCESSFUL_REDUCES", juce.getSucceededReduces());
+ tEvent.addEventInfo("FAILED_MAPS", juce.getFailedMaps());
+ tEvent.addEventInfo("FAILED_REDUCES", juce.getFailedReduces());
+ tEvent.addEventInfo("KILLED_MAPS", juce.getKilledMaps());
+ tEvent.addEventInfo("KILLED_REDUCES", juce.getKilledReduces());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
@@ -853,12 +883,20 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event;
tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
- tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps());
- tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces());
+ tEvent.addEventInfo("NUM_MAPS",
+     jfe.getSucceededMaps() +
+     jfe.getFailedMaps() +
+     jfe.getKilledMaps());
+ tEvent.addEventInfo("NUM_REDUCES",
+     jfe.getSucceededReduces() +
+     jfe.getFailedReduces() +
+     jfe.getKilledReduces());
tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
- tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
- tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
+ tEvent.addEventInfo("SUCCESSFUL_MAPS", jfe.getSucceededMaps());
+ tEvent.addEventInfo("SUCCESSFUL_REDUCES", jfe.getSucceededReduces());
+ tEvent.addEventInfo("KILLED_MAPS", jfe.getKilledMaps());
+ tEvent.addEventInfo("KILLED_REDUCES", jfe.getKilledReduces());
tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",

View File

@@ -30,10 +30,12 @@ public class JobSummary {
private long firstReduceTaskLaunchTime; // ReduceAttemptStarted |
// TaskAttemptStartEvent
private long jobFinishTime;
- private int numFinishedMaps;
+ private int numSucceededMaps;
private int numFailedMaps;
- private int numFinishedReduces;
+ private int numSucceededReduces;
private int numFailedReduces;
+ private int numKilledMaps;
+ private int numKilledReduces;
private int resourcesPerMap; // resources used per map/min resource
private int resourcesPerReduce; // resources used per reduce/min resource
// resource models
@@ -98,12 +100,12 @@ public void setJobFinishTime(long jobFinishTime) {
this.jobFinishTime = jobFinishTime;
}
- public int getNumFinishedMaps() {
-   return numFinishedMaps;
+ public int getNumSucceededMaps() {
+   return numSucceededMaps;
}
- public void setNumFinishedMaps(int numFinishedMaps) {
-   this.numFinishedMaps = numFinishedMaps;
+ public void setNumSucceededMaps(int numSucceededMaps) {
+   this.numSucceededMaps = numSucceededMaps;
}
public int getNumFailedMaps() {
@@ -114,6 +116,22 @@ public void setNumFailedMaps(int numFailedMaps) {
this.numFailedMaps = numFailedMaps;
}
public int getKilledMaps() {
return numKilledMaps;
}
public void setNumKilledMaps(int numKilledMaps) {
this.numKilledMaps = numKilledMaps;
}
public int getKilledReduces() {
return numKilledReduces;
}
public void setNumKilledReduces(int numKilledReduces) {
this.numKilledReduces = numKilledReduces;
}
public int getResourcesPerMap() {
return resourcesPerMap;
}
@@ -122,12 +140,12 @@ public void setResourcesPerMap(int resourcesPerMap) {
this.resourcesPerMap = resourcesPerMap;
}
- public int getNumFinishedReduces() {
-   return numFinishedReduces;
+ public int getNumSucceededReduces() {
+   return numSucceededReduces;
}
- public void setNumFinishedReduces(int numFinishedReduces) {
-   this.numFinishedReduces = numFinishedReduces;
+ public void setNumSucceededReduces(int numSucceededReduces) {
+   this.numSucceededReduces = numSucceededReduces;
}
public int getNumFailedReduces() {
@@ -204,8 +222,15 @@ public String getJobSummaryString() {
.add("finishTime", jobFinishTime)
.add("resourcesPerMap", resourcesPerMap)
.add("resourcesPerReduce", resourcesPerReduce)
- .add("numMaps", numFinishedMaps + numFailedMaps)
- .add("numReduces", numFinishedReduces + numFailedReduces)
+ .add("numMaps", numSucceededMaps + numFailedMaps + numKilledMaps)
+ .add("numReduces", numSucceededReduces + numFailedReduces
+     + numKilledReduces)
+ .add("succeededMaps", numSucceededMaps)
+ .add("succeededReduces", numSucceededReduces)
+ .add("failedMaps", numFailedMaps)
+ .add("failedReduces", numFailedReduces)
+ .add("killedMaps", numKilledMaps)
+ .add("killedReduces", numKilledReduces)
.add("user", user)
.add("queue", queue)
.add("status", jobStatus)

View File

@@ -65,6 +65,10 @@ public interface Job {
int getTotalReduces();
int getCompletedMaps();
int getCompletedReduces();
int getFailedMaps();
int getFailedReduces();
int getKilledMaps();
int getKilledReduces();
float getProgress();
boolean isUber();
String getUserName();

View File

@@ -1684,6 +1684,10 @@ private void unsuccessfulFinish(JobStateInternal finalState) {
finishTime,
succeededMapTaskCount,
succeededReduceTaskCount,
failedMapTaskCount,
failedReduceTaskCount,
killedMapTaskCount,
killedReduceTaskCount,
finalState.toString(),
diagnostics);
eventHandler.handle(new JobHistoryEvent(jobId,
@@ -1748,6 +1752,7 @@ private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount,
job.killedMapTaskCount, job.killedReduceTaskCount,
job.finalMapCounters,
job.finalReduceCounters,
job.fullCounters);
@@ -1797,7 +1802,7 @@ public void transition(JobImpl job, JobEvent event) {
job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
- job.finishTime, 0, 0,
+ job.finishTime, 0, 0, 0, 0, 0, 0,
JobStateInternal.KILLED.toString(), job.diagnostics);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobStateInternal.KILLED);
@@ -1954,8 +1959,8 @@ private static class TaskCompletedTransition implements
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.completedTaskCount++;
- LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event;
+ LOG.info("Num completed Tasks: " + job.completedTaskCount);
Task task = job.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(job, task);
@@ -1991,11 +1996,15 @@ protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
job.allowedMapFailuresPercent*job.numMapTasks ||
job.failedReduceTaskCount*100 >
job.allowedReduceFailuresPercent*job.numReduceTasks) {
job.setFinishTime();
String diagnosticMsg = "Job failed as tasks failed. " +
"failedMaps:" + job.failedMapTaskCount +
- " failedReduces:" + job.failedReduceTaskCount;
+ " failedReduces:" + job.failedReduceTaskCount +
+ " killedMaps:" + job.killedMapTaskCount +
+ " killedReduces: " + job.killedReduceTaskCount;
LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg);
@@ -2226,7 +2235,13 @@ public void transition(JobImpl job, JobEvent event) {
job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
- job.finishTime, 0, 0,
+ job.finishTime,
+ job.succeededMapTaskCount,
+ job.succeededReduceTaskCount,
+ job.failedMapTaskCount,
+ job.failedReduceTaskCount,
+ job.killedMapTaskCount,
+ job.killedReduceTaskCount,
jobHistoryString, job.diagnostics);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(terminationState);
@@ -2266,4 +2281,24 @@ public int getMaxFetchFailuresNotifications() {
public void setJobPriority(Priority priority) {
this.jobPriority = priority;
}
@Override
public int getFailedMaps() {
return failedMapTaskCount;
}
@Override
public int getFailedReduces() {
return failedReduceTaskCount;
}
@Override
public int getKilledMaps() {
return killedMapTaskCount;
}
@Override
public int getKilledReduces() {
return killedReduceTaskCount;
}
}

View File

@@ -275,7 +275,8 @@ public void testBatchedFlushJobEndMultiplier() throws Exception {
t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
+ TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, 0, 0, null, null,
+     new Counters())));
handleNextNEvents(jheh, 29);
verify(mockWriter, times(0)).flush();
@@ -308,22 +309,22 @@ public void testProcessDoneFilesOnLastAMRetry() throws Exception {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.ERROR.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
- new Counters(), new Counters())));
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
+ new Counters(), new Counters())));
verify(jheh, times(2)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.FAILED.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(3)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.KILLED.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(4)).processDoneFiles(any(JobId.class));
mockWriter = jheh.getEventWriter();
@@ -354,22 +355,22 @@ public void testProcessDoneFilesNotLastAMRetry() throws Exception {
// skip processing done files
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.ERROR.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(0)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.FAILED.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(2)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.KILLED.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(3)).processDoneFiles(t.jobId);
mockWriter = jheh.getEventWriter();
@@ -405,7 +406,8 @@ public void testPropertyRedactionForJHS() throws Exception {
"nmhost", 3000, 4000, -1)));
handleEvent(jheh, new JobHistoryEvent(params.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(
- params.jobId), 0, 0, 0, JobStateInternal.FAILED.toString())));
+ params.jobId), 0, 0, 0, 0, 0, 0, 0,
+     JobStateInternal.FAILED.toString())));
// verify the value of the sensitive property in job.xml is restored.
Assert.assertEquals(sensitivePropertyName + " is modified.",
@@ -476,7 +478,7 @@ public void testDefaultFsIsUsedForHistory() throws Exception {
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
// If we got here then event handler worked but we don't know with which
@@ -546,7 +548,7 @@ public void testAMStartedEvent() throws Exception {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
- 0, 0, JobStateInternal.FAILED.toString())));
+ 0, 0, 0, 0, 0, 0, JobStateInternal.FAILED.toString())));
Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
@@ -642,7 +644,7 @@ public void testTimelineEventHandling() throws Exception {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
- 0, new Counters(), new Counters(), new Counters()), currentTime));
+ 0, 0, 0, new Counters(), new Counters(), new Counters()), currentTime));
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -668,7 +670,8 @@ public void testTimelineEventHandling() throws Exception {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
- 0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
+ 0, 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString()),
+     currentTime + 20));
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -944,7 +947,7 @@ public void testSetTrackingURLAfterHistoryIsWritten() throws Exception {
// Job finishes and successfully writes history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
@@ -978,7 +981,7 @@ public void testDontSetTrackingURLIfHistoryWriteFailed() throws Exception {
// Job finishes, but doesn't successfully write history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
@@ -1009,8 +1012,8 @@ public void testDontSetTrackingURLIfHistoryWriteThrows() throws Exception {
// Job finishes, but doesn't successfully write history
try {
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
- TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
- new Counters(), new Counters())));
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0,
+ new Counters(), new Counters(), new Counters())));
throw new RuntimeException(
"processDoneFiles didn't throw, but should have");
} catch (YarnRuntimeException yre) {

View File

@@ -43,10 +43,12 @@ public void before() {
summary.setFirstMapTaskLaunchTime(4L);
summary.setFirstReduceTaskLaunchTime(5L);
summary.setJobFinishTime(6L);
- summary.setNumFinishedMaps(1);
+ summary.setNumSucceededMaps(1);
summary.setNumFailedMaps(0);
- summary.setNumFinishedReduces(1);
+ summary.setNumSucceededReduces(1);
summary.setNumFailedReduces(0);
+ summary.setNumKilledMaps(0);
+ summary.setNumKilledReduces(0);
summary.setUser("testUser");
summary.setQueue("testQueue");
summary.setJobStatus("testJobStatus");

View File

@@ -640,6 +640,25 @@ public void setQueueName(String queueName) {
public void setJobPriority(Priority priority) {
// do nothing
}
@Override
public int getFailedMaps() {
return 0;
}
@Override
public int getFailedReduces() {
return 0;
}
@Override
public int getKilledMaps() {
return 0;
}
@Override
public int getKilledReduces() {
return 0;
}
};
}

View File

@@ -533,6 +533,26 @@ public void setQueueName(String queueName) {
public void setJobPriority(Priority priority) {
// do nothing
}
@Override
public int getFailedMaps() {
return 0;
}
@Override
public int getFailedReduces() {
return 0;
}
@Override
public int getKilledMaps() {
return 0;
}
@Override
public int getKilledReduces() {
return 0;
}
}
/*

View File

@@ -54,7 +54,9 @@
{"name": "failedReduces", "type": "int"},
{"name": "totalCounters", "type": "JhCounters"},
{"name": "mapCounters", "type": "JhCounters"},
- {"name": "reduceCounters", "type": "JhCounters"}
+ {"name": "reduceCounters", "type": "JhCounters"},
+ {"name": "killedMaps", "type": "int", "default": -1},
+ {"name": "killedReduces", "type": "int", "default": -1}
]
},
@@ -136,7 +138,11 @@
{"name": "finishedMaps", "type": "int"},
{"name": "finishedReduces", "type": "int"},
{"name": "jobStatus", "type": "string"},
- {"name": "diagnostics", "type": ["null","string"], "default": null}
+ {"name": "diagnostics", "type": ["null","string"], "default": null},
+ {"name": "failedMaps", "type": "int", "default": -1},
+ {"name": "failedReduces", "type": "int", "default": -1},
+ {"name": "killedMaps", "type": "int", "default": -1},
+ {"name": "killedReduces", "type": "int", "default": -1}
]
},
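
The "default": -1 entries above are what keep the new schema readable against old jhist files: Avro fills in a declared default when a field is absent from the data being read, so a pre-patch history file deserializes with killed/failed counts of -1 ("not recorded") rather than a misleading 0. A minimal sketch of the normalization a reader can apply (the helper name is hypothetical; CompletedJob further down does the same thing with its UNDEFINED_VALUE constant):

    // Hypothetical helper: map the Avro default -1 back to "count unknown, use 0"
    static int orZeroIfUndefined(int countFromAvro) {
      final int UNDEFINED_VALUE = -1; // matches the schema default above
      return countFromAvro == UNDEFINED_VALUE ? 0 : countFromAvro;
    }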

View File

@@ -327,12 +327,12 @@ public static class AnalyzedJob {
/** Generate analysis information for the parsed job */
public AnalyzedJob (JobInfo job) {
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
- int finishedMaps = (int) job.getFinishedMaps();
- int finishedReduces = (int) job.getFinishedReduces();
+ int succeededMaps = (int) job.getSucceededMaps();
+ int succeededReduces = (int) job.getSucceededReduces();
mapTasks =
- new JobHistoryParser.TaskAttemptInfo[finishedMaps];
+ new JobHistoryParser.TaskAttemptInfo[succeededMaps];
reduceTasks =
- new JobHistoryParser.TaskAttemptInfo[finishedReduces];
+ new JobHistoryParser.TaskAttemptInfo[succeededReduces];
int mapIndex = 0 , reduceIndex=0;
avgMapTime = 0;
avgReduceTime = 0;
@@ -360,12 +360,12 @@ public AnalyzedJob (JobInfo job) {
}
}
}
- if (finishedMaps > 0) {
-   avgMapTime /= finishedMaps;
+ if (succeededMaps > 0) {
+   avgMapTime /= succeededMaps;
}
- if (finishedReduces > 0) {
-   avgReduceTime /= finishedReduces;
-   avgShuffleTime /= finishedReduces;
+ if (succeededReduces > 0) {
+   avgReduceTime /= succeededReduces;
+   avgShuffleTime /= succeededReduces;
}
}
}

View File

@@ -236,7 +236,7 @@ private void printTaskSummary(PrintStream ps) {
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.setupFinished, ts.setupStarted));
taskSummary.append("\nMap\t").append(ts.totalMaps);
- taskSummary.append("\t").append(job.getFinishedMaps());
+ taskSummary.append("\t").append(job.getSucceededMaps());
taskSummary.append("\t\t").append(ts.numFailedMaps);
taskSummary.append("\t").append(ts.numKilledMaps);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
@@ -244,7 +244,7 @@ private void printTaskSummary(PrintStream ps) {
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
dateFormat, ts.mapFinished, ts.mapStarted));
taskSummary.append("\nReduce\t").append(ts.totalReduces);
- taskSummary.append("\t").append(job.getFinishedReduces());
+ taskSummary.append("\t").append(job.getSucceededReduces());
taskSummary.append("\t\t").append(ts.numFailedReduces);
taskSummary.append("\t").append(ts.numKilledReduces);
taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(

View File

@@ -145,7 +145,7 @@ private void printTaskSummary() throws JSONException {
jSums.put("setup", jSumSetup);
JSONObject jSumMap = new JSONObject();
jSumMap.put("total", ts.totalMaps);
- jSumMap.put("successful", job.getFinishedMaps());
+ jSumMap.put("successful", job.getSucceededMaps());
jSumMap.put("failed", ts.numFailedMaps);
jSumMap.put("killed", ts.numKilledMaps);
jSumMap.put("startTime", ts.mapStarted);
@@ -153,7 +153,7 @@ private void printTaskSummary() throws JSONException {
jSums.put("map", jSumMap);
JSONObject jSumReduce = new JSONObject();
jSumReduce.put("total", ts.totalReduces);
- jSumReduce.put("successful", job.getFinishedReduces());
+ jSumReduce.put("successful", job.getSucceededReduces());
jSumReduce.put("failed", ts.numFailedReduces);
jSumReduce.put("killed", ts.numKilledReduces);
jSumReduce.put("startTime", ts.reduceStarted);

View File

@@ -36,16 +36,18 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobFinishedEvent implements HistoryEvent {
private JobFinished datum = null;
private JobID jobId;
private long finishTime;
- private int finishedMaps;
- private int finishedReduces;
+ private int succeededMaps;
+ private int succeededReduces;
private int failedMaps;
private int failedReduces;
+ private int killedMaps;
+ private int killedReduces;
private Counters mapCounters;
private Counters reduceCounters;
private Counters totalCounters;
@@ -54,8 +56,8 @@ public class JobFinishedEvent {
* Create an event to record successful job completion
* @param id Job ID
* @param finishTime Finish time of the job
- * @param finishedMaps The number of finished maps
- * @param finishedReduces The number of finished reduces
+ * @param succeededMaps The number of succeeded maps
+ * @param succeededReduces The number of succeeded reduces
* @param failedMaps The number of failed maps
* @param failedReduces The number of failed reduces
* @param mapCounters Map Counters for the job
@@ -63,16 +65,19 @@ public class JobFinishedEvent {
* @param totalCounters Total Counters for the job
*/
public JobFinishedEvent(JobID id, long finishTime,
- int finishedMaps, int finishedReduces,
+ int succeededMaps, int succeededReduces,
int failedMaps, int failedReduces,
+ int killedMaps, int killedReduces,
Counters mapCounters, Counters reduceCounters,
Counters totalCounters) {
this.jobId = id;
this.finishTime = finishTime;
- this.finishedMaps = finishedMaps;
- this.finishedReduces = finishedReduces;
+ this.succeededMaps = succeededMaps;
+ this.succeededReduces = succeededReduces;
this.failedMaps = failedMaps;
this.failedReduces = failedReduces;
+ this.killedMaps = killedMaps;
+ this.killedReduces = killedReduces;
this.mapCounters = mapCounters;
this.reduceCounters = reduceCounters;
this.totalCounters = totalCounters;
@@ -85,10 +90,14 @@ public Object getDatum() {
datum = new JobFinished();
datum.setJobid(new Utf8(jobId.toString()));
datum.setFinishTime(finishTime);
- datum.setFinishedMaps(finishedMaps);
- datum.setFinishedReduces(finishedReduces);
+ // using finishedMaps & finishedReduces in the Avro schema for backward
+ // compatibility
+ datum.setFinishedMaps(succeededMaps);
+ datum.setFinishedReduces(succeededReduces);
datum.setFailedMaps(failedMaps);
datum.setFailedReduces(failedReduces);
+ datum.setKilledMaps(killedMaps);
+ datum.setKilledReduces(killedReduces);
datum.setMapCounters(EventWriter.toAvro(mapCounters, "MAP_COUNTERS"));
datum.setReduceCounters(EventWriter.toAvro(reduceCounters,
"REDUCE_COUNTERS"));
@@ -102,10 +111,12 @@ public void setDatum(Object oDatum) {
this.datum = (JobFinished) oDatum;
this.jobId = JobID.forName(datum.getJobid().toString());
this.finishTime = datum.getFinishTime();
- this.finishedMaps = datum.getFinishedMaps();
- this.finishedReduces = datum.getFinishedReduces();
+ this.succeededMaps = datum.getFinishedMaps();
+ this.succeededReduces = datum.getFinishedReduces();
this.failedMaps = datum.getFailedMaps();
this.failedReduces = datum.getFailedReduces();
+ this.killedMaps = datum.getKilledMaps();
+ this.killedReduces = datum.getKilledReduces();
this.mapCounters = EventReader.fromAvro(datum.getMapCounters());
this.reduceCounters = EventReader.fromAvro(datum.getReduceCounters());
this.totalCounters = EventReader.fromAvro(datum.getTotalCounters());
@@ -120,13 +131,17 @@ public EventType getEventType() {
/** Get the job finish time */
public long getFinishTime() { return finishTime; }
/** Get the number of finished maps for the job */
- public int getFinishedMaps() { return finishedMaps; }
+ public int getSucceededMaps() { return succeededMaps; }
/** Get the number of finished reducers for the job */
- public int getFinishedReduces() { return finishedReduces; }
+ public int getSucceededReduces() { return succeededReduces; }
/** Get the number of failed maps for the job */
public int getFailedMaps() { return failedMaps; }
/** Get the number of failed reducers for the job */
public int getFailedReduces() { return failedReduces; }
+ /** Get the number of killed maps */
+ public int getKilledMaps() { return killedMaps; }
+ /** Get the number of killed reduces */
+ public int getKilledReduces() { return killedReduces; }
/** Get the counters for the job */
public Counters getTotalCounters() {
return totalCounters;
@@ -145,12 +160,16 @@ public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
- tEvent.addInfo("NUM_MAPS", getFinishedMaps());
- tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+ tEvent.addInfo("NUM_MAPS", getSucceededMaps() + getFailedMaps()
+     + getKilledMaps());
+ tEvent.addInfo("NUM_REDUCES", getSucceededReduces() + getFailedReduces()
+     + getKilledReduces());
tEvent.addInfo("FAILED_MAPS", getFailedMaps());
tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
- tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
- tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+ tEvent.addInfo("SUCCESSFUL_MAPS", getSucceededMaps());
+ tEvent.addInfo("SUCCESSFUL_REDUCES", getSucceededReduces());
+ tEvent.addInfo("KILLED_MAPS", getKilledMaps());
+ tEvent.addInfo("KILLED_REDUCES", getKilledReduces());
// TODO replace SUCCEEDED with JobState.SUCCEEDED.toString()
tEvent.addInfo("JOB_STATUS", "SUCCEEDED");
return tEvent;

View File

@@ -376,18 +376,24 @@ private void handleTaskStartedEvent(TaskStartedEvent event) {
private void handleJobFailedEvent(JobUnsuccessfulCompletionEvent event) {
info.finishTime = event.getFinishTime();
- info.finishedMaps = event.getFinishedMaps();
- info.finishedReduces = event.getFinishedReduces();
+ info.succeededMaps = event.getSucceededMaps();
+ info.succeededReduces = event.getSucceededReduces();
+ info.failedMaps = event.getFailedMaps();
+ info.failedReduces = event.getFailedReduces();
+ info.killedMaps = event.getKilledMaps();
+ info.killedReduces = event.getKilledReduces();
info.jobStatus = StringInterner.weakIntern(event.getStatus());
info.errorInfo = StringInterner.weakIntern(event.getDiagnostics());
}
private void handleJobFinishedEvent(JobFinishedEvent event) {
info.finishTime = event.getFinishTime();
- info.finishedMaps = event.getFinishedMaps();
- info.finishedReduces = event.getFinishedReduces();
+ info.succeededMaps = event.getSucceededMaps();
+ info.succeededReduces = event.getSucceededReduces();
info.failedMaps = event.getFailedMaps();
info.failedReduces = event.getFailedReduces();
+ info.killedMaps = event.getKilledMaps();
+ info.killedReduces = event.getKilledReduces();
info.totalCounters = event.getTotalCounters();
info.mapCounters = event.getMapCounters();
info.reduceCounters = event.getReduceCounters();
@@ -456,8 +462,10 @@ public static class JobInfo {
int totalReduces;
int failedMaps;
int failedReduces;
- int finishedMaps;
- int finishedReduces;
+ int succeededMaps;
+ int succeededReduces;
+ int killedMaps;
+ int killedReduces;
String jobStatus;
Counters totalCounters;
Counters mapCounters;
@@ -477,7 +485,7 @@ public static class JobInfo {
public JobInfo() {
submitTime = launchTime = finishTime = -1;
totalMaps = totalReduces = failedMaps = failedReduces = 0;
- finishedMaps = finishedReduces = 0;
+ succeededMaps = succeededReduces = 0;
username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
@@ -540,10 +548,14 @@ public void printAll() {
public long getFailedMaps() { return failedMaps; }
/** @return the number of failed reduces */
public long getFailedReduces() { return failedReduces; }
- /** @return the number of finished maps */
- public long getFinishedMaps() { return finishedMaps; }
- /** @return the number of finished reduces */
- public long getFinishedReduces() { return finishedReduces; }
+ /** @return the number of killed maps */
+ public long getKilledMaps() { return killedMaps; }
+ /** @return the number of killed reduces */
+ public long getKilledReduces() { return killedReduces; }
+ /** @return the number of succeeded maps */
+ public long getSucceededMaps() { return succeededMaps; }
+ /** @return the number of succeeded reduces */
+ public long getSucceededReduces() { return succeededReduces; }
/** @return the job status */
public String getJobStatus() { return jobStatus; }
public String getErrorInfo() { return errorInfo; }

View File

@@ -49,34 +49,58 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
* Create an event to record unsuccessful completion (killed/failed) of jobs
* @param id Job ID
* @param finishTime Finish time of the job
- * @param finishedMaps Number of finished maps
- * @param finishedReduces Number of finished reduces
+ * @param succeededMaps Number of succeeded maps
+ * @param succeededReduces Number of succeeded reduces
+ * @param failedMaps Number of failed maps
+ * @param failedReduces Number of failed reduces
+ * @param killedMaps Number of killed maps
+ * @param killedReduces Number of killed reduces
* @param status Status of the job
*/
public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
- int finishedMaps,
- int finishedReduces, String status) {
-   this(id, finishTime, finishedMaps, finishedReduces, status, NODIAGS_LIST);
+ int succeededMaps,
+ int succeededReduces,
+ int failedMaps,
+ int failedReduces,
+ int killedMaps,
+ int killedReduces,
+ String status) {
+   this(id, finishTime, succeededMaps, succeededReduces, failedMaps,
+       failedReduces, killedMaps, killedReduces, status, NODIAGS_LIST);
}
/**
* Create an event to record unsuccessful completion (killed/failed) of jobs
* @param id Job ID
* @param finishTime Finish time of the job
- * @param finishedMaps Number of finished maps
- * @param finishedReduces Number of finished reduces
+ * @param succeededMaps Number of succeeded maps
+ * @param succeededReduces Number of succeeded reduces
+ * @param failedMaps Number of failed maps
+ * @param failedReduces Number of failed reduces
+ * @param killedMaps Number of killed maps
+ * @param killedReduces Number of killed reduces
* @param status Status of the job
* @param diagnostics job runtime diagnostics
*/
public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
- int finishedMaps,
- int finishedReduces,
+ int succeededMaps,
+ int succeededReduces,
+ int failedMaps,
+ int failedReduces,
+ int killedMaps,
+ int killedReduces,
String status,
Iterable<String> diagnostics) {
datum.setJobid(new Utf8(id.toString()));
datum.setFinishTime(finishTime);
- datum.setFinishedMaps(finishedMaps);
- datum.setFinishedReduces(finishedReduces);
+ // using finishedMaps & finishedReduces in the Avro schema for backward
+ // compatibility
+ datum.setFinishedMaps(succeededMaps);
+ datum.setFinishedReduces(succeededReduces);
+ datum.setFailedMaps(failedMaps);
+ datum.setFailedReduces(failedReduces);
+ datum.setKilledMaps(killedMaps);
+ datum.setKilledReduces(killedReduces);
datum.setJobStatus(new Utf8(status));
if (diagnostics == null) {
diagnostics = NODIAGS_LIST;
@@ -98,10 +122,19 @@ public JobID getJobId() {
}
/** Get the job finish time */
public long getFinishTime() { return datum.getFinishTime(); }
- /** Get the number of finished maps */
- public int getFinishedMaps() { return datum.getFinishedMaps(); }
- /** Get the number of finished reduces */
- public int getFinishedReduces() { return datum.getFinishedReduces(); }
+ /** Get the number of succeeded maps */
+ public int getSucceededMaps() { return datum.getFinishedMaps(); }
+ /** Get the number of succeeded reduces */
+ public int getSucceededReduces() { return datum.getFinishedReduces(); }
+ /** Get the number of failed maps */
+ public int getFailedMaps() { return datum.getFailedMaps(); }
+ /** Get the number of failed reduces */
+ public int getFailedReduces() { return datum.getFailedReduces(); }
+ /** Get the number of killed maps */
+ public int getKilledMaps() { return datum.getKilledMaps(); }
+ /** Get the number of killed reduces */
+ public int getKilledReduces() { return datum.getKilledReduces(); }
/** Get the status */
public String getStatus() { return datum.getJobStatus().toString(); }
/** Get the event type */
@@ -129,12 +162,19 @@ public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
tEvent.addInfo("FINISH_TIME", getFinishTime());
- tEvent.addInfo("NUM_MAPS", getFinishedMaps());
- tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+ tEvent.addInfo("NUM_MAPS", getSucceededMaps() + getFailedMaps()
+     + getKilledMaps());
+ tEvent.addInfo("NUM_REDUCES", getSucceededReduces() + getFailedReduces()
+     + getKilledReduces());
tEvent.addInfo("JOB_STATUS", getStatus());
tEvent.addInfo("DIAGNOSTICS", getDiagnostics());
- tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
- tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+ tEvent.addInfo("SUCCESSFUL_MAPS", getSucceededMaps());
+ tEvent.addInfo("SUCCESSFUL_REDUCES", getSucceededReduces());
+ tEvent.addInfo("FAILED_MAPS", getFailedMaps());
+ tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
+ tEvent.addInfo("KILLED_MAPS", getKilledMaps());
+ tEvent.addInfo("KILLED_REDUCES", getKilledReduces());
return tEvent;
}

View File

@@ -883,8 +883,8 @@ private static JobHistoryParser.JobInfo createJobInfo() {
job.totalReduces = 1;
job.failedMaps = 1;
job.failedReduces = 0;
- job.finishedMaps = 5;
- job.finishedReduces = 1;
+ job.succeededMaps = 5;
+ job.succeededReduces = 1;
job.jobStatus = JobStatus.State.SUCCEEDED.name();
job.totalCounters = createCounters();
job.mapCounters = createCounters();

View File

@@ -56,7 +56,6 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
- import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -71,7 +70,11 @@
* Data from job history file is loaded lazily.
*/
public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
// Backward compatibility: if the failed or killed map/reduce
// count is -1, that means the value was not recorded
// so we count it as 0
private static final int UNDEFINED_VALUE = -1;
static final Log LOG = LogFactory.getLog(CompletedJob.class);
private final Configuration conf;
private final JobId jobId; //Can be picked from JobInfo with a conversion.
@@ -104,12 +107,36 @@ public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
@Override
public int getCompletedMaps() {
- return (int) jobInfo.getFinishedMaps();
+ int killedMaps = (int) jobInfo.getKilledMaps();
+ int failedMaps = (int) jobInfo.getFailedMaps();
+ if (killedMaps == UNDEFINED_VALUE) {
+   killedMaps = 0;
+ }
+ if (failedMaps == UNDEFINED_VALUE) {
+   failedMaps = 0;
+ }
+ return (int) (jobInfo.getSucceededMaps() +
+     killedMaps + failedMaps);
}
@Override
public int getCompletedReduces() {
- return (int) jobInfo.getFinishedReduces();
+ int killedReduces = (int) jobInfo.getKilledReduces();
+ int failedReduces = (int) jobInfo.getFailedReduces();
+ if (killedReduces == UNDEFINED_VALUE) {
+   killedReduces = 0;
+ }
+ if (failedReduces == UNDEFINED_VALUE) {
+   failedReduces = 0;
+ }
+ return (int) (jobInfo.getSucceededReduces() +
+     killedReduces + failedReduces);
}
@Override
@@ -481,4 +508,24 @@ public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
@Override
public int getFailedMaps() {
return (int) jobInfo.getFailedMaps();
}
@Override
public int getFailedReduces() {
return (int) jobInfo.getFailedReduces();
}
@Override
public int getKilledMaps() {
return (int) jobInfo.getKilledMaps();
}
@Override
public int getKilledReduces() {
return (int) jobInfo.getKilledReduces();
}
}
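
A quick worked example of the backward-compatibility path above, with hypothetical numbers (the helper is a sketch of what getCompletedMaps() computes, not code from the patch):

    // -1 means "not recorded" in pre-patch history files (UNDEFINED_VALUE)
    static int completedMaps(long succeeded, long killed, long failed) {
      int k = killed == -1 ? 0 : (int) killed;
      int f = failed == -1 ? 0 : (int) failed;
      return (int) succeeded + k + f;
    }
    // completedMaps(5, -1, -1) == 5  (old file: unsuccessful counts unknown)
    // completedMaps(5,  1,  2) == 8  (new file: all three buckets recorded)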

View File

@@ -203,4 +203,23 @@ public void setJobPriority(Priority priority) {
"Can't set job's priority in history");
}
@Override
public int getFailedMaps() {
return -1;
}
@Override
public int getFailedReduces() {
return -1;
}
@Override
public int getKilledMaps() {
return -1;
}
@Override
public int getKilledReduces() {
return -1;
}
}

View File

@@ -208,4 +208,24 @@ public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
@Override
public int getFailedMaps() {
return -1;
}
@Override
public int getFailedReduces() {
return -1;
}
@Override
public int getKilledMaps() {
return -1;
}
@Override
public int getKilledReduces() {
return -1;
}
}

View File

@@ -79,9 +79,12 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -292,7 +295,7 @@ public HistoryEvent answer(InvocationOnMock invocation)
Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
numFinishedMaps);
Assert.assertEquals("incorrect finishedReduces ", numReduces,
- jobInfo.getFinishedReduces());
+ jobInfo.getSucceededReduces());
Assert.assertEquals("incorrect uberized ", job.isUber(),
jobInfo.getUberized());
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
@@ -379,7 +382,7 @@ public HistoryEvent answer(InvocationOnMock invocation)
private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
int numSuccessfulMaps) {
if (numMaps == numSuccessfulMaps) {
- return jobInfo.getFinishedMaps();
+ return jobInfo.getSucceededMaps();
}
long numFinishedMaps = 0;
@@ -458,6 +461,76 @@ public void testHistoryParsingForFailedAttempts() throws Exception {
}
}
@Test(timeout = 30000)
public void testHistoryParsingForKilledAndFailedAttempts() throws Exception {
MRApp app = null;
JobHistory jobHistory = null;
LOG.info("STARTING testHistoryParsingForKilledAndFailedAttempts");
try {
Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
// "CommitterEventHandler" thread could be slower in some cases,
// which might cause a failed map/reduce task to fail the job
// immediately (see JobImpl.checkJobAfterTaskCompletion()). If there are
// killed events in progress, those will not be counted. Instead,
// we allow a 50% failure rate, so the job will always succeed and kill
// events will not be ignored.
conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);
conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 50);
RackResolver.init(conf);
app = new MRAppWithHistoryWithFailedAndKilledTask(3, 3, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobHistoryParser parser;
JobInfo jobInfo;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
parser = new JobHistoryParser(in);
jobInfo = parser.parse();
}
Exception parseException = parser.getParseException();
Assert.assertNull("Caught an unexpected exception " + parseException,
parseException);
assertEquals("FailedMaps", 1, jobInfo.getFailedMaps());
assertEquals("KilledMaps", 1, jobInfo.getKilledMaps());
assertEquals("FailedReduces", 1, jobInfo.getFailedReduces());
assertEquals("KilledReduces", 1, jobInfo.getKilledReduces());
} finally {
LOG.info("FINISHED testHistoryParsingForKilledAndFailedAttempts");
if (app != null) {
app.close();
}
if (jobHistory != null) {
jobHistory.close();
}
}
}
@Test(timeout = 60000)
public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask");
@@ -666,6 +739,40 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
}
}
static class MRAppWithHistoryWithFailedAndKilledTask
extends MRAppWithHistory {
MRAppWithHistoryWithFailedAndKilledTask(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
final int taskId = attemptID.getTaskId().getId();
final TaskType taskType = attemptID.getTaskId().getTaskType();
// map #0 --> kill
// reduce #0 --> fail
if (taskType == TaskType.MAP && taskId == 0) {
getContext().getEventHandler().handle(
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
} else if (taskType == TaskType.MAP && taskId == 1) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
} else if (taskType == TaskType.REDUCE && taskId == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
} else if (taskType == TaskType.REDUCE && taskId == 1) {
getContext().getEventHandler().handle(
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory {
public MRAppWithHistoryWithJobKilled(int maps, int reduces,
@@ -864,6 +971,7 @@ public HistoryEvent answer(InvocationOnMock invocation)
if (eventId < 5) {
JobUnsuccessfulCompletionEvent juce =
new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
+ 0, 0, 0, 0,
"JOB_FAILED", Collections.singletonList(
"Task failed: " + tids[0].toString()));
return juce;
@@ -907,9 +1015,9 @@ public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOE
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
- + jobInfo.getFinishedMaps() + " "
- + jobInfo.getTotalMaps() + " "
- + jobInfo.getJobId() ) ;
+ + jobInfo.getSucceededMaps() + " "
+ + jobInfo.getTotalMaps() + " "
+ + jobInfo.getJobId() ) ;
}
/**
@@ -925,7 +1033,7 @@ public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOE
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
- + jobInfo.getFinishedMaps() + " "
+ + jobInfo.getSucceededMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() );
}
@@ -943,7 +1051,7 @@ public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IO
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
- + jobInfo.getFinishedMaps() + " "
+ + jobInfo.getSucceededMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() ) ;
}

View File

@@ -424,5 +424,25 @@ public void setQueueName(String queueName) {
@Override
public void setJobPriority(Priority priority) {
}
@Override
public int getFailedMaps() {
return mockJob.getFailedMaps();
}
@Override
public int getFailedReduces() {
return mockJob.getFailedReduces();
}
@Override
public int getKilledMaps() {
return mockJob.getKilledMaps();
}
@Override
public int getKilledReduces() {
return mockJob.getKilledReduces();
}
}
}

View File

@@ -223,7 +223,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
&& finishedReduces != null) {
return new JobUnsuccessfulCompletionEvent(jobID, Long
.parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
- .parseInt(finishedReduces), status);
+ .parseInt(finishedReduces), -1, -1, -1, -1, status);
}
return null;
@@ -256,8 +256,8 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
&& finishedReduces != null) {
return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
.parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
- .parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
- maybeParseCounters(counters));
+ .parseInt(failedMaps), Integer.parseInt(failedReduces), -1, -1,
+ null, null, maybeParseCounters(counters));
}
return null;