diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fd82d495c42..9c2f16e93ee 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -280,6 +280,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5970. Provide a boolean switch to enable MR-AM profiling (Gera
Shegalov via jlowe)
+ MAPREDUCE-6018. Added an MR specific config to enable emitting job history
+ data to the timeline server. (Robert Kanter via zjshen)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index ebedc1b40f0..184baaa3ff6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -241,8 +241,14 @@ public class JobHistoryEventHandler extends AbstractService
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
- timelineClient = TimelineClient.createTimelineClient();
- timelineClient.init(conf);
+ if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+ MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ LOG.info("Emitting job history data to the timeline server is enabled");
+ } else {
+ LOG.info("Emitting job history data to the timeline server is not enabled");
+ }
super.serviceInit(conf);
}
@@ -268,7 +274,9 @@ public class JobHistoryEventHandler extends AbstractService
@Override
protected void serviceStart() throws Exception {
- timelineClient.start();
+ if (timelineClient != null) {
+ timelineClient.start();
+ }
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -537,7 +545,7 @@ public class JobHistoryEventHandler extends AbstractService
// For all events
// (1) Write it out
// (2) Process it for JobSummary
- // (3) Process it for ATS
+ // (3) Process it for ATS (if enabled)
MetaInfo mi = fileMap.get(event.getJobID());
try {
HistoryEvent historyEvent = event.getHistoryEvent();
@@ -546,8 +554,10 @@ public class JobHistoryEventHandler extends AbstractService
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
- processEventForTimelineServer(historyEvent, event.getJobID(),
- event.getTimestamp());
+ if (timelineClient != null) {
+ processEventForTimelineServer(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
+ event.getHistoryEvent().getEventType());
@@ -839,8 +849,8 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
- tfe2.getSuccessfulTaskAttemptId() == null ?
- "" : tfe2.getSuccessfulTaskAttemptId().toString());
+ tfe2.getSuccessfulTaskAttemptId() == null ?
+ "" : tfe2.getSuccessfulTaskAttemptId().toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tfe2.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 689e51c77b9..5476e9237e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -383,6 +383,11 @@ public interface MRJobConfig {
public static final String JOB_UBERTASK_MAXBYTES =
"mapreduce.job.ubertask.maxbytes";
+ public static final String MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+ "mapreduce.job.emit-timeline-data";
+ public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
+ false;
+
public static final String MR_PREFIX = "yarn.app.mapreduce.";
public static final String MR_AM_PREFIX = MR_PREFIX + "am.";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1a8607167b6..6be62ec781e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -568,6 +568,14 @@
+
+ mapreduce.job.emit-timeline-data
+ false
+ Specifies if the Application Master should emit timeline data
+ to the timeline server. Individual jobs can override this value.
+
+
+
mapreduce.input.fileinputformat.split.minsize
0
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index 2352818ccc7..7b8d6df4a2b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
@@ -85,4 +86,83 @@ public class TestMRTimelineEventHandling {
}
}
+ @Test
+ public void testMapreduceJobTimelineServiceEnabled()
+ throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
+ MiniMRYarnCluster cluster = null;
+ try {
+ cluster = new MiniMRYarnCluster(
+ TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+ cluster.init(conf);
+ cluster.start();
+ TimelineStore ts = cluster.getApplicationHistoryServer()
+ .getTimelineStore();
+
+ Path inDir = new Path("input");
+ Path outDir = new Path("output");
+ RunningJob job =
+ UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+ Assert.assertEquals(JobStatus.SUCCEEDED,
+ job.getJobStatus().getState().getValue());
+ TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+ null, null, null, null, null, null);
+ Assert.assertEquals(0, entities.getEntities().size());
+
+ conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+ job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+ Assert.assertEquals(JobStatus.SUCCEEDED,
+ job.getJobStatus().getState().getValue());
+ entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+ null, null, null);
+ Assert.assertEquals(1, entities.getEntities().size());
+ TimelineEntity tEntity = entities.getEntities().get(0);
+ Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+ } finally {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+ cluster = null;
+ try {
+ cluster = new MiniMRYarnCluster(
+ TestJobHistoryEventHandler.class.getSimpleName(), 1, true);
+ cluster.init(conf);
+ cluster.start();
+ TimelineStore ts = cluster.getApplicationHistoryServer()
+ .getTimelineStore();
+
+ Path inDir = new Path("input");
+ Path outDir = new Path("output");
+
+ conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false);
+ RunningJob job =
+ UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+ Assert.assertEquals(JobStatus.SUCCEEDED,
+ job.getJobStatus().getState().getValue());
+ TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+ null, null, null, null, null, null);
+ Assert.assertEquals(0, entities.getEntities().size());
+
+ conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+ job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+ Assert.assertEquals(JobStatus.SUCCEEDED,
+ job.getJobStatus().getState().getValue());
+ entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+ null, null, null);
+ Assert.assertEquals(1, entities.getEntities().size());
+ TimelineEntity tEntity = entities.getEntities().get(0);
+ Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+ } finally {
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+ }
}