MAPREDUCE-6018. Added an MR specific config to enable emitting job history data to the timeline server. Contributed by Robert Kanter.
(cherry picked from commit 971e91c8c0
)
This commit is contained in:
parent
733380fa70
commit
52695d38fc
|
@ -61,6 +61,9 @@ Release 2.6.0 - UNRELEASED
|
|||
MAPREDUCE-5970. Provide a boolean switch to enable MR-AM profiling (Gera
|
||||
Shegalov via jlowe)
|
||||
|
||||
MAPREDUCE-6018. Added an MR specific config to enable emitting job history
|
||||
data to the timeline server. (Robert Kanter via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -241,8 +241,14 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
|
||||
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
|
||||
|
||||
timelineClient = TimelineClient.createTimelineClient();
|
||||
timelineClient.init(conf);
|
||||
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
|
||||
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
|
||||
timelineClient = TimelineClient.createTimelineClient();
|
||||
timelineClient.init(conf);
|
||||
LOG.info("Emitting job history data to the timeline server is enabled");
|
||||
} else {
|
||||
LOG.info("Emitting job history data to the timeline server is not enabled");
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
@ -268,7 +274,9 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
timelineClient.start();
|
||||
if (timelineClient != null) {
|
||||
timelineClient.start();
|
||||
}
|
||||
eventHandlingThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -537,7 +545,7 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
// For all events
|
||||
// (1) Write it out
|
||||
// (2) Process it for JobSummary
|
||||
// (3) Process it for ATS
|
||||
// (3) Process it for ATS (if enabled)
|
||||
MetaInfo mi = fileMap.get(event.getJobID());
|
||||
try {
|
||||
HistoryEvent historyEvent = event.getHistoryEvent();
|
||||
|
@ -546,8 +554,10 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
}
|
||||
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
|
||||
event.getJobID());
|
||||
processEventForTimelineServer(historyEvent, event.getJobID(),
|
||||
event.getTimestamp());
|
||||
if (timelineClient != null) {
|
||||
processEventForTimelineServer(historyEvent, event.getJobID(),
|
||||
event.getTimestamp());
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("In HistoryEventHandler "
|
||||
+ event.getHistoryEvent().getEventType());
|
||||
|
@ -839,8 +849,8 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
|
||||
tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
|
||||
tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
|
||||
tfe2.getSuccessfulTaskAttemptId() == null ?
|
||||
"" : tfe2.getSuccessfulTaskAttemptId().toString());
|
||||
tfe2.getSuccessfulTaskAttemptId() == null ?
|
||||
"" : tfe2.getSuccessfulTaskAttemptId().toString());
|
||||
tEntity.addEvent(tEvent);
|
||||
tEntity.setEntityId(tfe2.getTaskId().toString());
|
||||
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
|
||||
|
|
|
@ -378,6 +378,11 @@ public interface MRJobConfig {
|
|||
public static final String JOB_UBERTASK_MAXBYTES =
|
||||
"mapreduce.job.ubertask.maxbytes";
|
||||
|
||||
public static final String MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
|
||||
"mapreduce.job.emit-timeline-data";
|
||||
public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
|
||||
false;
|
||||
|
||||
public static final String MR_PREFIX = "yarn.app.mapreduce.";
|
||||
|
||||
public static final String MR_AM_PREFIX = MR_PREFIX + "am.";
|
||||
|
|
|
@ -833,6 +833,14 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.job.emit-timeline-data</name>
|
||||
<value>false</value>
|
||||
<description>Specifies if the Application Master should emit timeline data
|
||||
to the timeline server. Individual jobs can override this value.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.input.fileinputformat.split.minsize</name>
|
||||
<value>0</value>
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||
|
@ -85,4 +86,83 @@ public class TestMRTimelineEventHandling {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapreduceJobTimelineServiceEnabled()
|
||||
throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
|
||||
MiniMRYarnCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniMRYarnCluster(
|
||||
TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
TimelineStore ts = cluster.getApplicationHistoryServer()
|
||||
.getTimelineStore();
|
||||
|
||||
Path inDir = new Path("input");
|
||||
Path outDir = new Path("output");
|
||||
RunningJob job =
|
||||
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||
job.getJobStatus().getState().getValue());
|
||||
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||
null, null, null, null, null, null);
|
||||
Assert.assertEquals(0, entities.getEntities().size());
|
||||
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
||||
job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||
job.getJobStatus().getState().getValue());
|
||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
||||
null, null, null);
|
||||
Assert.assertEquals(1, entities.getEntities().size());
|
||||
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
||||
cluster = null;
|
||||
try {
|
||||
cluster = new MiniMRYarnCluster(
|
||||
TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
TimelineStore ts = cluster.getApplicationHistoryServer()
|
||||
.getTimelineStore();
|
||||
|
||||
Path inDir = new Path("input");
|
||||
Path outDir = new Path("output");
|
||||
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
|
||||
RunningJob job =
|
||||
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||
job.getJobStatus().getState().getValue());
|
||||
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||
null, null, null, null, null, null);
|
||||
Assert.assertEquals(0, entities.getEntities().size());
|
||||
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
||||
job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
|
||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||
job.getJobStatus().getState().getValue());
|
||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
||||
null, null, null);
|
||||
Assert.assertEquals(1, entities.getEntities().size());
|
||||
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue