From 52695d38fcf40b8ee52077aaa15534676351d6e0 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Mon, 27 Oct 2014 21:03:53 -0700 Subject: [PATCH] MAPREDUCE-6018. Added an MR specific config to enable emitting job history data to the timeline server. Contributed by Robert Kanter. (cherry picked from commit 971e91c8c03a23e4613ed3f071b4f982ee5a1b63) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../jobhistory/JobHistoryEventHandler.java | 26 ++++-- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 ++ .../src/main/resources/mapred-default.xml | 8 ++ .../mapred/TestMRTimelineEventHandling.java | 80 +++++++++++++++++++ 5 files changed, 114 insertions(+), 8 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8738bc517e0..a181bb17dfa 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -61,6 +61,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5970. Provide a boolean switch to enable MR-AM profiling (Gera Shegalov via jlowe) + MAPREDUCE-6018. Added an MR specific config to enable emitting job history + data to the timeline server. (Robert Kanter via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index ebedc1b40f0..184baaa3ff6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -241,8 +241,14 @@ protected void serviceInit(Configuration conf) throws Exception { MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD); - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); + if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, + MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + LOG.info("Emitting job history data to the timeline server is enabled"); + } else { + LOG.info("Emitting job history data to the timeline server is not enabled"); + } super.serviceInit(conf); } @@ -268,7 +274,9 @@ private void mkdir(FileSystem fs, Path path, FsPermission fsp) @Override protected void serviceStart() throws Exception { - timelineClient.start(); + if (timelineClient != null) { + timelineClient.start(); + } eventHandlingThread = new Thread(new Runnable() { @Override public void run() { @@ -537,7 +545,7 @@ public void handleEvent(JobHistoryEvent event) { // For all events // (1) Write it out // (2) Process it for JobSummary - // (3) Process it for ATS + // (3) Process it for ATS (if enabled) MetaInfo mi = fileMap.get(event.getJobID()); try { HistoryEvent historyEvent = event.getHistoryEvent(); @@ -546,8 +554,10 @@ public void handleEvent(JobHistoryEvent event) { } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); + if (timelineClient != null) { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); + } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " + event.getHistoryEvent().getEventType()); @@ -839,8 +849,8 @@ private void processEventForTimelineServer(HistoryEvent event, JobId jobId, tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime()); tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString()); tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID", - tfe2.getSuccessfulTaskAttemptId() == null ? - "" : tfe2.getSuccessfulTaskAttemptId().toString()); + tfe2.getSuccessfulTaskAttemptId() == null ? + "" : tfe2.getSuccessfulTaskAttemptId().toString()); tEntity.addEvent(tEvent); tEntity.setEntityId(tfe2.getTaskId().toString()); tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 12bf6df005c..ed591c46960 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -378,6 +378,11 @@ public interface MRJobConfig { public static final String JOB_UBERTASK_MAXBYTES = "mapreduce.job.ubertask.maxbytes"; + public static final String MAPREDUCE_JOB_EMIT_TIMELINE_DATA = + "mapreduce.job.emit-timeline-data"; + public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA = + false; + public static final String MR_PREFIX = "yarn.app.mapreduce."; public static final String MR_AM_PREFIX = MR_PREFIX + "am."; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 82b89506563..7075d7de0a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -833,6 +833,14 @@ + + mapreduce.job.emit-timeline-data + false + Specifies if the Application Master should emit timeline data + to the timeline server. Individual jobs can override this value. + + + mapreduce.input.fileinputformat.split.minsize 0 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index 2352818ccc7..7b8d6df4a2b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.EventType; import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; @@ -85,4 +86,83 @@ public void testMRTimelineEventHandling() throws Exception { } } + @Test + public void testMapreduceJobTimelineServiceEnabled() + throws Exception { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false); + MiniMRYarnCluster cluster = null; + try { + cluster = new MiniMRYarnCluster( + TestJobHistoryEventHandler.class.getSimpleName(), 1, true); + cluster.init(conf); + cluster.start(); + TimelineStore ts = cluster.getApplicationHistoryServer() + .getTimelineStore(); + + Path inDir = new Path("input"); + Path outDir = new Path("output"); + RunningJob job = + UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null, + null, null, null, null, null, null); + Assert.assertEquals(0, entities.getEntities().size()); + + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); + job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, + null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + TimelineEntity tEntity = entities.getEntities().get(0); + Assert.assertEquals(job.getID().toString(), tEntity.getEntityId()); + } finally { + if (cluster != null) { + cluster.stop(); + } + } + + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); + cluster = null; + try { + cluster = new MiniMRYarnCluster( + TestJobHistoryEventHandler.class.getSimpleName(), 1, true); + cluster.init(conf); + cluster.start(); + TimelineStore ts = cluster.getApplicationHistoryServer() + .getTimelineStore(); + + Path inDir = new Path("input"); + Path outDir = new Path("output"); + + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, false); + RunningJob job = + UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null, + null, null, null, null, null, null); + Assert.assertEquals(0, entities.getEntities().size()); + + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true); + job = UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir); + Assert.assertEquals(JobStatus.SUCCEEDED, + job.getJobStatus().getState().getValue()); + entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, + null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + TimelineEntity tEntity = entities.getEntities().get(0); + Assert.assertEquals(job.getID().toString(), tEntity.getEntityId()); + } finally { + if (cluster != null) { + cluster.stop(); + } + } + } }