From a280cd5567a85e163938fdee70c61e01b37abb10 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Thu, 1 Nov 2012 23:05:53 +0000 Subject: [PATCH] svn merge -c 1404817 FIXES: MAPREDUCE-4729. job history UI not showing all job attempts. Contributed by Vinod Kumar Vavilapalli git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1404824 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 65 +++++++++++++- .../v2/app/recover/RecoveryService.java | 42 ++++++---- .../hadoop/mapreduce/v2/app/TestAMInfos.java | 84 +++++++++++++++++++ 4 files changed, 173 insertions(+), 21 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8f0306662a4..8e48b7d2c7c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -479,6 +479,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4746. The MR Application Master does not have a config to set environment variables (Rob Parker via bobby) + MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod + Kumar Vavilapalli via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 1191f8d789b..51415ee0874 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -23,14 +23,17 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.EventReader; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; @@ -89,6 +95,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.SystemClock; @@ -826,16 +833,21 @@ public class MRAppMaster extends CompositeService { @Override public void start() { + amInfos = new LinkedList(); + // Pull completedTasks etc from recovery if (inRecovery) { completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); amInfos = recoveryServ.getAMInfos(); + } else { + // Get the amInfos anyways irrespective of whether recovery is enabled or + // not IF this is not the first AM generation + if (appAttemptID.getAttemptId() != 1) { + amInfos.addAll(readJustAMInfos()); + } } - // / Create the AMInfo for the current AppMaster - if (amInfos == null) { - amInfos = new LinkedList(); - } + // Current an AMInfo for the current AM generation. AMInfo amInfo = MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, nmPort, nmHttpPort); @@ -893,6 +905,51 @@ public class MRAppMaster extends CompositeService { startJobs(); } + private List readJustAMInfos() { + List amInfos = new ArrayList(); + FSDataInputStream inputStream = null; + try { + inputStream = + RecoveryService.getPreviousJobHistoryFileStream(getConfig(), + appAttemptID); + EventReader jobHistoryEventReader = new EventReader(inputStream); + + // All AMInfos are contiguous. Track when the first AMStartedEvent + // appears. + boolean amStartedEventsBegan = false; + + HistoryEvent event; + while ((event = jobHistoryEventReader.getNextEvent()) != null) { + if (event.getEventType() == EventType.AM_STARTED) { + if (!amStartedEventsBegan) { + // First AMStartedEvent. + amStartedEventsBegan = true; + } + AMStartedEvent amStartedEvent = (AMStartedEvent) event; + amInfos.add(MRBuilderUtils.newAMInfo( + amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(), + amStartedEvent.getContainerId(), + StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()), + amStartedEvent.getNodeManagerPort(), + amStartedEvent.getNodeManagerHttpPort())); + } else if (amStartedEventsBegan) { + // This means AMStartedEvents began and this event is a + // non-AMStarted event. + // No need to continue reading all the other events. + break; + } + } + } catch (IOException e) { + LOG.warn("Could not parse the old history file. " + + "Will not have old AMinfos ", e); + } finally { + if (inputStream != null) { + IOUtils.closeQuietly(inputStream); + } + } + return amInfos; + } + /** * This can be overridden to instantiate multiple jobs and create a * workflow. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 5e5699ce0ad..4e36b906acc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; @@ -178,26 +177,13 @@ public class RecoveryService extends CompositeService implements Recovery { } private void parse() throws IOException { - // TODO: parse history file based on startCount - String jobName = - TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString(); - String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig()); - FSDataInputStream in = null; - Path historyFile = null; - Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified( - new Path(jobhistoryDir)); - FileContext fc = FileContext.getFileContext(histDirPath.toUri(), - getConfig()); - //read the previous history file - historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile( - histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); - LOG.info("History file is at " + historyFile); - in = fc.open(historyFile); + FSDataInputStream in = + getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId); JobHistoryParser parser = new JobHistoryParser(in); jobInfo = parser.parse(); Exception parseException = parser.getParseException(); if (parseException != null) { - LOG.info("Got an error parsing job-history file " + historyFile + + LOG.info("Got an error parsing job-history file" + ", ignoring incomplete events.", parseException); } Map taskInfos = jobInfo @@ -213,6 +199,28 @@ public class RecoveryService extends CompositeService implements Recovery { LOG.info("Read completed tasks from history " + completedTasks.size()); } + + public static FSDataInputStream getPreviousJobHistoryFileStream( + Configuration conf, ApplicationAttemptId applicationAttemptId) + throws IOException { + FSDataInputStream in = null; + Path historyFile = null; + String jobName = + TypeConverter.fromYarn(applicationAttemptId.getApplicationId()) + .toString(); + String jobhistoryDir = + JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf); + Path histDirPath = + FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir)); + FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf); + // read the previous history file + historyFile = + fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath, + jobName, (applicationAttemptId.getAttemptId() - 1))); + LOG.info("History file is at " + historyFile); + in = fc.open(historyFile); + return in; + } protected Dispatcher createRecoveryDispatcher() { return new RecoveryDispatcher(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java new file mode 100644 index 00000000000..41387739ec9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java @@ -0,0 +1,84 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.util.Iterator; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.TestRecovery.MRAppWithHistory; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.junit.Test; + +public class TestAMInfos { + + @Test + public void testAMInfosWithoutRecoveryEnabled() throws Exception { + int runCount = 0; + MRApp app = + new MRAppWithHistory(1, 0, false, this.getClass().getName(), true, + ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + + long am1StartTime = app.getAllAMInfos().get(0).getStartTime(); + + Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + TaskAttempt taskAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(taskAttempt, TaskAttemptState.RUNNING); + + // stop the app + app.stop(); + + // rerun + app = + new MRAppWithHistory(1, 0, false, this.getClass().getName(), false, + ++runCount); + conf = new Configuration(); + // in rerun the AMInfo will be recovered from previous run even if recovery + // is not enabled. + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask = it.next(); + // There should be two AMInfos + List amInfos = app.getAllAMInfos(); + Assert.assertEquals(2, amInfos.size()); + AMInfo amInfoOne = amInfos.get(0); + Assert.assertEquals(am1StartTime, amInfoOne.getStartTime()); + app.stop(); + } +}