MAPREDUCE-6339. Job history file is not flushed correctly because

isTimerActive flag is not set true when flushTimerTask is scheduled.
Contributed by zhihai xu.

(cherry picked from commit f5b38477f9)
This commit is contained in:
Devaraj K 2015-04-30 12:33:22 +05:30
parent 9f8412ac6e
commit c3d083a9ff
3 changed files with 12 additions and 0 deletions

View File

@ -130,6 +130,9 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe) IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
MAPREDUCE-6339. Job history file is not flushed correctly because isTimerActive
flag is not set true when flushTimerTask is scheduled. (zhihai xu via devaraj)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -72,6 +72,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode; import org.codehaus.jackson.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
/** /**
* The job history events get routed to this class. This class writes the Job * The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a * history events to the DFS directly into a staging dir and then moved to a
@ -1259,6 +1260,7 @@ public class JobHistoryEventHandler extends AbstractService
if (!isTimerShutDown) { if (!isTimerShutDown) {
flushTimerTask = new FlushTimerTask(this); flushTimerTask = new FlushTimerTask(this);
flushTimer.schedule(flushTimerTask, flushTimeout); flushTimer.schedule(flushTimerTask, flushTimeout);
isTimerActive = true;
} }
} }
} }
@ -1378,4 +1380,9 @@ public class JobHistoryEventHandler extends AbstractService
} }
return JobState.KILLED.toString(); return JobState.KILLED.toString();
} }
@VisibleForTesting
boolean getFlushTimerStatus() {
return isTimerActive;
}
} }

View File

@ -223,10 +223,12 @@ public class TestJobHistoryEventHandler {
} }
handleNextNEvents(jheh, 9); handleNextNEvents(jheh, 9);
Assert.assertTrue(jheh.getFlushTimerStatus());
verify(mockWriter, times(0)).flush(); verify(mockWriter, times(0)).flush();
Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe. Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
verify(mockWriter).flush(); verify(mockWriter).flush();
Assert.assertFalse(jheh.getFlushTimerStatus());
} finally { } finally {
jheh.stop(); jheh.stop();
verify(mockWriter).close(); verify(mockWriter).close();