MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko

(cherry picked from commit cff9edd4b5)
This commit is contained in:
Jason Lowe 2018-01-24 14:44:07 -06:00
parent 7598f73320
commit 5281c188df
3 changed files with 57 additions and 7 deletions

View File

@ -173,9 +173,14 @@ public class CachedHistoryStorage extends AbstractService implements
HistoryFileInfo fileInfo; HistoryFileInfo fileInfo;
fileInfo = hsManager.getFileInfo(jobId); fileInfo = hsManager.getFileInfo(jobId);
if (fileInfo == null) { if (fileInfo == null) {
throw new HSFileRuntimeException("Unable to find job " + jobId); throw new HSFileRuntimeException("Unable to find job " + jobId);
} else if (fileInfo.isDeleted()) { }
fileInfo.waitUntilMoved();
if (fileInfo.isDeleted()) {
throw new HSFileRuntimeException("Cannot load deleted job " + jobId); throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
} else { } else {
return fileInfo.loadJob(); return fileInfo.loadJob();
@ -211,6 +216,7 @@ public class CachedHistoryStorage extends AbstractService implements
for (HistoryFileInfo mi : hsManager.getAllFileInfo()) { for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
if (mi != null) { if (mi != null) {
JobId id = mi.getJobId(); JobId id = mi.getJobId();
mi.waitUntilMoved();
result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
} }
} }

View File

@ -452,6 +452,8 @@ public class HistoryFileManager extends AbstractService {
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Error while trying to move a job to done", t); LOG.error("Error while trying to move a job to done", t);
this.state = HistoryInfoState.MOVE_FAILED; this.state = HistoryInfoState.MOVE_FAILED;
} finally {
notifyAll();
} }
} }
@ -485,12 +487,16 @@ public class HistoryFileManager extends AbstractService {
} }
protected synchronized void delete() throws IOException { protected synchronized void delete() throws IOException {
if (LOG.isDebugEnabled()) { try {
LOG.debug("deleting " + historyFile + " and " + confFile); if (LOG.isDebugEnabled()) {
LOG.debug("deleting " + historyFile + " and " + confFile);
}
state = HistoryInfoState.DELETED;
doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
} finally {
notifyAll();
} }
state = HistoryInfoState.DELETED;
doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
} }
public JobIndexInfo getJobIndexInfo() { public JobIndexInfo getJobIndexInfo() {
@ -517,6 +523,17 @@ public class HistoryFileManager extends AbstractService {
jobIndexInfo.getNumMaps(); jobIndexInfo.getNumMaps();
return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob); return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
} }
public synchronized void waitUntilMoved() {
while (isMovePending() && !didMoveFail()) {
try {
wait();
} catch (InterruptedException e) {
LOG.warn("Waiting has been interrupted");
throw new RuntimeException(e);
}
}
}
} }
private SerialNumberIndex serialNumberIndex = null; private SerialNumberIndex serialNumberIndex = null;
@ -956,6 +973,7 @@ public class HistoryFileManager extends AbstractService {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling move to done of " +found); LOG.debug("Scheduling move to done of " +found);
} }
moveToDoneExecutor.execute(new Runnable() { moveToDoneExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -1193,5 +1211,5 @@ public class HistoryFileManager extends AbstractService {
@VisibleForTesting @VisibleForTesting
void setMaxHistoryAge(long newValue){ void setMaxHistoryAge(long newValue){
maxHistoryAge=newValue; maxHistoryAge=newValue;
} }
} }

View File

@ -445,6 +445,32 @@ public class TestJobHistory {
verify(fileInfo, timeout(20000).times(2)).delete(); verify(fileInfo, timeout(20000).times(2)).delete();
} }
@Test
public void testCachedStorageWaitsForFileMove() throws IOException {
HistoryFileManager historyManager = mock(HistoryFileManager.class);
jobHistory = spy(new JobHistory());
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
Configuration conf = new Configuration();
jobHistory.init(conf);
jobHistory.start();
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
.getHistoryStorage());
Job job = mock(Job.class);
JobId jobId = mock(JobId.class);
when(job.getID()).thenReturn(jobId);
when(job.getTotalMaps()).thenReturn(10);
when(job.getTotalReduces()).thenReturn(2);
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo);
when(fileInfo.loadJob()).thenReturn(job);
storage.getFullJob(jobId);
verify(fileInfo).waitUntilMoved();
}
@Test @Test
public void testRefreshLoadedJobCacheUnSupportedOperation() { public void testRefreshLoadedJobCacheUnSupportedOperation() {
jobHistory = spy(new JobHistory()); jobHistory = spy(new JobHistory());