diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index b155af220f1..668d8ed908e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -369,14 +369,16 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + AMFeedback feedback = new AMFeedback(); AtomicReference lastStatusRef = attemptIdToStatus.get(yarnAttemptID); if (lastStatusRef == null) { - throw new IllegalStateException("Status update was called" - + " with illegal TaskAttemptId: " + yarnAttemptID); + LOG.error("Status update was called with illegal TaskAttemptId: " + + yarnAttemptID); + feedback.setTaskFound(false); + return feedback; } - AMFeedback feedback = new AMFeedback(); feedback.setTaskFound(true); // Propagating preemption to the task if TASK_PREEMPTION is enabled diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 4ff6fb2aa58..da7421b8f63 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -487,13 +487,15 @@ public class TestTaskAttemptListenerImpl { assertEquals(Phase.REDUCE, status.phase); } - @Test(expected = IllegalStateException.class) + @Test public void testStatusUpdateFromUnregisteredTask() throws IOException, InterruptedException{ configureMocks(); startListener(false); - listener.statusUpdate(attemptID, firstReduceStatus); + AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus); + + assertFalse(feedback.getTaskFound()); } private void configureMocks() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 87c9e161d13..5b98b35d896 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -855,6 +855,9 @@ abstract public class Task implements Writable, Configurable { long taskProgressInterval = MRJobConfUtil. getTaskProgressReportInterval(conf); + boolean uberized = conf.getBoolean("mapreduce.task.uberized", + false); + while (!taskDone.get()) { synchronized (lock) { done = false; @@ -893,9 +896,14 @@ abstract public class Task implements Writable, Configurable { // if Task Tracker is not aware of our task ID (probably because it died and // came back up), kill ourselves if (!taskFound) { - LOG.warn("Parent died. Exiting "+taskId); - resetDoneFlag(); - System.exit(66); + if (uberized) { + taskDone.set(true); + break; + } else { + LOG.warn("Parent died. Exiting "+taskId); + resetDoneFlag(); + System.exit(66); + } } // Set a flag that says we should preempt this is read by