MAPREDUCE-6018. Added an MR specific config to enable emitting job history data to the timeline server. Contributed by Robert Kanter.

Zhijie Shen 2014-10-27 21:03:53 -07:00
parent 6b2f11b54b
commit 971e91c8c0
5 changed files with 114 additions and 8 deletions
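In short, this change adds a job-level switch, mapreduce.job.emit-timeline-data (default false), that controls whether the MR Application Master's JobHistoryEventHandler publishes job history events to the YARN timeline server. As a rough illustration only (not part of this commit), a driver might opt a single job in like this; the driver class is hypothetical, while the MRJobConfig constant is the one introduced below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

// Hypothetical driver used only to illustrate the per-job override added by
// this commit; everything except the MRJobConfig constant is placeholder.
public class TimelineEnabledWordCount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt this job in; the cluster-wide default stays false.
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
    Job job = Job.getInstance(conf, "timeline-enabled-wordcount");
    // ... configure mapper, reducer, input and output paths as usual ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Note that, as the new test below exercises, data only reaches the timeline store when the timeline service itself is enabled (YarnConfiguration.TIMELINE_SERVICE_ENABLED) and running.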

CHANGES.txt

@@ -280,6 +280,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5970. Provide a boolean switch to enable MR-AM profiling (Gera
     Shegalov via jlowe)
 
+    MAPREDUCE-6018. Added an MR specific config to enable emitting job history
+    data to the timeline server. (Robert Kanter via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

JobHistoryEventHandler.java

@@ -241,8 +241,14 @@ public class JobHistoryEventHandler extends AbstractService
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
 
-    timelineClient = TimelineClient.createTimelineClient();
-    timelineClient.init(conf);
+    if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+        MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
+      timelineClient = TimelineClient.createTimelineClient();
+      timelineClient.init(conf);
+      LOG.info("Emitting job history data to the timeline server is enabled");
+    } else {
+      LOG.info("Emitting job history data to the timeline server is not enabled");
+    }
 
     super.serviceInit(conf);
   }
@@ -268,7 +274,9 @@ public class JobHistoryEventHandler extends AbstractService
   @Override
   protected void serviceStart() throws Exception {
-    timelineClient.start();
+    if (timelineClient != null) {
+      timelineClient.start();
+    }
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -537,7 +545,7 @@ public class JobHistoryEventHandler extends AbstractService
     // For all events
     // (1) Write it out
     // (2) Process it for JobSummary
-    // (3) Process it for ATS
+    // (3) Process it for ATS (if enabled)
     MetaInfo mi = fileMap.get(event.getJobID());
     try {
       HistoryEvent historyEvent = event.getHistoryEvent();
@@ -546,8 +554,10 @@ public class JobHistoryEventHandler extends AbstractService
       }
       processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
           event.getJobID());
-      processEventForTimelineServer(historyEvent, event.getJobID(),
-          event.getTimestamp());
+      if (timelineClient != null) {
+        processEventForTimelineServer(historyEvent, event.getJobID(),
+            event.getTimestamp());
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("In HistoryEventHandler "
             + event.getHistoryEvent().getEventType());
@@ -839,8 +849,8 @@ public class JobHistoryEventHandler extends AbstractService
       tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
       tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
       tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
-          tfe2.getSuccessfulTaskAttemptId() == null ?
-              "" : tfe2.getSuccessfulTaskAttemptId().toString());
+          tfe2.getSuccessfulTaskAttemptId() == null ?
+          "" : tfe2.getSuccessfulTaskAttemptId().toString());
       tEntity.addEvent(tEvent);
       tEntity.setEntityId(tfe2.getTaskId().toString());
       tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
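Taken together, the handler changes follow one guard pattern: the TimelineClient field is created only when the new flag is true, and every later use is null-checked, so a disabled job does no ATS work at all. The standalone class below is a simplified analogue of that lifecycle for illustration (the class name is hypothetical; only the MRJobConfig constants and TimelineClient calls come from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.client.api.TimelineClient;

// Simplified analogue of the lifecycle used by JobHistoryEventHandler above:
// the client exists only when the flag is on, and every use is null-guarded.
public class GuardedTimelinePublisher {
  private TimelineClient timelineClient;   // stays null when disabled

  public void init(Configuration conf) {
    if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
        MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
      timelineClient = TimelineClient.createTimelineClient();
      timelineClient.init(conf);
    }
  }

  public void start() {
    if (timelineClient != null) {
      timelineClient.start();
    }
  }

  public void stop() {
    if (timelineClient != null) {
      timelineClient.stop();
    }
  }
}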

MRJobConfig.java

@@ -383,6 +383,11 @@ public interface MRJobConfig {
   public static final String JOB_UBERTASK_MAXBYTES =
       "mapreduce.job.ubertask.maxbytes";
 
+  public static final String MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+      "mapreduce.job.emit-timeline-data";
+  public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+      false;
+
   public static final String MR_PREFIX = "yarn.app.mapreduce.";
 
   public static final String MR_AM_PREFIX = MR_PREFIX + "am.";

mapred-default.xml

@@ -568,6 +568,14 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.emit-timeline-data</name>
+  <value>false</value>
+  <description>Specifies if the Application Master should emit timeline data
+  to the timeline server. Individual jobs can override this value.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.input.fileinputformat.split.minsize</name>
   <value>0</value>
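mapred-default.xml ships the cluster-wide default (false). An administrator who wants emission on by default could override it in mapred-site.xml, and individual jobs can still flip it either way, for example with conf.setBoolean(...) as sketched earlier, or with -Dmapreduce.job.emit-timeline-data=true on submission if the driver goes through ToolRunner/GenericOptionsParser. A possible mapred-site.xml entry (illustrative, not part of this commit):

<property>
  <name>mapreduce.job.emit-timeline-data</name>
  <value>true</value>
  <description>Site-level override: emit job history data to the timeline
  server by default; individual jobs may still set this to false.
  </description>
</property>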

TestMRTimelineEventHandling.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
@@ -85,4 +86,83 @@ public class TestMRTimelineEventHandling
     }
   }
 
+  @Test
+  public void testMapreduceJobTimelineServiceEnabled()
+      throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+          .getTimelineStore();
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+          null, null, null, null, null, null);
+      Assert.assertEquals(0, entities.getEntities().size());
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+      job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+          null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+    cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+          .getTimelineStore();
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+          null, null, null, null, null, null);
+      Assert.assertEquals(0, entities.getEntities().size());
+
+      conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+      job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+          null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+  }
 }