diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 74dacd22633..bb27716a751 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -815,6 +815,9 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3802. Added test to validate that AM can crash multiple times and still can recover successfully after MAPREDUCE-3846. (vinodkv) + MAPREDUCE-3858. Task attempt failure during commit results in task never completing. + (Tom White via mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index e472e99cd21..0d7a2d8caee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -832,6 +832,9 @@ public abstract class TaskImpl implements Task, EventHandler { public TaskState transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) { + task.commitAttempt = null; + } TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID()); if (attempt.getAssignedContainerMgrAddress() != null) { //container was assigned @@ -877,6 +880,7 @@ public abstract class TaskImpl implements Task, EventHandler { protected void unSucceed(TaskImpl task) { ++task.numberUncompletedAttempts; + task.commitAttempt = null; task.successfulAttempt = null; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index dcc9b07cc38..0033528347e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -261,6 +263,12 @@ public class TestTaskImpl { assertTaskRunningState(); } + private void commitTaskAttempt(TaskAttemptId attemptId) { + mockTask.handle(new TaskTAttemptEvent(attemptId, + TaskEventType.T_ATTEMPT_COMMIT_PENDING)); + assertTaskRunningState(); + } + private MockTaskAttemptImpl getLastAttempt() { return taskAttempts.get(taskAttempts.size()-1); } @@ -279,32 +287,45 @@ public class TestTaskImpl { assertTaskRunningState(); } + private void failRunningTaskAttempt(TaskAttemptId attemptId) { + mockTask.handle(new TaskTAttemptEvent(attemptId, + TaskEventType.T_ATTEMPT_FAILED)); + assertTaskRunningState(); + } + /** * {@link TaskState#NEW} */ private void assertTaskNewState() { - assertEquals(mockTask.getState(), TaskState.NEW); + assertEquals(TaskState.NEW, mockTask.getState()); } /** * {@link TaskState#SCHEDULED} */ private void assertTaskScheduledState() { - assertEquals(mockTask.getState(), TaskState.SCHEDULED); + assertEquals(TaskState.SCHEDULED, mockTask.getState()); } /** * {@link TaskState#RUNNING} */ private void assertTaskRunningState() { - assertEquals(mockTask.getState(), TaskState.RUNNING); + assertEquals(TaskState.RUNNING, mockTask.getState()); } /** * {@link TaskState#KILL_WAIT} */ private void assertTaskKillWaitState() { - assertEquals(mockTask.getState(), TaskState.KILL_WAIT); + assertEquals(TaskState.KILL_WAIT, mockTask.getState()); + } + + /** + * {@link TaskState#SUCCEEDED} + */ + private void assertTaskSucceededState() { + assertEquals(TaskState.SUCCEEDED, mockTask.getState()); } @Test @@ -409,5 +430,32 @@ public class TestTaskImpl { assert(mockTask.getProgress() == progress); } + + @Test + public void testFailureDuringTaskAttemptCommit() { + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.COMMIT_PENDING); + commitTaskAttempt(getLastAttempt().getAttemptId()); + + // During the task attempt commit there is an exception which causes + // the attempt to fail + updateLastAttemptState(TaskAttemptState.FAILED); + failRunningTaskAttempt(getLastAttempt().getAttemptId()); + + assertEquals(2, taskAttempts.size()); + updateLastAttemptState(TaskAttemptState.SUCCEEDED); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + assertFalse("First attempt should not commit", + mockTask.canCommit(taskAttempts.get(0).getAttemptId())); + assertTrue("Second attempt should commit", + mockTask.canCommit(getLastAttempt().getAttemptId())); + + assertTaskSucceededState(); + } }