svn merge -c 1404817 FIXES: MAPREDUCE-4729. job history UI not showing all job attempts. Contributed by Vinod Kumar Vavilapalli
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1404824 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d9e5740ce7
commit
a280cd5567
|
@ -479,6 +479,9 @@ Release 0.23.5 - UNRELEASED
|
|||
MAPREDUCE-4746. The MR Application Master does not have a config to set
|
||||
environment variables (Rob Parker via bobby)
|
||||
|
||||
MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
|
||||
Kumar Vavilapalli via jlowe)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,14 +23,17 @@ import java.io.IOException;
|
|||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.OutputFormat;
|
|||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
|
@ -89,6 +95,7 @@ import org.apache.hadoop.security.Credentials;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringInterner;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.SystemClock;
|
||||
|
@ -826,16 +833,21 @@ public class MRAppMaster extends CompositeService {
|
|||
@Override
|
||||
public void start() {
|
||||
|
||||
amInfos = new LinkedList<AMInfo>();
|
||||
|
||||
// Pull completedTasks etc from recovery
|
||||
if (inRecovery) {
|
||||
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
|
||||
amInfos = recoveryServ.getAMInfos();
|
||||
} else {
|
||||
// Get the amInfos anyways irrespective of whether recovery is enabled or
|
||||
// not IF this is not the first AM generation
|
||||
if (appAttemptID.getAttemptId() != 1) {
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
}
|
||||
}
|
||||
|
||||
// / Create the AMInfo for the current AppMaster
|
||||
if (amInfos == null) {
|
||||
amInfos = new LinkedList<AMInfo>();
|
||||
}
|
||||
// Current an AMInfo for the current AM generation.
|
||||
AMInfo amInfo =
|
||||
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
|
||||
nmPort, nmHttpPort);
|
||||
|
@ -893,6 +905,51 @@ public class MRAppMaster extends CompositeService {
|
|||
startJobs();
|
||||
}
|
||||
|
||||
private List<AMInfo> readJustAMInfos() {
|
||||
List<AMInfo> amInfos = new ArrayList<AMInfo>();
|
||||
FSDataInputStream inputStream = null;
|
||||
try {
|
||||
inputStream =
|
||||
RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
|
||||
appAttemptID);
|
||||
EventReader jobHistoryEventReader = new EventReader(inputStream);
|
||||
|
||||
// All AMInfos are contiguous. Track when the first AMStartedEvent
|
||||
// appears.
|
||||
boolean amStartedEventsBegan = false;
|
||||
|
||||
HistoryEvent event;
|
||||
while ((event = jobHistoryEventReader.getNextEvent()) != null) {
|
||||
if (event.getEventType() == EventType.AM_STARTED) {
|
||||
if (!amStartedEventsBegan) {
|
||||
// First AMStartedEvent.
|
||||
amStartedEventsBegan = true;
|
||||
}
|
||||
AMStartedEvent amStartedEvent = (AMStartedEvent) event;
|
||||
amInfos.add(MRBuilderUtils.newAMInfo(
|
||||
amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
|
||||
amStartedEvent.getContainerId(),
|
||||
StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
|
||||
amStartedEvent.getNodeManagerPort(),
|
||||
amStartedEvent.getNodeManagerHttpPort()));
|
||||
} else if (amStartedEventsBegan) {
|
||||
// This means AMStartedEvents began and this event is a
|
||||
// non-AMStarted event.
|
||||
// No need to continue reading all the other events.
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not parse the old history file. "
|
||||
+ "Will not have old AMinfos ", e);
|
||||
} finally {
|
||||
if (inputStream != null) {
|
||||
IOUtils.closeQuietly(inputStream);
|
||||
}
|
||||
}
|
||||
return amInfos;
|
||||
}
|
||||
|
||||
/**
|
||||
* This can be overridden to instantiate multiple jobs and create a
|
||||
* workflow.
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||
|
@ -178,26 +177,13 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
}
|
||||
|
||||
private void parse() throws IOException {
|
||||
// TODO: parse history file based on startCount
|
||||
String jobName =
|
||||
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
|
||||
String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
|
||||
FSDataInputStream in = null;
|
||||
Path historyFile = null;
|
||||
Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
|
||||
new Path(jobhistoryDir));
|
||||
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
|
||||
getConfig());
|
||||
//read the previous history file
|
||||
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
|
||||
histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
|
||||
LOG.info("History file is at " + historyFile);
|
||||
in = fc.open(historyFile);
|
||||
FSDataInputStream in =
|
||||
getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
|
||||
JobHistoryParser parser = new JobHistoryParser(in);
|
||||
jobInfo = parser.parse();
|
||||
Exception parseException = parser.getParseException();
|
||||
if (parseException != null) {
|
||||
LOG.info("Got an error parsing job-history file " + historyFile +
|
||||
LOG.info("Got an error parsing job-history file" +
|
||||
", ignoring incomplete events.", parseException);
|
||||
}
|
||||
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
|
||||
|
@ -214,6 +200,28 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
+ completedTasks.size());
|
||||
}
|
||||
|
||||
public static FSDataInputStream getPreviousJobHistoryFileStream(
|
||||
Configuration conf, ApplicationAttemptId applicationAttemptId)
|
||||
throws IOException {
|
||||
FSDataInputStream in = null;
|
||||
Path historyFile = null;
|
||||
String jobName =
|
||||
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
|
||||
.toString();
|
||||
String jobhistoryDir =
|
||||
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
|
||||
Path histDirPath =
|
||||
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
|
||||
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
|
||||
// read the previous history file
|
||||
historyFile =
|
||||
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
|
||||
jobName, (applicationAttemptId.getAttemptId() - 1)));
|
||||
LOG.info("History file is at " + historyFile);
|
||||
in = fc.open(historyFile);
|
||||
return in;
|
||||
}
|
||||
|
||||
protected Dispatcher createRecoveryDispatcher() {
|
||||
return new RecoveryDispatcher();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.TestRecovery.MRAppWithHistory;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAMInfos {
|
||||
|
||||
@Test
|
||||
public void testAMInfosWithoutRecoveryEnabled() throws Exception {
|
||||
int runCount = 0;
|
||||
MRApp app =
|
||||
new MRAppWithHistory(1, 0, false, this.getClass().getName(), true,
|
||||
++runCount);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
|
||||
long am1StartTime = app.getAllAMInfos().get(0).getStartTime();
|
||||
|
||||
Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task mapTask = it.next();
|
||||
app.waitForState(mapTask, TaskState.RUNNING);
|
||||
TaskAttempt taskAttempt = mapTask.getAttempts().values().iterator().next();
|
||||
app.waitForState(taskAttempt, TaskAttemptState.RUNNING);
|
||||
|
||||
// stop the app
|
||||
app.stop();
|
||||
|
||||
// rerun
|
||||
app =
|
||||
new MRAppWithHistory(1, 0, false, this.getClass().getName(), false,
|
||||
++runCount);
|
||||
conf = new Configuration();
|
||||
// in rerun the AMInfo will be recovered from previous run even if recovery
|
||||
// is not enabled.
|
||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
|
||||
it = job.getTasks().values().iterator();
|
||||
mapTask = it.next();
|
||||
// There should be two AMInfos
|
||||
List<AMInfo> amInfos = app.getAllAMInfos();
|
||||
Assert.assertEquals(2, amInfos.size());
|
||||
AMInfo amInfoOne = amInfos.get(0);
|
||||
Assert.assertEquals(am1StartTime, amInfoOne.getStartTime());
|
||||
app.stop();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue