From 5281c188dffb6a6e758a877b178fb439a24d8de3 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 24 Jan 2018 14:44:07 -0600 Subject: [PATCH] MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko (cherry picked from commit cff9edd4b514bdcfe22cd49964e3707fb78ab876) --- .../mapreduce/v2/hs/CachedHistoryStorage.java | 8 ++++- .../mapreduce/v2/hs/HistoryFileManager.java | 30 +++++++++++++++---- .../mapreduce/v2/hs/TestJobHistory.java | 26 ++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java index b001ae49b37..69f483166e6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java @@ -173,9 +173,14 @@ public class CachedHistoryStorage extends AbstractService implements HistoryFileInfo fileInfo; fileInfo = hsManager.getFileInfo(jobId); + if (fileInfo == null) { throw new HSFileRuntimeException("Unable to find job " + jobId); - } else if (fileInfo.isDeleted()) { + } + + fileInfo.waitUntilMoved(); + + if (fileInfo.isDeleted()) { throw new HSFileRuntimeException("Cannot load deleted job " + jobId); } else { return fileInfo.loadJob(); @@ -211,6 +216,7 @@ public class CachedHistoryStorage extends AbstractService implements for (HistoryFileInfo mi : hsManager.getAllFileInfo()) { if (mi != null) { JobId id = mi.getJobId(); + mi.waitUntilMoved(); result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index b418db7502d..a07ca26c1cd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -452,6 +452,8 @@ public class HistoryFileManager extends AbstractService { } catch (Throwable t) { LOG.error("Error while trying to move a job to done", t); this.state = HistoryInfoState.MOVE_FAILED; + } finally { + notifyAll(); } } @@ -485,12 +487,16 @@ public class HistoryFileManager extends AbstractService { } protected synchronized void delete() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("deleting " + historyFile + " and " + confFile); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("deleting " + historyFile + " and " + confFile); + } + state = HistoryInfoState.DELETED; + doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); + doneDirFc.delete(doneDirFc.makeQualified(confFile), false); + } finally { + notifyAll(); } - state = HistoryInfoState.DELETED; - doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); - doneDirFc.delete(doneDirFc.makeQualified(confFile), false); } public JobIndexInfo getJobIndexInfo() { @@ -517,6 +523,17 @@ public class HistoryFileManager extends AbstractService { jobIndexInfo.getNumMaps(); return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob); } + + public synchronized void waitUntilMoved() { + while (isMovePending() && !didMoveFail()) { + try { + wait(); + } catch (InterruptedException e) { + LOG.warn("Waiting has been interrupted"); + throw new RuntimeException(e); + } + } + } } private SerialNumberIndex serialNumberIndex = null; @@ -956,6 +973,7 @@ public class HistoryFileManager extends AbstractService { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling move to done of " +found); } + moveToDoneExecutor.execute(new Runnable() { @Override public void run() { @@ -1193,5 +1211,5 @@ public class HistoryFileManager extends AbstractService { @VisibleForTesting void setMaxHistoryAge(long newValue){ maxHistoryAge=newValue; - } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java index 936c77221bb..9f36477bd3b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java @@ -445,6 +445,32 @@ public class TestJobHistory { verify(fileInfo, timeout(20000).times(2)).delete(); } + @Test + public void testCachedStorageWaitsForFileMove() throws IOException { + HistoryFileManager historyManager = mock(HistoryFileManager.class); + jobHistory = spy(new JobHistory()); + doReturn(historyManager).when(jobHistory).createHistoryFileManager(); + + Configuration conf = new Configuration(); + jobHistory.init(conf); + jobHistory.start(); + + CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory + .getHistoryStorage()); + + Job job = mock(Job.class); + JobId jobId = mock(JobId.class); + when(job.getID()).thenReturn(jobId); + when(job.getTotalMaps()).thenReturn(10); + when(job.getTotalReduces()).thenReturn(2); + HistoryFileInfo fileInfo = mock(HistoryFileInfo.class); + when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo); + when(fileInfo.loadJob()).thenReturn(job); + + storage.getFullJob(jobId); + verify(fileInfo).waitUntilMoved(); + } + @Test public void testRefreshLoadedJobCacheUnSupportedOperation() { jobHistory = spy(new JobHistory());