MAPREDUCE-4729. job history UI not showing all job attempts. Contributed by Vinod Kumar Vavilapalli

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1404817 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-11-01 22:59:43 +00:00
parent b50a3c5de3
commit 42d1eaf237
4 changed files with 173 additions and 21 deletions

View File

@ -626,6 +626,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4746. The MR Application Master does not have a config to set MAPREDUCE-4746. The MR Application Master does not have a config to set
environment variables (Rob Parker via bobby) environment variables (Rob Parker via bobby)
MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
Kumar Vavilapalli via jlowe)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -23,14 +23,17 @@ import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@ -89,6 +95,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.SystemClock;
@ -826,16 +833,21 @@ public class MRAppMaster extends CompositeService {
@Override @Override
public void start() { public void start() {
amInfos = new LinkedList<AMInfo>();
// Pull completedTasks etc from recovery // Pull completedTasks etc from recovery
if (inRecovery) { if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos(); amInfos = recoveryServ.getAMInfos();
} else {
// Get the amInfos anyways irrespective of whether recovery is enabled or
// not IF this is not the first AM generation
if (appAttemptID.getAttemptId() != 1) {
amInfos.addAll(readJustAMInfos());
}
} }
// / Create the AMInfo for the current AppMaster // Current an AMInfo for the current AM generation.
if (amInfos == null) {
amInfos = new LinkedList<AMInfo>();
}
AMInfo amInfo = AMInfo amInfo =
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost, MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
nmPort, nmHttpPort); nmPort, nmHttpPort);
@ -893,6 +905,51 @@ public class MRAppMaster extends CompositeService {
startJobs(); startJobs();
} }
private List<AMInfo> readJustAMInfos() {
List<AMInfo> amInfos = new ArrayList<AMInfo>();
FSDataInputStream inputStream = null;
try {
inputStream =
RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
appAttemptID);
EventReader jobHistoryEventReader = new EventReader(inputStream);
// All AMInfos are contiguous. Track when the first AMStartedEvent
// appears.
boolean amStartedEventsBegan = false;
HistoryEvent event;
while ((event = jobHistoryEventReader.getNextEvent()) != null) {
if (event.getEventType() == EventType.AM_STARTED) {
if (!amStartedEventsBegan) {
// First AMStartedEvent.
amStartedEventsBegan = true;
}
AMStartedEvent amStartedEvent = (AMStartedEvent) event;
amInfos.add(MRBuilderUtils.newAMInfo(
amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
amStartedEvent.getContainerId(),
StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
amStartedEvent.getNodeManagerPort(),
amStartedEvent.getNodeManagerHttpPort()));
} else if (amStartedEventsBegan) {
// This means AMStartedEvents began and this event is a
// non-AMStarted event.
// No need to continue reading all the other events.
break;
}
}
} catch (IOException e) {
LOG.warn("Could not parse the old history file. "
+ "Will not have old AMinfos ", e);
} finally {
if (inputStream != null) {
IOUtils.closeQuietly(inputStream);
}
}
return amInfos;
}
/** /**
* This can be overridden to instantiate multiple jobs and create a * This can be overridden to instantiate multiple jobs and create a
* workflow. * workflow.

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@ -178,26 +177,13 @@ public class RecoveryService extends CompositeService implements Recovery {
} }
private void parse() throws IOException { private void parse() throws IOException {
// TODO: parse history file based on startCount FSDataInputStream in =
String jobName = getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
FSDataInputStream in = null;
Path historyFile = null;
Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
getConfig());
//read the previous history file
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in); JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse(); jobInfo = parser.parse();
Exception parseException = parser.getParseException(); Exception parseException = parser.getParseException();
if (parseException != null) { if (parseException != null) {
LOG.info("Got an error parsing job-history file " + historyFile + LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException); ", ignoring incomplete events.", parseException);
} }
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@ -213,6 +199,28 @@ public class RecoveryService extends CompositeService implements Recovery {
LOG.info("Read completed tasks from history " LOG.info("Read completed tasks from history "
+ completedTasks.size()); + completedTasks.size());
} }
public static FSDataInputStream getPreviousJobHistoryFileStream(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
String jobName =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobName, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
}
protected Dispatcher createRecoveryDispatcher() { protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher(); return new RecoveryDispatcher();

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.TestRecovery.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.junit.Test;
public class TestAMInfos {
@Test
public void testAMInfosWithoutRecoveryEnabled() throws Exception {
int runCount = 0;
MRApp app =
new MRAppWithHistory(1, 0, false, this.getClass().getName(), true,
++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long am1StartTime = app.getAllAMInfos().get(0).getStartTime();
Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
TaskAttempt taskAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(taskAttempt, TaskAttemptState.RUNNING);
// stop the app
app.stop();
// rerun
app =
new MRAppWithHistory(1, 0, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
// in rerun the AMInfo will be recovered from previous run even if recovery
// is not enabled.
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask = it.next();
// There should be two AMInfos
List<AMInfo> amInfos = app.getAllAMInfos();
Assert.assertEquals(2, amInfos.size());
AMInfo amInfoOne = amInfos.get(0);
Assert.assertEquals(am1StartTime, amInfoOne.getStartTime());
app.stop();
}
}