MAPREDUCE-6376. Add avro binary support for jhist files. Contributed by Ray Chiang

(cherry picked from commit 2ac87df578)

Conflicts:

	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
This commit is contained in:
Jason Lowe 2015-07-01 16:07:54 +00:00
parent c343250657
commit df4e1e4965
7 changed files with 69 additions and 13 deletions

View File

@ -102,6 +102,9 @@ Release 2.8.0 - UNRELEASED
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via
jlowe)
BUG FIXES BUG FIXES
MAPREDUCE-6314. TestPipeApplication fails on trunk. MAPREDUCE-6314. TestPipeApplication fails on trunk.

View File

@ -105,7 +105,8 @@ public class JobHistoryEventHandler extends AbstractService
private int numUnflushedCompletionEvents = 0; private int numUnflushedCompletionEvents = 0;
private boolean isTimerActive; private boolean isTimerActive;
private EventWriter.WriteMode jhistMode =
EventWriter.WriteMode.JSON;
protected BlockingQueue<JobHistoryEvent> eventQueue = protected BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>(); new LinkedBlockingQueue<JobHistoryEvent>();
@ -260,6 +261,20 @@ public class JobHistoryEventHandler extends AbstractService
LOG.info("Emitting job history data to the timeline server is not enabled"); LOG.info("Emitting job history data to the timeline server is not enabled");
} }
// Flag for setting
String jhistFormat = conf.get(JHAdminConfig.MR_HS_JHIST_FORMAT,
JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT);
if (jhistFormat.equals("json")) {
jhistMode = EventWriter.WriteMode.JSON;
} else if (jhistFormat.equals("binary")) {
jhistMode = EventWriter.WriteMode.BINARY;
} else {
LOG.warn("Unrecognized value '" + jhistFormat + "' for property " +
JHAdminConfig.MR_HS_JHIST_FORMAT + ". Valid values are " +
"'json' or 'binary'. Falling back to default value '" +
JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'.");
}
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -418,7 +433,7 @@ public class JobHistoryEventHandler extends AbstractService
protected EventWriter createEventWriter(Path historyFilePath) protected EventWriter createEventWriter(Path historyFilePath)
throws IOException { throws IOException {
FSDataOutputStream out = stagingDirFS.create(historyFilePath, true); FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
return new EventWriter(out); return new EventWriter(out, this.jhistMode);
} }
/** /**

View File

@ -190,7 +190,8 @@ public class TestEvents {
ByteArrayOutputStream output = new ByteArrayOutputStream(); ByteArrayOutputStream output = new ByteArrayOutputStream();
FSDataOutputStream fsOutput = new FSDataOutputStream(output, FSDataOutputStream fsOutput = new FSDataOutputStream(output,
new FileSystem.Statistics("scheme")); new FileSystem.Statistics("scheme"));
EventWriter writer = new EventWriter(fsOutput); EventWriter writer = new EventWriter(fsOutput,
EventWriter.WriteMode.JSON);
writer.write(getJobPriorityChangedEvent()); writer.write(getJobPriorityChangedEvent());
writer.write(getJobStatusChangedEvent()); writer.write(getJobStatusChangedEvent());
writer.write(getTaskUpdatedEvent()); writer.write(getTaskUpdatedEvent());

View File

@ -221,4 +221,11 @@ public class JHAdminConfig {
+ "jobname.limit"; + "jobname.limit";
public static final int DEFAULT_MR_HS_JOBNAME_LIMIT = 50; public static final int DEFAULT_MR_HS_JOBNAME_LIMIT = 50;
/**
* Settings for .jhist file format.
*/
public static final String MR_HS_JHIST_FORMAT =
MR_HISTORY_PREFIX + "jhist.format";
public static final String DEFAULT_MR_HS_JHIST_FORMAT =
"json";
} }

View File

@ -67,15 +67,17 @@ public class EventReader implements Closeable {
this.in = in; this.in = in;
this.version = in.readLine(); this.version = in.readLine();
if (!EventWriter.VERSION.equals(version)) {
throw new IOException("Incompatible event log version: "+version);
}
Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class); Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);
Schema.Parser parser = new Schema.Parser(); Schema.Parser parser = new Schema.Parser();
this.schema = parser.parse(in.readLine()); this.schema = parser.parse(in.readLine());
this.reader = new SpecificDatumReader(schema, myschema); this.reader = new SpecificDatumReader(schema, myschema);
if (EventWriter.VERSION.equals(version)) {
this.decoder = DecoderFactory.get().jsonDecoder(schema, in); this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
} else if (EventWriter.VERSION_BINARY.equals(version)) {
this.decoder = DecoderFactory.get().binaryDecoder(in, null);
} else {
throw new IOException("Incompatible event log version: " + version);
}
} }
/** /**

View File

@ -43,21 +43,38 @@ import org.apache.hadoop.mapreduce.Counters;
*/ */
class EventWriter { class EventWriter {
static final String VERSION = "Avro-Json"; static final String VERSION = "Avro-Json";
static final String VERSION_BINARY = "Avro-Binary";
private FSDataOutputStream out; private FSDataOutputStream out;
private DatumWriter<Event> writer = private DatumWriter<Event> writer =
new SpecificDatumWriter<Event>(Event.class); new SpecificDatumWriter<Event>(Event.class);
private Encoder encoder; private Encoder encoder;
private static final Log LOG = LogFactory.getLog(EventWriter.class); private static final Log LOG = LogFactory.getLog(EventWriter.class);
public enum WriteMode { JSON, BINARY }
private final WriteMode writeMode;
private final boolean jsonOutput; // Cache value while we have 2 modes
EventWriter(FSDataOutputStream out) throws IOException { EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException {
this.out = out; this.out = out;
this.writeMode = mode;
if (this.writeMode==WriteMode.JSON) {
this.jsonOutput = true;
out.writeBytes(VERSION); out.writeBytes(VERSION);
} else if (this.writeMode==WriteMode.BINARY) {
this.jsonOutput = false;
out.writeBytes(VERSION_BINARY);
} else {
throw new IOException("Unknown mode: " + mode);
}
out.writeBytes("\n"); out.writeBytes("\n");
out.writeBytes(Event.SCHEMA$.toString()); out.writeBytes(Event.SCHEMA$.toString());
out.writeBytes("\n"); out.writeBytes("\n");
if (!this.jsonOutput) {
this.encoder = EncoderFactory.get().binaryEncoder(out, null);
} else {
this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out); this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out);
} }
}
synchronized void write(HistoryEvent event) throws IOException { synchronized void write(HistoryEvent event) throws IOException {
Event wrapper = new Event(); Event wrapper = new Event();
@ -65,8 +82,10 @@ class EventWriter {
wrapper.setEvent(event.getDatum()); wrapper.setEvent(event.getDatum());
writer.write(wrapper, encoder); writer.write(wrapper, encoder);
encoder.flush(); encoder.flush();
if (this.jsonOutput) {
out.writeBytes("\n"); out.writeBytes("\n");
} }
}
void flush() throws IOException { void flush() throws IOException {
encoder.flush(); encoder.flush();

View File

@ -2167,6 +2167,15 @@
</description> </description>
</property> </property>
<property>
<description>
File format the AM will use when generating the .jhist file. Valid
values are "json" for text output and "binary" for faster parsing.
</description>
<name>mapreduce.jobhistory.jhist.format</name>
<value>json</value>
</property>
<property> <property>
<name>yarn.app.mapreduce.am.containerlauncher.threadpool-initial-size</name> <name>yarn.app.mapreduce.am.containerlauncher.threadpool-initial-size</name>
<value>10</value> <value>10</value>