MAPREDUCE-6339. Job history file is not flushed correctly because
isTimerActive flag is not set true when flushTimerTask is scheduled. Contributed by zhihai xu. (cherry picked from commit f5b38477f9d0827b238fadd260c1dd2889531fd4)
This commit is contained in:
parent
6ac2b5712b
commit
9828638fac
@ -26,6 +26,9 @@ Release 2.7.1 - UNRELEASED
|
||||
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
|
||||
IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
|
||||
|
||||
MAPREDUCE-6339. Job history file is not flushed correctly because isTimerActive
|
||||
flag is not set true when flushTimerTask is scheduled. (zhihai xu via devaraj)
|
||||
|
||||
Release 2.7.0 - 2015-04-20
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -72,6 +72,7 @@
|
||||
import org.codehaus.jackson.node.ArrayNode;
|
||||
import org.codehaus.jackson.node.ObjectNode;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
/**
|
||||
* The job history events get routed to this class. This class writes the Job
|
||||
* history events to the DFS directly into a staging dir and then moved to a
|
||||
@ -1259,6 +1260,7 @@ void processEventForFlush(HistoryEvent historyEvent) throws IOException {
|
||||
if (!isTimerShutDown) {
|
||||
flushTimerTask = new FlushTimerTask(this);
|
||||
flushTimer.schedule(flushTimerTask, flushTimeout);
|
||||
isTimerActive = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1378,4 +1380,9 @@ private String createJobStateForJobUnsuccessfulCompletionEvent(
|
||||
}
|
||||
return JobState.KILLED.toString();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean getFlushTimerStatus() {
|
||||
return isTimerActive;
|
||||
}
|
||||
}
|
||||
|
@ -223,10 +223,12 @@ public void testUnflushedTimer() throws Exception {
|
||||
}
|
||||
|
||||
handleNextNEvents(jheh, 9);
|
||||
Assert.assertTrue(jheh.getFlushTimerStatus());
|
||||
verify(mockWriter, times(0)).flush();
|
||||
|
||||
Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
|
||||
verify(mockWriter).flush();
|
||||
Assert.assertFalse(jheh.getFlushTimerStatus());
|
||||
} finally {
|
||||
jheh.stop();
|
||||
verify(mockWriter).close();
|
||||
|
Loading…
x
Reference in New Issue
Block a user