MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. (Contributed by Arun C Murthy)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1239402 13f79535-47bb-0310-9956-ffa450edef68
Committed by: Siddharth Seth, 2012-02-01 23:33:58 +00:00
Parent: ab75b8b420
Commit: b4929bcf14

5 changed files with 189 additions and 57 deletions
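In short: JobHistoryParser.parse() now catches an IOException raised mid-stream (as with a truncated or still-being-written history file), keeps the events read up to that point, and records the error for retrieval via the new getParseException() method. RecoveryService logs the error and recovers from the partial data; CompletedJob rethrows it as a YarnException. A minimal caller-side sketch of the new contract follows; the class name PartialHistoryRead and the command-line path argument are illustrative only, while the parser calls are the ones added in this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

public class PartialHistoryRead {
  public static void main(String[] args) throws IOException {
    // Hypothetical path to a (possibly truncated) job history file.
    Path historyFile = new Path(args[0]);
    FileSystem fs = FileSystem.get(new Configuration());

    JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
    // With this change, parse() no longer propagates a mid-stream
    // IOException; it returns whatever events were read before the failure.
    JobInfo jobInfo = parser.parse();

    IOException parseException = parser.getParseException();
    if (parseException != null) {
      // Caller decides the policy: the AM's RecoveryService logs and
      // continues with the partial data; CompletedJob fails the load.
      System.err.println("Partial history file, parsed what we could: "
          + parseException);
    }
    System.out.println("Finished maps: " + jobInfo.getFinishedMaps());
  }
}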

File: hadoop-mapreduce-project/CHANGES.txt

@@ -648,6 +648,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3427. Fix streaming unit tests broken after mavenization.
     (Hitesh Shah via acmurthy)
 
+    MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
+    (Arun C Murthy via sseth)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

File: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -191,6 +191,11 @@ private void parse() throws IOException {
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file " + historyFile +
+          ", ignoring incomplete events.", parseException);
+    }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {

File: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

@@ -24,8 +24,11 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,9 +55,13 @@
 @InterfaceStability.Unstable
 public class JobHistoryParser {
 
+  private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
+
   private final FSDataInputStream in;
-  JobInfo info = null;
+  private JobInfo info = null;
+
+  private IOException parseException = null;
 
   /**
    * Create a job history parser for the given history file using the
    * given file system
@@ -91,30 +98,58 @@ public JobHistoryParser(FSDataInputStream in) {
    * The first invocation will populate the object, subsequent calls
    * will return the already parsed object.
    * The input stream is closed on return.
+   *
+   * This API ignores partial records and stops parsing on encountering one.
+   * {@link #getParseException()} can be used to fetch the exception, if any.
+   *
    * @return The populated jobInfo object
    * @throws IOException
+   * @see #getParseException()
    */
   public synchronized JobInfo parse() throws IOException {
+    return parse(new EventReader(in));
+  }
+
+  /**
+   * Only used for unit tests.
+   */
+  @Private
+  public synchronized JobInfo parse(EventReader reader) throws IOException {
     if (info != null) {
       return info;
     }
 
-    EventReader reader = new EventReader(in);
-
-    HistoryEvent event;
     info = new JobInfo();
+
+    int eventCtr = 0;
+    HistoryEvent event;
     try {
       while ((event = reader.getNextEvent()) != null) {
         handleEvent(event);
+        ++eventCtr;
       }
+    } catch (IOException ioe) {
+      LOG.info("Caught exception parsing history file after " + eventCtr +
+          " events", ioe);
+      parseException = ioe;
     } finally {
       in.close();
     }
     return info;
   }
 
-  private void handleEvent(HistoryEvent event) throws IOException {
+  /**
+   * Get the parse exception, if any.
+   *
+   * @return the parse exception, if any
+   * @see #parse()
+   */
+  public synchronized IOException getParseException() {
+    return parseException;
+  }
+
+  private void handleEvent(HistoryEvent event) {
     EventType type = event.getEventType();
 
     switch (type) {
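The parse()/getParseException() split means the parser itself stays tolerant and each caller chooses its own failure policy. A caller that wants the old fail-fast behaviour can simply rethrow the recorded exception; a minimal sketch follows, with the helper class and method names (StrictHistoryParse, strictParse) being our own invention rather than part of this commit. CompletedJob, in the next file, applies the same check inline.

import java.io.IOException;

import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

// Hypothetical helper, not part of this commit: restores fail-fast
// semantics on top of the now-tolerant parse().
public final class StrictHistoryParse {
  private StrictHistoryParse() {}

  public static JobInfo strictParse(JobHistoryParser parser)
      throws IOException {
    JobInfo info = parser.parse(); // records a mid-stream error instead of throwing
    IOException parseException = parser.getParseException();
    if (parseException != null) {
      throw parseException;        // surface the truncation to the caller
    }
    return info;
  }
}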

File: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java

@@ -249,8 +249,9 @@ private synchronized void loadFullHistoryData(boolean loadTasks,
     }
 
     if (historyFileAbsolute != null) {
+      JobHistoryParser parser = null;
       try {
-        JobHistoryParser parser =
+        parser =
             new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
                 historyFileAbsolute);
         jobInfo = parser.parse();
@@ -258,6 +259,12 @@ private synchronized void loadFullHistoryData(boolean loadTasks,
         throw new YarnException("Could not load history file "
             + historyFileAbsolute, e);
       }
+      IOException parseException = parser.getParseException();
+      if (parseException != null) {
+        throw new YarnException(
+            "Could not parse history file " + historyFileAbsolute,
+            parseException);
+      }
     } else {
       throw new IOException("History file not found");
     }

File: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
@@ -37,14 +38,18 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -61,6 +66,9 @@
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
@@ -76,6 +84,17 @@ public List<String> resolve(List<String> names) {
 
   @Test
   public void testHistoryParsing() throws Exception {
+    checkHistoryParsing(2, 1, 2);
+  }
+
+  @Test
+  public void testHistoryParsingWithParseErrors() throws Exception {
+    checkHistoryParsing(3, 0, 2);
+  }
+
+  private void checkHistoryParsing(final int numMaps, final int numReduces,
+      final int numSuccessfulMaps)
+          throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     long amStartTimeEst = System.currentTimeMillis();
@@ -83,8 +102,9 @@ public void testHistoryParsing() throws Exception {
       CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
       MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
-    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
-        true);
+    MRApp app =
+        new MRAppWithHistory(numMaps, numReduces, true,
+            this.getClass().getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
@@ -117,8 +137,42 @@ public void testHistoryParsing() throws Exception {
     }
 
     JobHistoryParser parser = new JobHistoryParser(in);
-    JobInfo jobInfo = parser.parse();
+
+    final EventReader realReader = new EventReader(in);
+    EventReader reader = Mockito.mock(EventReader.class);
+    if (numMaps == numSuccessfulMaps) {
+      reader = realReader;
+    } else {
+      final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
+      Mockito.when(reader.getNextEvent()).thenAnswer(
+          new Answer<HistoryEvent>() {
+            public HistoryEvent answer(InvocationOnMock invocation)
+                throws IOException {
+              HistoryEvent event = realReader.getNextEvent();
+              if (event instanceof TaskFinishedEvent) {
+                numFinishedEvents.incrementAndGet();
+              }
+              if (numFinishedEvents.get() <= numSuccessfulMaps) {
+                return event;
+              } else {
+                throw new IOException("test");
+              }
+            }
+          }
+        );
+    }
+
+    JobInfo jobInfo = parser.parse(reader);
+
+    long numFinishedMaps =
+        computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+
+    if (numFinishedMaps != numMaps) {
+      Exception parseException = parser.getParseException();
+      Assert.assertNotNull("Didn't get expected parse exception",
+          parseException);
+    }
+
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
         jobInfo.getUsername());
     Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@@ -126,14 +180,16 @@ public void testHistoryParsing() throws Exception {
         jobInfo.getJobQueueName());
     Assert
         .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
-    Assert.assertEquals("incorrect finishedReduces ", 1,
+    Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
+        numFinishedMaps);
+    Assert.assertEquals("incorrect finishedReduces ", numReduces,
         jobInfo.getFinishedReduces());
     Assert.assertEquals("incorrect uberized ", job.isUber(),
         jobInfo.getUberized());
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     int totalTasks = allTasks.size();
-    Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
+    Assert.assertEquals("total number of tasks is incorrect ",
+        (numMaps+numReduces), totalTasks);
 
     // Verify aminfo
     Assert.assertEquals(1, jobInfo.getAMInfos().size());
@@ -172,55 +228,78 @@ public void testHistoryParsing() throws Exception {
       Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
       Assert.assertEquals("Incorrect shuffle port for task attempt",
           taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
-      Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
-      Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
-
-      // Verify rack-name
-      Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-          .getRackname(), RACK_NAME);
+      if (numMaps == numSuccessfulMaps) {
+        Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
+        Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
+
+        // Verify rack-name
+        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+            .getRackname(), RACK_NAME);
+      }
     }
   }
 
-    String summaryFileName = JobHistoryUtils
-        .getIntermediateSummaryFileName(jobId);
-    Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-    String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
-    Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
-    Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
-    Assert.assertNotNull(jobSummaryString);
-
-    Map<String, String> jobSummaryElements = new HashMap<String, String>();
-    StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
-    while (strToken.hasMoreTokens()) {
-      String keypair = strToken.nextToken();
-      jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
-    }
-
-    Assert.assertEquals("JobId does not match", jobId.toString(),
-        jobSummaryElements.get("jobId"));
-    Assert.assertTrue("submitTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
-    Assert.assertTrue("launchTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-    Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
-    Assert
-        .assertTrue(
-            "firstReduceTaskLaunchTime should not be 0",
-            Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
-    Assert.assertTrue("finishTime should not be 0",
-        Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
-    Assert.assertEquals("Mismatch in num map slots", 2,
-        Integer.parseInt(jobSummaryElements.get("numMaps")));
-    Assert.assertEquals("Mismatch in num reduce slots", 1,
-        Integer.parseInt(jobSummaryElements.get("numReduces")));
-    Assert.assertEquals("User does not match", System.getProperty("user.name"),
-        jobSummaryElements.get("user"));
-    Assert.assertEquals("Queue does not match", "default",
-        jobSummaryElements.get("queue"));
-    Assert.assertEquals("Status does not match", "SUCCEEDED",
-        jobSummaryElements.get("status"));
+    if (numMaps == numSuccessfulMaps) {
+
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId);
+      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+      String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+      Assert.assertNotNull(jobSummaryString);
+
+      Map<String, String> jobSummaryElements = new HashMap<String, String>();
+      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+      while (strToken.hasMoreTokens()) {
+        String keypair = strToken.nextToken();
+        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+      }
+
+      Assert.assertEquals("JobId does not match", jobId.toString(),
+          jobSummaryElements.get("jobId"));
+      Assert.assertTrue("submitTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+      Assert.assertTrue("launchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+          .assertTrue(
+              "firstReduceTaskLaunchTime should not be 0",
+              Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+      Assert.assertTrue("finishTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+          Integer.parseInt(jobSummaryElements.get("numMaps")));
+      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+          Integer.parseInt(jobSummaryElements.get("numReduces")));
+      Assert.assertEquals("User does not match", System.getProperty("user.name"),
+          jobSummaryElements.get("user"));
+      Assert.assertEquals("Queue does not match", "default",
+          jobSummaryElements.get("queue"));
+      Assert.assertEquals("Status does not match", "SUCCEEDED",
+          jobSummaryElements.get("status"));
+    }
   }
 
+  // Computes finished maps similar to RecoveryService...
+  private long computeFinishedMaps(JobInfo jobInfo,
+      int numMaps, int numSuccessfulMaps) {
+    if (numMaps == numSuccessfulMaps) {
+      return jobInfo.getFinishedMaps();
+    }
+
+    long numFinishedMaps = 0;
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos =
+        jobInfo.getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        ++numFinishedMaps;
+      }
+    }
+    return numFinishedMaps;
+  }
+
   @Test
@@ -264,6 +343,9 @@ public void testHistoryParsingForFailedAttempts() throws Exception {
 
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    Assert.assertNull("Caught an unexpected parse exception: " + parseException,
+        parseException);
     int noOffailedAttempts = 0;
     Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
     for (Task task : job.getTasks().values()) {