diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
index c59d17f7192..b93c8d7954a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
@@ -172,9 +172,14 @@ public class CachedHistoryStorage extends AbstractService implements
 
       HistoryFileInfo fileInfo;
       fileInfo = hsManager.getFileInfo(jobId);
+
       if (fileInfo == null) {
         throw new HSFileRuntimeException("Unable to find job " + jobId);
-      } else if (fileInfo.isDeleted()) {
+      }
+
+      fileInfo.waitUntilMoved();
+
+      if (fileInfo.isDeleted()) {
         throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
       } else {
         return fileInfo.loadJob();
@@ -210,6 +215,7 @@ public class CachedHistoryStorage extends AbstractService implements
     for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
       if (mi != null) {
         JobId id = mi.getJobId();
+        mi.waitUntilMoved();
         result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
       }
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 045f0ee4909..6fd1c0ee55d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -455,6 +455,8 @@ public class HistoryFileManager extends AbstractService {
       } catch (Throwable t) {
         LOG.error("Error while trying to move a job to done", t);
         this.state = HistoryInfoState.MOVE_FAILED;
+      } finally {
+        notifyAll();
       }
     }
 
@@ -488,12 +490,16 @@ public class HistoryFileManager extends AbstractService {
     }
 
     protected synchronized void delete() throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("deleting " + historyFile + " and " + confFile);
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("deleting " + historyFile + " and " + confFile);
+        }
+        state = HistoryInfoState.DELETED;
+        doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+        doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
+      } finally {
+        notifyAll();
       }
-      state = HistoryInfoState.DELETED;
-      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
-      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
     }
 
     public JobIndexInfo getJobIndexInfo() {
@@ -520,6 +526,18 @@ public class HistoryFileManager extends AbstractService {
           jobIndexInfo.getNumMaps();
       return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
     }
+
+    public synchronized void waitUntilMoved() {
+      while (isMovePending() && !didMoveFail()) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          LOG.warn("Waiting has been interrupted");
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+    }
   }
 
   private SerialNumberIndex serialNumberIndex = null;
@@ -959,6 +977,7 @@ public class HistoryFileManager extends AbstractService {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scheduling move to done of " +found);
       }
+
       moveToDoneExecutor.execute(new Runnable() {
         @Override
         public void run() {
@@ -1195,5 +1214,5 @@ public class HistoryFileManager extends AbstractService {
   @VisibleForTesting
   protected void setMaxHistoryAge(long newValue){
     maxHistoryAge=newValue;
-  } 
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
index 936c77221bb..9f36477bd3b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
@@ -445,6 +445,32 @@ public class TestJobHistory {
     verify(fileInfo, timeout(20000).times(2)).delete();
   }
 
+  @Test
+  public void testCachedStorageWaitsForFileMove() throws IOException {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    Job job = mock(Job.class);
+    JobId jobId = mock(JobId.class);
+    when(job.getID()).thenReturn(jobId);
+    when(job.getTotalMaps()).thenReturn(10);
+    when(job.getTotalReduces()).thenReturn(2);
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(job);
+
+    storage.getFullJob(jobId);
+    verify(fileInfo).waitUntilMoved();
+  }
+
   @Test
   public void testRefreshLoadedJobCacheUnSupportedOperation() {
     jobHistory = spy(new JobHistory());