diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 62714ee889d..01254e14d36 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -667,6 +667,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4300. OOM in AM can turn it into a zombie. (Robert Evans via tgraves) + MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via + bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index c7f89f9ac58..cf5d92b4c7c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -189,7 +189,7 @@ public abstract class TaskImpl implements Task, EventHandler { // Transitions from SUCCEEDED state .addTransition(TaskState.SUCCEEDED, //only possible for map tasks - EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), + EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED), TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition()) .addTransition(TaskState.SUCCEEDED, //only possible for map tasks EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED), @@ -618,7 +618,7 @@ public abstract class TaskImpl implements Task, EventHandler { } } - private void internalError(TaskEventType type) { + protected void internalError(TaskEventType type) { LOG.error("Invalid event " + type + " on Task " + this.taskId); eventHandler.handle(new JobDiagnosticsUpdateEvent( this.taskId.getJobId(), "Invalid event " + type + @@ -896,6 +896,16 @@ public abstract class TaskImpl implements Task, EventHandler { @Override public TaskState transition(TaskImpl task, TaskEvent event) { + if (event instanceof TaskTAttemptEvent) { + TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + if (task.getState() == TaskState.SUCCEEDED && + !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { + // don't allow a different task attempt to override a previous + // succeeded state + return TaskState.SUCCEEDED; + } + } + //verify that this occurs only for map task //TODO: consider moving it to MapTaskImpl if (!TaskType.MAP.equals(task.getType())) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 4ff6001bf58..af77f7dbb7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,8 +128,13 @@ public class TestTaskImpl { @Override protected int getMaxAttempts() { return 100; - } - + } + + @Override + protected void internalError(TaskEventType type) { + super.internalError(type); + fail("Internal error: " + type); + } } private class MockTaskAttemptImpl extends TaskAttemptImpl { @@ -462,5 +468,32 @@ public class TestTaskImpl { assertTaskSucceededState(); } + + @Test + public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.RUNNING); + + // Add a speculative task attempt that succeeds + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded + assertTaskSucceededState(); + + // Now fail the first task attempt, after the second has succeeded + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + // The task should still be in the succeeded state + assertTaskSucceededState(); + + } }