svn merge -c 1391671 FIXES: MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job has completed. Contributed by Robert Joseph Evans.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1391675 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b2db4b2917
commit
f40997309e
|
@ -422,6 +422,9 @@ Release 0.23.4 - UNRELEASED
|
|||
MAPREDUCE-4647. We should only unjar jobjar if there is a lib directory
|
||||
in it. (Robert Evans via tgraves)
|
||||
|
||||
MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job
|
||||
has completed (Robert Joseph Evans via jlowe)
|
||||
|
||||
Release 0.23.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,14 +23,14 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
@ -77,7 +77,7 @@ public class HistoryFileManager extends AbstractService {
|
|||
private static enum HistoryInfoState {
|
||||
IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
|
||||
};
|
||||
|
||||
|
||||
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
|
||||
.doneSubdirsBeforeSerialTail();
|
||||
|
||||
|
@ -199,6 +199,29 @@ public class HistoryFileManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class represents a user dir in the intermediate done directory. This
|
||||
* is mostly for locking purposes.
|
||||
*/
|
||||
private class UserLogDir {
|
||||
long modTime = 0;
|
||||
|
||||
public synchronized void scanIfNeeded(FileStatus fs) {
|
||||
long newModTime = fs.getModificationTime();
|
||||
if (modTime != newModTime) {
|
||||
Path p = fs.getPath();
|
||||
try {
|
||||
scanIntermediateDirectory(p);
|
||||
//If scanning fails, we will scan again. We assume the failure is
|
||||
// temporary.
|
||||
modTime = newModTime;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while trying to scan the directory " + p, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class HistoryFileInfo {
|
||||
private Path historyFile;
|
||||
private Path confFile;
|
||||
|
@ -352,7 +375,8 @@ public class HistoryFileManager extends AbstractService {
|
|||
* Maintains a mapping between intermediate user directories and the last
|
||||
* known modification time.
|
||||
*/
|
||||
private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
|
||||
private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap =
|
||||
new ConcurrentHashMap<String, UserLogDir>();
|
||||
|
||||
private JobACLsManager aclsMgr;
|
||||
|
||||
|
@ -584,23 +608,15 @@ public class HistoryFileManager extends AbstractService {
|
|||
|
||||
for (FileStatus userDir : userDirList) {
|
||||
String name = userDir.getPath().getName();
|
||||
long newModificationTime = userDir.getModificationTime();
|
||||
boolean shouldScan = false;
|
||||
synchronized (userDirModificationTimeMap) {
|
||||
if (!userDirModificationTimeMap.containsKey(name)
|
||||
|| newModificationTime > userDirModificationTimeMap.get(name)) {
|
||||
shouldScan = true;
|
||||
userDirModificationTimeMap.put(name, newModificationTime);
|
||||
}
|
||||
}
|
||||
if (shouldScan) {
|
||||
try {
|
||||
scanIntermediateDirectory(userDir.getPath());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while trying to scan the directory "
|
||||
+ userDir.getPath(), e);
|
||||
UserLogDir dir = userDirModificationTimeMap.get(name);
|
||||
if(dir == null) {
|
||||
dir = new UserLogDir();
|
||||
UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
|
||||
if(old != null) {
|
||||
dir = old;
|
||||
}
|
||||
}
|
||||
dir.scanIfNeeded(userDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue