MAPREDUCE-6436. JobHistory cache issue. Contributed by Kai Sasaki
(cherry picked from commit 5b7078d069
)
This commit is contained in:
parent
58cc106378
commit
c4febc7d02
|
@ -662,6 +662,8 @@ Release 2.6.4 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-6436. JobHistory cache issue. (Kai Sasaki via zxu)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -219,13 +219,21 @@ public class HistoryFileManager extends AbstractService {
|
||||||
// keeping the cache size exactly at the maximum.
|
// keeping the cache size exactly at the maximum.
|
||||||
Iterator<JobId> keys = cache.navigableKeySet().iterator();
|
Iterator<JobId> keys = cache.navigableKeySet().iterator();
|
||||||
long cutoff = System.currentTimeMillis() - maxAge;
|
long cutoff = System.currentTimeMillis() - maxAge;
|
||||||
|
|
||||||
|
// MAPREDUCE-6436: In order to reduce the number of logs written
|
||||||
|
// in case of a lot of move pending histories.
|
||||||
|
JobId firstInIntermediateKey = null;
|
||||||
|
int inIntermediateCount = 0;
|
||||||
|
JobId firstMoveFailedKey = null;
|
||||||
|
int moveFailedCount = 0;
|
||||||
|
|
||||||
while(cache.size() > maxSize && keys.hasNext()) {
|
while(cache.size() > maxSize && keys.hasNext()) {
|
||||||
JobId key = keys.next();
|
JobId key = keys.next();
|
||||||
HistoryFileInfo firstValue = cache.get(key);
|
HistoryFileInfo firstValue = cache.get(key);
|
||||||
if(firstValue != null) {
|
if(firstValue != null) {
|
||||||
synchronized(firstValue) {
|
synchronized(firstValue) {
|
||||||
if (firstValue.isMovePending()) {
|
if (firstValue.isMovePending()) {
|
||||||
if(firstValue.didMoveFail() &&
|
if(firstValue.didMoveFail() &&
|
||||||
firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
|
firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
|
||||||
cache.remove(key);
|
cache.remove(key);
|
||||||
//Now lets try to delete it
|
//Now lets try to delete it
|
||||||
|
@ -236,8 +244,17 @@ public class HistoryFileManager extends AbstractService {
|
||||||
" that could not be moved to done.", e);
|
" that could not be moved to done.", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Waiting to remove " + key
|
if (firstValue.didMoveFail()) {
|
||||||
+ " from JobListCache because it is not in done yet.");
|
if (moveFailedCount == 0) {
|
||||||
|
firstMoveFailedKey = key;
|
||||||
|
}
|
||||||
|
moveFailedCount += 1;
|
||||||
|
} else {
|
||||||
|
if (inIntermediateCount == 0) {
|
||||||
|
firstInIntermediateKey = key;
|
||||||
|
}
|
||||||
|
inIntermediateCount += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cache.remove(key);
|
cache.remove(key);
|
||||||
|
@ -245,6 +262,20 @@ public class HistoryFileManager extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Log output only for first jobhisotry in pendings to restrict
|
||||||
|
// the total number of logs.
|
||||||
|
if (inIntermediateCount > 0) {
|
||||||
|
LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
|
||||||
|
"(e.g. " + firstInIntermediateKey + ") from JobListCache " +
|
||||||
|
"because it is not in done yet. Total count is " +
|
||||||
|
inIntermediateCount + ".");
|
||||||
|
}
|
||||||
|
if (moveFailedCount > 0) {
|
||||||
|
LOG.warn("Waiting to remove MOVE_FAILED state histories " +
|
||||||
|
"(e.g. " + firstMoveFailedKey + ") from JobListCache " +
|
||||||
|
"because it is not in done yet. Total count is " +
|
||||||
|
moveFailedCount + ".");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return old;
|
return old;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue