MAPREDUCE-7020. Task timeout in uber mode can crash AM. Contributed by Peter Bacsko

(cherry picked from commit e3a1647477)
This commit is contained in:
Jason Lowe 2018-01-26 15:46:01 -06:00
parent e062e2b08c
commit d69df0b596
3 changed files with 18 additions and 7 deletions

View File

@ -342,8 +342,9 @@ public class TaskAttemptListenerImpl extends CompositeService
AtomicReference<TaskAttemptStatus> lastStatusRef = AtomicReference<TaskAttemptStatus> lastStatusRef =
attemptIdToStatus.get(yarnAttemptID); attemptIdToStatus.get(yarnAttemptID);
if (lastStatusRef == null) { if (lastStatusRef == null) {
throw new IllegalStateException("Status update was called" LOG.error("Status update was called with illegal TaskAttemptId: "
+ " with illegal TaskAttemptId: " + yarnAttemptID); + yarnAttemptID);
return false;
} }
taskHeartbeatHandler.progressing(yarnAttemptID); taskHeartbeatHandler.progressing(yarnAttemptID);

View File

@ -397,13 +397,15 @@ public class TestTaskAttemptListenerImpl {
assertEquals(Phase.REDUCE, status.phase); assertEquals(Phase.REDUCE, status.phase);
} }
@Test(expected = IllegalStateException.class) @Test
public void testStatusUpdateFromUnregisteredTask() public void testStatusUpdateFromUnregisteredTask()
throws IOException, InterruptedException{ throws IOException, InterruptedException{
configureMocks(); configureMocks();
startListener(false); startListener(false);
listener.statusUpdate(attemptID, firstReduceStatus); boolean taskFound = listener.statusUpdate(attemptID, firstReduceStatus);
assertFalse(taskFound);
} }
private void configureMocks() { private void configureMocks() {

View File

@ -785,6 +785,9 @@ abstract public class Task implements Writable, Configurable {
long taskProgressInterval = MRJobConfUtil. long taskProgressInterval = MRJobConfUtil.
getTaskProgressReportInterval(conf); getTaskProgressReportInterval(conf);
boolean uberized = conf.getBoolean("mapreduce.task.uberized",
false);
while (!taskDone.get()) { while (!taskDone.get()) {
synchronized (lock) { synchronized (lock) {
done = false; done = false;
@ -820,9 +823,14 @@ abstract public class Task implements Writable, Configurable {
// if Task Tracker is not aware of our task ID (probably because it died and // if Task Tracker is not aware of our task ID (probably because it died and
// came back up), kill ourselves // came back up), kill ourselves
if (!taskFound) { if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId); if (uberized) {
resetDoneFlag(); taskDone.set(true);
System.exit(66); break;
} else {
LOG.warn("Parent died. Exiting "+taskId);
resetDoneFlag();
System.exit(66);
}
} }
sendProgress = resetProgressFlag(); sendProgress = resetProgressFlag();