From 7e55280b5bc6bd32ab77f1772b364c44db45319e Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Tue, 9 Oct 2012 03:24:43 +0000 Subject: [PATCH] merge MAPREDUCE-4705 from trunk. Fix a bug in job history lookup, which makes older jobs inaccessible despite the presence of a valid history file. (Contributed by Jason Lowe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1395851 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 ++ .../mapreduce/v2/hs/HistoryFileManager.java | 15 +++--- .../v2/hs/TestJobHistoryParsing.java | 52 +++++++++++++++++++ 3 files changed, 65 insertions(+), 6 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d8937b4421c..779a43af7ed 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -422,6 +422,10 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4554. Job Credentials are not transmitted if security is turned off (Benoy Antony via bobby) + MAPREDUCE-4705. Fix a bug in job history lookup, which makes older jobs + inaccessible despite the presence of a valid history file. (Jason Lowe + via sseth) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 85fcdc6bd5a..f7bc50609ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -130,7 +131,7 @@ public class HistoryFileManager extends AbstractService { } } - private static class JobListCache { + static class JobListCache { private ConcurrentSkipListMap cache; private int maxSize; private long maxAge; @@ -239,12 +240,14 @@ public class HistoryFileManager extends AbstractService { : HistoryInfoState.IN_INTERMEDIATE; } - private synchronized boolean isMovePending() { + @VisibleForTesting + synchronized boolean isMovePending() { return state == HistoryInfoState.IN_INTERMEDIATE || state == HistoryInfoState.MOVE_FAILED; } - private synchronized boolean didMoveFail() { + @VisibleForTesting + synchronized boolean didMoveFail() { return state == HistoryInfoState.MOVE_FAILED; } @@ -365,7 +368,7 @@ public class HistoryFileManager extends AbstractService { } private SerialNumberIndex serialNumberIndex = null; - private JobListCache jobListCache = null; + protected JobListCache jobListCache = null; // Maintains a list of known done subdirectories. private final Set existingDoneSubdirs = Collections @@ -707,8 +710,8 @@ public class HistoryFileManager extends AbstractService { * @throws IOException */ private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { - int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); - String boxedSerialNumber = String.valueOf(jobSerialNumber); + String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( + jobId, serialNumberFormat); Set dateStringSet = serialNumberIndex.get(boxedSerialNumber); if (dateStringSet == null) { return null; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index f9acb1a3821..e24cf05d790 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -460,6 +461,51 @@ public class TestJobHistoryParsing { } } + @Test + public void testScanningOldDirs() throws Exception { + LOG.info("STARTING testScanningOldDirs"); + try { + Configuration conf = new Configuration(); + conf + .setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = + new MRAppWithHistory(1, 1, true, + this.getClass().getName(), true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); + app.waitForState(job, JobState.SUCCEEDED); + + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); + + HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); + hfm.init(conf); + HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate job history", fileInfo); + + // force the manager to "forget" the job + hfm.deleteJobFromJobListCache(fileInfo); + final int msecPerSleep = 10; + int msecToSleep = 10 * 1000; + while (fileInfo.isMovePending() && msecToSleep > 0) { + Assert.assertTrue(!fileInfo.didMoveFail()); + msecToSleep -= msecPerSleep; + Thread.sleep(msecPerSleep); + } + Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); + + fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate old job history", fileInfo); + } finally { + LOG.info("FINISHED testScanningOldDirs"); + } + } + static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, @@ -500,6 +546,12 @@ public class TestJobHistoryParsing { } } + static class HistoryFileManagerForTest extends HistoryFileManager { + void deleteJobFromJobListCache(HistoryFileInfo fileInfo) { + jobListCache.delete(fileInfo); + } + } + public static void main(String[] args) throws Exception { TestJobHistoryParsing t = new TestJobHistoryParsing(); t.testHistoryParsing();