From b4929bcf142db6dd2feadd8e0681545693c8711e Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Wed, 1 Feb 2012 23:33:58 +0000 Subject: [PATCH] MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. (Contributed by Arun C Murthy) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1239402 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/recover/RecoveryService.java | 5 + .../jobhistory/JobHistoryParser.java | 47 ++++- .../hadoop/mapreduce/v2/hs/CompletedJob.java | 9 +- .../v2/hs/TestJobHistoryParsing.java | 182 +++++++++++++----- 5 files changed, 189 insertions(+), 57 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 94d51701352..ca67fcb995f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -648,6 +648,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3427. Fix streaming unit tests broken after mavenization. (Hitesh Shah via acmurthy) + MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. + (Arun C Murthy via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 775cc11571e..a447add8ed2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -191,6 +191,11 @@ private void parse() throws IOException { in = fc.open(historyFile); JobHistoryParser parser = new JobHistoryParser(in); jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + if (parseException != null) { + LOG.info("Got an error parsing job-history file " + historyFile + + ", ignoring incomplete events.", parseException); + } Map taskInfos = jobInfo .getAllTasks(); for (TaskInfo taskInfo : taskInfos.values()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index c9be77c29d8..aa1089f1db1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -24,8 +24,11 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -52,9 +55,13 @@ @InterfaceStability.Unstable public class JobHistoryParser { + private static final Log LOG = LogFactory.getLog(JobHistoryParser.class); + private final FSDataInputStream in; - JobInfo info = null; + private JobInfo info = null; + private IOException parseException = null; + /** * Create a job history parser for the given history file using the * given file system @@ -91,30 +98,58 @@ public JobHistoryParser(FSDataInputStream in) { * The first invocation will populate the object, subsequent calls * will return the already parsed object. * The input stream is closed on return + * + * This api ignores partial records and stops parsing on encountering one. + * {@link #getParseException()} can be used to fetch the exception, if any. + * * @return The populated jobInfo object * @throws IOException + * @see #getParseException() */ public synchronized JobInfo parse() throws IOException { + return parse(new EventReader(in)); + } + + /** + * Only used for unit tests. + */ + @Private + public synchronized JobInfo parse(EventReader reader) throws IOException { if (info != null) { return info; } - EventReader reader = new EventReader(in); - - HistoryEvent event; info = new JobInfo(); + + int eventCtr = 0; + HistoryEvent event; try { while ((event = reader.getNextEvent()) != null) { handleEvent(event); - } + ++eventCtr; + } + } catch (IOException ioe) { + LOG.info("Caught exception parsing history file after " + eventCtr + + " events", ioe); + parseException = ioe; } finally { in.close(); } return info; } - private void handleEvent(HistoryEvent event) throws IOException { + /** + * Get the parse exception, if any. + * + * @return the parse exception, if any + * @see #parse() + */ + public synchronized IOException getParseException() { + return parseException; + } + + private void handleEvent(HistoryEvent event) { EventType type = event.getEventType(); switch (type) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index c3afb013c5e..041fffa9357 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -249,8 +249,9 @@ private synchronized void loadFullHistoryData(boolean loadTasks, } if (historyFileAbsolute != null) { + JobHistoryParser parser = null; try { - JobHistoryParser parser = + parser = new JobHistoryParser(historyFileAbsolute.getFileSystem(conf), historyFileAbsolute); jobInfo = parser.parse(); @@ -258,6 +259,12 @@ private synchronized void loadFullHistoryData(boolean loadTasks, throw new YarnException("Could not load history file " + historyFileAbsolute, e); } + IOException parseException = parser.getParseException(); + if (parseException != null) { + throw new YarnException( + "Could not parse history file " + historyFileAbsolute, + parseException); + } } else { throw new IOException("History file not found"); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 3755eba9466..d737cd23766 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -37,14 +38,18 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.EventReader; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -61,6 +66,9 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestJobHistoryParsing { private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class); @@ -76,6 +84,17 @@ public List resolve(List names) { @Test public void testHistoryParsing() throws Exception { + checkHistoryParsing(2, 1, 2); + } + + @Test + public void testHistoryParsingWithParseErrors() throws Exception { + checkHistoryParsing(3, 0, 2); + } + + private void checkHistoryParsing(final int numMaps, final int numReduces, + final int numSuccessfulMaps) + throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); long amStartTimeEst = System.currentTimeMillis(); @@ -83,8 +102,9 @@ public void testHistoryParsing() throws Exception { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); - MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), - true); + MRApp app = + new MRAppWithHistory(numMaps, numReduces, true, + this.getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); @@ -117,8 +137,42 @@ public void testHistoryParsing() throws Exception { } JobHistoryParser parser = new JobHistoryParser(in); - JobInfo jobInfo = parser.parse(); - + final EventReader realReader = new EventReader(in); + EventReader reader = Mockito.mock(EventReader.class); + if (numMaps == numSuccessfulMaps) { + reader = realReader; + } else { + final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! + Mockito.when(reader.getNextEvent()).thenAnswer( + new Answer() { + public HistoryEvent answer(InvocationOnMock invocation) + throws IOException { + HistoryEvent event = realReader.getNextEvent(); + if (event instanceof TaskFinishedEvent) { + numFinishedEvents.incrementAndGet(); + } + + if (numFinishedEvents.get() <= numSuccessfulMaps) { + return event; + } else { + throw new IOException("test"); + } + } + } + ); + } + + JobInfo jobInfo = parser.parse(reader); + + long numFinishedMaps = + computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps); + + if (numFinishedMaps != numMaps) { + Exception parseException = parser.getParseException(); + Assert.assertNotNull("Didn't get expected parse exception", + parseException); + } + Assert.assertEquals("Incorrect username ", System.getProperty("user.name"), jobInfo.getUsername()); Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); @@ -126,14 +180,16 @@ public void testHistoryParsing() throws Exception { jobInfo.getJobQueueName()); Assert .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); - Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps()); - Assert.assertEquals("incorrect finishedReduces ", 1, + Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, + numFinishedMaps); + Assert.assertEquals("incorrect finishedReduces ", numReduces, jobInfo.getFinishedReduces()); Assert.assertEquals("incorrect uberized ", job.isUber(), jobInfo.getUberized()); Map allTasks = jobInfo.getAllTasks(); int totalTasks = allTasks.size(); - Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); + Assert.assertEquals("total number of tasks is incorrect ", + (numMaps+numReduces), totalTasks); // Verify aminfo Assert.assertEquals(1, jobInfo.getAMInfos().size()); @@ -172,55 +228,78 @@ public void testHistoryParsing() throws Exception { Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo); Assert.assertEquals("Incorrect shuffle port for task attempt", taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort()); - Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); - Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); - - // Verify rack-name - Assert.assertEquals("rack-name is incorrect", taskAttemptInfo - .getRackname(), RACK_NAME); + if (numMaps == numSuccessfulMaps) { + Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); + Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); + + // Verify rack-name + Assert.assertEquals("rack-name is incorrect", taskAttemptInfo + .getRackname(), RACK_NAME); + } } } - String summaryFileName = JobHistoryUtils - .getIntermediateSummaryFileName(jobId); - Path summaryFile = new Path(jobhistoryDir, summaryFileName); - String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); - Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); - Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); - Assert.assertNotNull(jobSummaryString); + if (numMaps == numSuccessfulMaps) { - Map jobSummaryElements = new HashMap(); - StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); - while (strToken.hasMoreTokens()) { - String keypair = strToken.nextToken(); - jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); + String summaryFileName = JobHistoryUtils + .getIntermediateSummaryFileName(jobId); + Path summaryFile = new Path(jobhistoryDir, summaryFileName); + String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); + Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); + Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); + Assert.assertNotNull(jobSummaryString); + Map jobSummaryElements = new HashMap(); + StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); + while (strToken.hasMoreTokens()) { + String keypair = strToken.nextToken(); + jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); + + } + + Assert.assertEquals("JobId does not match", jobId.toString(), + jobSummaryElements.get("jobId")); + Assert.assertTrue("submitTime should not be 0", + Long.parseLong(jobSummaryElements.get("submitTime")) != 0); + Assert.assertTrue("launchTime should not be 0", + Long.parseLong(jobSummaryElements.get("launchTime")) != 0); + Assert.assertTrue("firstMapTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); + Assert + .assertTrue( + "firstReduceTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); + Assert.assertTrue("finishTime should not be 0", + Long.parseLong(jobSummaryElements.get("finishTime")) != 0); + Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, + Integer.parseInt(jobSummaryElements.get("numMaps"))); + Assert.assertEquals("Mismatch in num reduce slots", numReduces, + Integer.parseInt(jobSummaryElements.get("numReduces"))); + Assert.assertEquals("User does not match", System.getProperty("user.name"), + jobSummaryElements.get("user")); + Assert.assertEquals("Queue does not match", "default", + jobSummaryElements.get("queue")); + Assert.assertEquals("Status does not match", "SUCCEEDED", + jobSummaryElements.get("status")); } - - Assert.assertEquals("JobId does not match", jobId.toString(), - jobSummaryElements.get("jobId")); - Assert.assertTrue("submitTime should not be 0", - Long.parseLong(jobSummaryElements.get("submitTime")) != 0); - Assert.assertTrue("launchTime should not be 0", - Long.parseLong(jobSummaryElements.get("launchTime")) != 0); - Assert.assertTrue("firstMapTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); - Assert - .assertTrue( - "firstReduceTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); - Assert.assertTrue("finishTime should not be 0", - Long.parseLong(jobSummaryElements.get("finishTime")) != 0); - Assert.assertEquals("Mismatch in num map slots", 2, - Integer.parseInt(jobSummaryElements.get("numMaps"))); - Assert.assertEquals("Mismatch in num reduce slots", 1, - Integer.parseInt(jobSummaryElements.get("numReduces"))); - Assert.assertEquals("User does not match", System.getProperty("user.name"), - jobSummaryElements.get("user")); - Assert.assertEquals("Queue does not match", "default", - jobSummaryElements.get("queue")); - Assert.assertEquals("Status does not match", "SUCCEEDED", - jobSummaryElements.get("status")); + } + + // Computes finished maps similar to RecoveryService... + private long computeFinishedMaps(JobInfo jobInfo, + int numMaps, int numSuccessfulMaps) { + if (numMaps == numSuccessfulMaps) { + return jobInfo.getFinishedMaps(); + } + + long numFinishedMaps = 0; + Map taskInfos = + jobInfo.getAllTasks(); + for (TaskInfo taskInfo : taskInfos.values()) { + if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { + ++numFinishedMaps; + } + } + return numFinishedMaps; } @Test @@ -264,6 +343,9 @@ public void testHistoryParsingForFailedAttempts() throws Exception { JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + Assert.assertNull("Caught an expected exception " + parseException, + parseException); int noOffailedAttempts = 0; Map allTasks = jobInfo.getAllTasks(); for (Task task : job.getTasks().values()) {