svn merge -c 1596295 FIXES: MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job history files generated by 2.0.3 history server. Contributed by Rushabh S Shah
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1596298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
63740cb27a
commit
fd6d5121e3
@ -94,6 +94,9 @@ Release 2.5.0 - UNRELEASED
|
||||
MAPREDUCE-5814. fat jar with *-default.xml may fail when
|
||||
mapreduce.job.classloader=true. (Gera Shegalov via jlowe)
|
||||
|
||||
MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
|
||||
history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
|
||||
|
||||
Release 2.4.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -92,11 +92,11 @@
|
||||
}
|
||||
},
|
||||
{"name": "jobQueueName", "type": "string"},
|
||||
{"name": "workflowId", "type": "string"},
|
||||
{"name": "workflowName", "type": "string"},
|
||||
{"name": "workflowNodeName", "type": "string"},
|
||||
{"name": "workflowAdjacencies", "type": "string"},
|
||||
{"name": "workflowTags", "type": "string"}
|
||||
{"name": "workflowId", "type": ["null","string"], "default": null},
|
||||
{"name": "workflowName", "type": ["null","string"], "default": null},
|
||||
{"name": "workflowNodeName", "type": ["null","string"], "default": null},
|
||||
{"name": "workflowAdjacencies", "type": ["null","string"], "default": null},
|
||||
{"name": "workflowTags", "type": ["null","string"], "default": null}
|
||||
]
|
||||
},
|
||||
|
||||
@ -136,7 +136,7 @@
|
||||
{"name": "finishedMaps", "type": "int"},
|
||||
{"name": "finishedReduces", "type": "int"},
|
||||
{"name": "jobStatus", "type": "string"},
|
||||
{"name": "diagnostics", "type": "string"}
|
||||
{"name": "diagnostics", "type": ["null","string"], "default": null}
|
||||
]
|
||||
},
|
||||
|
||||
@ -205,8 +205,8 @@
|
||||
{"name": "httpPort", "type": "int"},
|
||||
{"name": "shufflePort", "type": "int"},
|
||||
{"name": "containerId", "type": "string"},
|
||||
{"name": "locality", "type": "string"},
|
||||
{"name": "avataar", "type": "string"}
|
||||
{"name": "locality", "type": ["null","string"], "default": null},
|
||||
{"name": "avataar", "type": ["null","string"], "default": null}
|
||||
]
|
||||
},
|
||||
|
||||
@ -221,7 +221,7 @@
|
||||
{"name": "rackname", "type": "string"},
|
||||
{"name": "status", "type": "string"},
|
||||
{"name": "error", "type": "string"},
|
||||
{"name": "counters", "type": "JhCounters"},
|
||||
{"name": "counters", "type": ["null","JhCounters"], "default": null},
|
||||
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},
|
||||
{"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
|
||||
{"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
|
||||
@ -237,7 +237,7 @@
|
||||
{"name": "error", "type": "string"},
|
||||
{"name": "failedDueToAttempt", "type": ["null", "string"] },
|
||||
{"name": "status", "type": "string"},
|
||||
{"name": "counters", "type": "JhCounters"}
|
||||
{"name": "counters", "type": ["null","JhCounters"], "default": null}
|
||||
]
|
||||
},
|
||||
|
||||
@ -248,7 +248,7 @@
|
||||
{"name": "finishTime", "type": "long"},
|
||||
{"name": "status", "type": "string"},
|
||||
{"name": "counters", "type": "JhCounters"},
|
||||
{"name": "successfulAttemptId", "type": "string"}
|
||||
{"name": "successfulAttemptId", "type": ["null","string"], "default": null}
|
||||
]
|
||||
},
|
||||
|
||||
|
@ -35,6 +35,7 @@
|
||||
import org.apache.avro.io.Decoder;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.apache.avro.specific.SpecificData;
|
||||
import org.apache.avro.specific.SpecificDatumReader;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@ -69,9 +70,10 @@ public EventReader(DataInputStream in) throws IOException {
|
||||
if (!EventWriter.VERSION.equals(version)) {
|
||||
throw new IOException("Incompatible event log version: "+version);
|
||||
}
|
||||
|
||||
|
||||
Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);
|
||||
this.schema = Schema.parse(in.readLine());
|
||||
this.reader = new SpecificDatumReader(schema);
|
||||
this.reader = new SpecificDatumReader(schema, myschema);
|
||||
this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
|
||||
}
|
||||
|
||||
@ -173,13 +175,15 @@ public void close() throws IOException {
|
||||
|
||||
static Counters fromAvro(JhCounters counters) {
|
||||
Counters result = new Counters();
|
||||
for (JhCounterGroup g : counters.groups) {
|
||||
CounterGroup group =
|
||||
result.addGroup(StringInterner.weakIntern(g.name.toString()),
|
||||
StringInterner.weakIntern(g.displayName.toString()));
|
||||
for (JhCounter c : g.counts) {
|
||||
group.addCounter(StringInterner.weakIntern(c.name.toString()),
|
||||
StringInterner.weakIntern(c.displayName.toString()), c.value);
|
||||
if(counters != null) {
|
||||
for (JhCounterGroup g : counters.groups) {
|
||||
CounterGroup group =
|
||||
result.addGroup(StringInterner.weakIntern(g.name.toString()),
|
||||
StringInterner.weakIntern(g.displayName.toString()));
|
||||
for (JhCounter c : g.counts) {
|
||||
group.addCounter(StringInterner.weakIntern(c.name.toString()),
|
||||
StringInterner.weakIntern(c.displayName.toString()), c.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
@ -288,8 +288,18 @@ private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
|
||||
private void handleTaskAttemptFailedEvent(
|
||||
TaskAttemptUnsuccessfulCompletionEvent event) {
|
||||
TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
|
||||
if(taskInfo == null) {
|
||||
LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
|
||||
+ " taskId: " + event.getTaskId().toString());
|
||||
return;
|
||||
}
|
||||
TaskAttemptInfo attemptInfo =
|
||||
taskInfo.attemptsMap.get(event.getTaskAttemptId());
|
||||
if(attemptInfo == null) {
|
||||
LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
|
||||
+ " taskAttemptId: " + event.getTaskAttemptId().toString());
|
||||
return;
|
||||
}
|
||||
attemptInfo.finishTime = event.getFinishTime();
|
||||
attemptInfo.error = StringInterner.weakIntern(event.getError());
|
||||
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());
|
||||
|
@ -886,4 +886,58 @@ public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
|
||||
fsdis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test compatibility of JobHistoryParser with 2.0.3-alpha history files
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException
|
||||
{
|
||||
Path histPath = new Path(getClass().getClassLoader().getResource(
|
||||
"job_2.0.3-alpha-FAILED.jhist").getFile());
|
||||
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
|
||||
(new Configuration()), histPath);
|
||||
JobInfo jobInfo = parser.parse();
|
||||
LOG.info(" job info: " + jobInfo.getJobname() + " "
|
||||
+ jobInfo.getFinishedMaps() + " "
|
||||
+ jobInfo.getTotalMaps() + " "
|
||||
+ jobInfo.getJobId() ) ;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test compatibility of JobHistoryParser with 2.4.0 history files
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException
|
||||
{
|
||||
Path histPath = new Path(getClass().getClassLoader().getResource(
|
||||
"job_2.4.0-FAILED.jhist").getFile());
|
||||
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
|
||||
(new Configuration()), histPath);
|
||||
JobInfo jobInfo = parser.parse();
|
||||
LOG.info(" job info: " + jobInfo.getJobname() + " "
|
||||
+ jobInfo.getFinishedMaps() + " "
|
||||
+ jobInfo.getTotalMaps() + " "
|
||||
+ jobInfo.getJobId() );
|
||||
}
|
||||
|
||||
/**
|
||||
* Test compatibility of JobHistoryParser with 0.23.9 history files
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException
|
||||
{
|
||||
Path histPath = new Path(getClass().getClassLoader().getResource(
|
||||
"job_0.23.9-FAILED.jhist").getFile());
|
||||
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
|
||||
(new Configuration()), histPath);
|
||||
JobInfo jobInfo = parser.parse();
|
||||
LOG.info(" job info: " + jobInfo.getJobname() + " "
|
||||
+ jobInfo.getFinishedMaps() + " "
|
||||
+ jobInfo.getTotalMaps() + " "
|
||||
+ jobInfo.getJobId() ) ;
|
||||
}
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user