MAPREDUCE-6339. Job history file is not flushed correctly because

isTimerActive flag is not set true when flushTimerTask is scheduled.
Contributed by zhihai xu.
This commit is contained in:
Devaraj K 2015-04-30 12:33:22 +05:30
parent aa22450442
commit f5b38477f9
3 changed files with 12 additions and 0 deletions

View File

@ -382,6 +382,9 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe) IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
MAPREDUCE-6339. Job history file is not flushed correctly because isTimerActive
flag is not set true when flushTimerTask is scheduled. (zhihai xu via devaraj)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -72,6 +72,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode; import org.codehaus.jackson.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
/** /**
* The job history events get routed to this class. This class writes the Job * The job history events get routed to this class. This class writes the Job
* history events to the DFS directly into a staging dir and then moved to a * history events to the DFS directly into a staging dir and then moved to a
@ -1259,6 +1260,7 @@ public class JobHistoryEventHandler extends AbstractService
if (!isTimerShutDown) { if (!isTimerShutDown) {
flushTimerTask = new FlushTimerTask(this); flushTimerTask = new FlushTimerTask(this);
flushTimer.schedule(flushTimerTask, flushTimeout); flushTimer.schedule(flushTimerTask, flushTimeout);
isTimerActive = true;
} }
} }
} }
@ -1378,4 +1380,9 @@ public class JobHistoryEventHandler extends AbstractService
} }
return JobState.KILLED.toString(); return JobState.KILLED.toString();
} }
@VisibleForTesting
boolean getFlushTimerStatus() {
return isTimerActive;
}
} }

View File

@ -223,10 +223,12 @@ public class TestJobHistoryEventHandler {
} }
handleNextNEvents(jheh, 9); handleNextNEvents(jheh, 9);
Assert.assertTrue(jheh.getFlushTimerStatus());
verify(mockWriter, times(0)).flush(); verify(mockWriter, times(0)).flush();
Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe. Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
verify(mockWriter).flush(); verify(mockWriter).flush();
Assert.assertFalse(jheh.getFlushTimerStatus());
} finally { } finally {
jheh.stop(); jheh.stop();
verify(mockWriter).close(); verify(mockWriter).close();