MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job history files generated by 2.0.3 history server. Contributed by Rushabh S Shah

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1596295 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-05-20 15:46:54 +00:00
parent 0796bddd39
commit 1f8bb1001a
8 changed files with 203 additions and 21 deletions

View File

@ -236,6 +236,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5814. fat jar with *-default.xml may fail when
mapreduce.job.classloader=true. (Gera Shegalov via jlowe)
MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -92,11 +92,11 @@
}
},
{"name": "jobQueueName", "type": "string"},
{"name": "workflowId", "type": "string"},
{"name": "workflowName", "type": "string"},
{"name": "workflowNodeName", "type": "string"},
{"name": "workflowAdjacencies", "type": "string"},
{"name": "workflowTags", "type": "string"}
{"name": "workflowId", "type": ["null","string"], "default": null},
{"name": "workflowName", "type": ["null","string"], "default": null},
{"name": "workflowNodeName", "type": ["null","string"], "default": null},
{"name": "workflowAdjacencies", "type": ["null","string"], "default": null},
{"name": "workflowTags", "type": ["null","string"], "default": null}
]
},
@ -136,7 +136,7 @@
{"name": "finishedMaps", "type": "int"},
{"name": "finishedReduces", "type": "int"},
{"name": "jobStatus", "type": "string"},
{"name": "diagnostics", "type": "string"}
{"name": "diagnostics", "type": ["null","string"], "default": null}
]
},
@ -205,8 +205,8 @@
{"name": "httpPort", "type": "int"},
{"name": "shufflePort", "type": "int"},
{"name": "containerId", "type": "string"},
{"name": "locality", "type": "string"},
{"name": "avataar", "type": "string"}
{"name": "locality", "type": ["null","string"], "default": null},
{"name": "avataar", "type": ["null","string"], "default": null}
]
},
@ -221,7 +221,7 @@
{"name": "rackname", "type": "string"},
{"name": "status", "type": "string"},
{"name": "error", "type": "string"},
{"name": "counters", "type": "JhCounters"},
{"name": "counters", "type": ["null","JhCounters"], "default": null},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},
{"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
{"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@ -237,7 +237,7 @@
{"name": "error", "type": "string"},
{"name": "failedDueToAttempt", "type": ["null", "string"] },
{"name": "status", "type": "string"},
{"name": "counters", "type": "JhCounters"}
{"name": "counters", "type": ["null","JhCounters"], "default": null}
]
},
@ -248,7 +248,7 @@
{"name": "finishTime", "type": "long"},
{"name": "status", "type": "string"},
{"name": "counters", "type": "JhCounters"},
{"name": "successfulAttemptId", "type": "string"}
{"name": "successfulAttemptId", "type": ["null","string"], "default": null}
]
},

View File

@ -35,6 +35,7 @@ import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
@InterfaceAudience.Private
@ -69,9 +70,10 @@ public class EventReader implements Closeable {
if (!EventWriter.VERSION.equals(version)) {
throw new IOException("Incompatible event log version: "+version);
}
Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);
this.schema = Schema.parse(in.readLine());
this.reader = new SpecificDatumReader(schema);
this.reader = new SpecificDatumReader(schema, myschema);
this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
}
@ -173,13 +175,15 @@ public class EventReader implements Closeable {
static Counters fromAvro(JhCounters counters) {
Counters result = new Counters();
for (JhCounterGroup g : counters.groups) {
CounterGroup group =
result.addGroup(StringInterner.weakIntern(g.name.toString()),
StringInterner.weakIntern(g.displayName.toString()));
for (JhCounter c : g.counts) {
group.addCounter(StringInterner.weakIntern(c.name.toString()),
StringInterner.weakIntern(c.displayName.toString()), c.value);
if(counters != null) {
for (JhCounterGroup g : counters.groups) {
CounterGroup group =
result.addGroup(StringInterner.weakIntern(g.name.toString()),
StringInterner.weakIntern(g.displayName.toString()));
for (JhCounter c : g.counts) {
group.addCounter(StringInterner.weakIntern(c.name.toString()),
StringInterner.weakIntern(c.displayName.toString()), c.value);
}
}
}
return result;

View File

@ -288,8 +288,18 @@ public class JobHistoryParser implements HistoryEventHandler {
private void handleTaskAttemptFailedEvent(
TaskAttemptUnsuccessfulCompletionEvent event) {
TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
if(taskInfo == null) {
LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
+ " taskId: " + event.getTaskId().toString());
return;
}
TaskAttemptInfo attemptInfo =
taskInfo.attemptsMap.get(event.getTaskAttemptId());
if(attemptInfo == null) {
LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
+ " taskAttemptId: " + event.getTaskAttemptId().toString());
return;
}
attemptInfo.finishTime = event.getFinishTime();
attemptInfo.error = StringInterner.weakIntern(event.getError());
attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());

View File

@ -891,4 +891,58 @@ public class TestJobHistoryParsing {
fsdis.close();
}
}
}
/**
* Test compatibility of JobHistoryParser with 2.0.3-alpha history files
* @throws IOException
*/
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException
{
Path histPath = new Path(getClass().getClassLoader().getResource(
"job_2.0.3-alpha-FAILED.jhist").getFile());
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
+ jobInfo.getFinishedMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() ) ;
}
/**
* Test compatibility of JobHistoryParser with 2.4.0 history files
* @throws IOException
*/
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException
{
Path histPath = new Path(getClass().getClassLoader().getResource(
"job_2.4.0-FAILED.jhist").getFile());
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
+ jobInfo.getFinishedMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() );
}
/**
* Test compatibility of JobHistoryParser with 0.23.9 history files
* @throws IOException
*/
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException
{
Path histPath = new Path(getClass().getClassLoader().getResource(
"job_0.23.9-FAILED.jhist").getFile());
JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
(new Configuration()), histPath);
JobInfo jobInfo = parser.parse();
LOG.info(" job info: " + jobInfo.getJobname() + " "
+ jobInfo.getFinishedMaps() + " "
+ jobInfo.getTotalMaps() + " "
+ jobInfo.getJobId() ) ;
}
}