merge MAPREDUCE-4705 from trunk. Fix a bug in job history lookup, which makes older jobs inaccessible despite the presence of a valid history file. (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1395851 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-10-09 03:24:43 +00:00
parent 9f60339854
commit 7e55280b5b
3 changed files with 65 additions and 6 deletions

View File

@ -422,6 +422,10 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4554. Job Credentials are not transmitted if security is turned MAPREDUCE-4554. Job Credentials are not transmitted if security is turned
off (Benoy Antony via bobby) off (Benoy Antony via bobby)
MAPREDUCE-4705. Fix a bug in job history lookup, which makes older jobs
inaccessible despite the presence of a valid history file. (Jason Lowe
via sseth)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
@ -130,7 +131,7 @@ public class HistoryFileManager extends AbstractService {
} }
} }
private static class JobListCache { static class JobListCache {
private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
private int maxSize; private int maxSize;
private long maxAge; private long maxAge;
@ -239,12 +240,14 @@ public class HistoryFileManager extends AbstractService {
: HistoryInfoState.IN_INTERMEDIATE; : HistoryInfoState.IN_INTERMEDIATE;
} }
private synchronized boolean isMovePending() { @VisibleForTesting
synchronized boolean isMovePending() {
return state == HistoryInfoState.IN_INTERMEDIATE return state == HistoryInfoState.IN_INTERMEDIATE
|| state == HistoryInfoState.MOVE_FAILED; || state == HistoryInfoState.MOVE_FAILED;
} }
private synchronized boolean didMoveFail() { @VisibleForTesting
synchronized boolean didMoveFail() {
return state == HistoryInfoState.MOVE_FAILED; return state == HistoryInfoState.MOVE_FAILED;
} }
@ -365,7 +368,7 @@ public class HistoryFileManager extends AbstractService {
} }
private SerialNumberIndex serialNumberIndex = null; private SerialNumberIndex serialNumberIndex = null;
private JobListCache jobListCache = null; protected JobListCache jobListCache = null;
// Maintains a list of known done subdirectories. // Maintains a list of known done subdirectories.
private final Set<Path> existingDoneSubdirs = Collections private final Set<Path> existingDoneSubdirs = Collections
@ -707,8 +710,8 @@ public class HistoryFileManager extends AbstractService {
* @throws IOException * @throws IOException
*/ */
private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
String boxedSerialNumber = String.valueOf(jobSerialNumber); jobId, serialNumberFormat);
Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
if (dateStringSet == null) { if (dateStringSet == null) {
return null; return null;

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
@ -460,6 +461,51 @@ public class TestJobHistoryParsing {
} }
} }
@Test
public void testScanningOldDirs() throws Exception {
LOG.info("STARTING testScanningOldDirs");
try {
Configuration conf = new Configuration();
conf
.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app =
new MRAppWithHistory(1, 1, true,
this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
hfm.init(conf);
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
Assert.assertNotNull("Unable to locate job history", fileInfo);
// force the manager to "forget" the job
hfm.deleteJobFromJobListCache(fileInfo);
final int msecPerSleep = 10;
int msecToSleep = 10 * 1000;
while (fileInfo.isMovePending() && msecToSleep > 0) {
Assert.assertTrue(!fileInfo.didMoveFail());
msecToSleep -= msecPerSleep;
Thread.sleep(msecPerSleep);
}
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
fileInfo = hfm.getFileInfo(jobId);
Assert.assertNotNull("Unable to locate old job history", fileInfo);
} finally {
LOG.info("FINISHED testScanningOldDirs");
}
}
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
@ -500,6 +546,12 @@ public class TestJobHistoryParsing {
} }
} }
static class HistoryFileManagerForTest extends HistoryFileManager {
void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
jobListCache.delete(fileInfo);
}
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestJobHistoryParsing t = new TestJobHistoryParsing(); TestJobHistoryParsing t = new TestJobHistoryParsing();
t.testHistoryParsing(); t.testHistoryParsing();