MAPREDUCE-6680. JHS UserLogDir scan algorithm sometime could skip directory with update in CloudFS (Azure FileSystem, S3, etc. Contributed by Junping Du
This commit is contained in:
parent
33fd95a99c
commit
1e48eefe58
|
@ -307,10 +307,21 @@ public class HistoryFileManager extends AbstractService {
|
||||||
*/
|
*/
|
||||||
private class UserLogDir {
|
private class UserLogDir {
|
||||||
long modTime = 0;
|
long modTime = 0;
|
||||||
|
private long scanTime = 0;
|
||||||
|
|
||||||
public synchronized void scanIfNeeded(FileStatus fs) {
|
public synchronized void scanIfNeeded(FileStatus fs) {
|
||||||
long newModTime = fs.getModificationTime();
|
long newModTime = fs.getModificationTime();
|
||||||
if (modTime != newModTime) {
|
// MAPREDUCE-6680: In some Cloud FileSystem, like Azure FS or S3, file's
|
||||||
|
// modification time is truncated into seconds. In that case,
|
||||||
|
// modTime == newModTime doesn't means no file update in the directory,
|
||||||
|
// so we need to have additional check.
|
||||||
|
// Note: modTime (X second Y millisecond) could be casted to X second or
|
||||||
|
// X+1 second.
|
||||||
|
if (modTime != newModTime
|
||||||
|
|| (scanTime/1000) == (modTime/1000)
|
||||||
|
|| (scanTime/1000 + 1) == (modTime/1000)) {
|
||||||
|
// reset scanTime before scanning happens
|
||||||
|
scanTime = System.currentTimeMillis();
|
||||||
Path p = fs.getPath();
|
Path p = fs.getPath();
|
||||||
try {
|
try {
|
||||||
scanIntermediateDirectory(p);
|
scanIntermediateDirectory(p);
|
||||||
|
@ -324,6 +335,8 @@ public class HistoryFileManager extends AbstractService {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Scan not needed of " + fs.getPath());
|
LOG.debug("Scan not needed of " + fs.getPath());
|
||||||
}
|
}
|
||||||
|
// reset scanTime
|
||||||
|
scanTime = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue