diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 11475d6d4da..6ccc9efeaa6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -29,6 +29,9 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6339. Job history file is not flushed correctly because
     isTimerActive flag is not set true when flushTimerTask is scheduled.
     (zhihai xu via devaraj)

+    MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
+    (zhihai xu via jlowe)
+
 Release 2.7.0 - 2015-04-20

   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 6b0ea7957c1..bf328880987 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -426,10 +426,10 @@ public class JobHistoryEventHandler extends AbstractService
    * This should be the first call to history for a job
    *
    * @param jobId the jobId.
-   * @param forcedJobStateOnShutDown
+   * @param amStartedEvent
    * @throws IOException
    */
-  protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
+  protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
       throws IOException {
     if (stagingDirPath == null) {
       LOG.error("Log Directory is null, returning");
@@ -489,8 +489,13 @@ public class JobHistoryEventHandler extends AbstractService
     }

     MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
-        user, jobName, jobId, forcedJobStateOnShutDown, queueName);
+        user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
+        queueName);
     fi.getJobSummary().setJobId(jobId);
+    fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
+    fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
+    fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
+    fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
     fileMap.put(jobId, fi);
   }

@@ -541,8 +546,7 @@ public class JobHistoryEventHandler extends AbstractService
       try {
         AMStartedEvent amStartedEvent =
             (AMStartedEvent) event.getHistoryEvent();
-        setupEventWriter(event.getJobID(),
-            amStartedEvent.getForcedJobStateOnShutDown());
+        setupEventWriter(event.getJobID(), amStartedEvent);
       } catch (IOException ioe) {
         LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
             ioe);
@@ -982,6 +986,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
             ase.getNodeManagerHttpPort());
         tEvent.addEventInfo("START_TIME", ase.getStartTime());
+        tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(jobId.toString());
         tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 8f638829aed..835ad0908dd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1032,7 +1032,7 @@ public class MRAppMaster extends CompositeService {
           new JobHistoryEvent(job.getID(), new AMStartedEvent(info
               .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
               info.getNodeManagerHost(), info.getNodeManagerPort(), info
-              .getNodeManagerHttpPort())));
+              .getNodeManagerHttpPort(), appSubmitTime)));
     }

     // Send out an MR AM inited event for this AM.
@@ -1041,7 +1041,7 @@ public class MRAppMaster extends CompositeService {
         .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
         amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
         .getNodeManagerHttpPort(), this.forcedState == null ? null
-            : this.forcedState.toString())));
+            : this.forcedState.toString(), appSubmitTime)));
     amInfos.add(amInfo);

     // metrics system init is really init & start.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 7ed0b6356ea..5d39f35c044 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -125,7 +125,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));

@@ -168,7 +168,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));

@@ -213,7 +213,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));

@@ -256,7 +256,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));

@@ -293,7 +293,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       verify(jheh, times(0)).processDoneFiles(any(JobId.class));

       handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -338,7 +338,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       verify(jheh, times(0)).processDoneFiles(t.jobId);

       // skip processing done files
@@ -395,7 +395,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));

       handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
@@ -441,6 +441,47 @@ public class TestJobHistoryEventHandler {
         pathStr);
   }

+  // test AMStartedEvent for submitTime and startTime
+  @Test (timeout=50000)
+  public void testAMStartedEvent() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, 100)));
+
+      JobHistoryEventHandler.MetaInfo mi =
+          JobHistoryEventHandler.fileMap.get(t.jobId);
+      Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+      Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+          new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.FAILED.toString())));
+
+      Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+      Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+      verify(jheh, times(1)).processDoneFiles(t.jobId);
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(2)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   // Have JobHistoryEventHandler handle some events and make sure they get
   // stored to the Timeline store
   @Test (timeout=50000)
@@ -463,7 +504,7 @@ public class TestJobHistoryEventHandler {
         .getTimelineStore();

     handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
         currentTime - 10));
     TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
         null, null, null, null, null, null, null);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
index d1a378bc206..e1465f513a3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
@@ -35,6 +35,7 @@ import org.apache.avro.util.Utf8;
 public class AMStartedEvent implements HistoryEvent {
   private AMStarted datum = new AMStarted();
   private String forcedJobStateOnShutDown;
+  private long submitTime;

   /**
    * Create an event to record the start of an MR AppMaster
@@ -54,9 +55,9 @@ public class AMStartedEvent implements HistoryEvent {
    */
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
-      int nodeManagerHttpPort) {
+      int nodeManagerHttpPort, long submitTime) {
     this(appAttemptId, startTime, containerId, nodeManagerHost,
-        nodeManagerPort, nodeManagerHttpPort, null);
+        nodeManagerPort, nodeManagerHttpPort, null, submitTime);
   }

   /**
@@ -79,7 +80,8 @@ public class AMStartedEvent implements HistoryEvent {
    */
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
-      int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
+      int nodeManagerHttpPort, String forcedJobStateOnShutDown,
+      long submitTime) {
     datum.applicationAttemptId = new Utf8(appAttemptId.toString());
     datum.startTime = startTime;
     datum.containerId = new Utf8(containerId.toString());
@@ -87,6 +89,7 @@ public class AMStartedEvent implements HistoryEvent {
     datum.nodeManagerPort = nodeManagerPort;
     datum.nodeManagerHttpPort = nodeManagerHttpPort;
     this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
+    this.submitTime = submitTime;
   }

   AMStartedEvent() {
@@ -150,6 +153,13 @@ public class AMStartedEvent implements HistoryEvent {
     return this.forcedJobStateOnShutDown;
   }

+  /**
+   * @return the submit time for the Application(Job)
+   */
+  public long getSubmitTime() {
+    return this.submitTime;
+  }
+
   /** Get the attempt id */
   @Override
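For reviewers, below is a minimal usage sketch (not part of the patch) of the new AMStartedEvent constructor and getter introduced above. The class name, the main() wrapper, and the throwaway YARN IDs built via the ApplicationId/ApplicationAttemptId/ContainerId factories are illustrative assumptions; the AMStartedEvent signatures themselves match the change.

// Illustrative sketch only -- not part of the patch. It exercises the new
// AMStartedEvent(..., long submitTime) constructor and getSubmitTime() getter
// added above; the ID values and host/port arguments are made up.
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;

public class AMStartedEventSketch {
  public static void main(String[] args) {
    // Build throwaway YARN identifiers for the example.
    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
    ContainerId containerId = ContainerId.newContainerId(attemptId, 1);

    long submitTime = 100L; // when the client submitted the job
    long startTime = 200L;  // when this AM attempt started

    // With the patch, the submit time rides along with the AM-started event,
    // so JobHistoryEventHandler can seed JobIndexInfo/JobSummary even when a
    // job fails before any JobSubmittedEvent is written.
    AMStartedEvent event = new AMStartedEvent(attemptId, startTime, containerId,
        "nmhost", 3000, 4000, submitTime);

    System.out.println("submitTime=" + event.getSubmitTime()
        + ", startTime=" + event.getStartTime());
  }
}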