MAPREDUCE-3858. Task attempt failure during commit results in task never completing. (Tom White via mahadev) - Merging r1244254 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1244255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d606016217
commit
c8debd083b
|
@ -768,6 +768,9 @@ Release 0.23.1 - 2012-02-08
|
|||
MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
|
||||
still can recover successfully after MAPREDUCE-3846. (vinodkv)
|
||||
|
||||
MAPREDUCE-3858. Task attempt failure during commit results in task never completing.
|
||||
(Tom White via mahadev)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -832,6 +832,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
public TaskState transition(TaskImpl task, TaskEvent event) {
|
||||
task.failedAttempts++;
|
||||
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
|
||||
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
|
||||
task.commitAttempt = null;
|
||||
}
|
||||
TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
|
||||
if (attempt.getAssignedContainerMgrAddress() != null) {
|
||||
//container was assigned
|
||||
|
@ -877,6 +880,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
|
||||
protected void unSucceed(TaskImpl task) {
|
||||
++task.numberUncompletedAttempts;
|
||||
task.commitAttempt = null;
|
||||
task.successfulAttempt = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -261,6 +263,12 @@ public class TestTaskImpl {
|
|||
assertTaskRunningState();
|
||||
}
|
||||
|
||||
private void commitTaskAttempt(TaskAttemptId attemptId) {
|
||||
mockTask.handle(new TaskTAttemptEvent(attemptId,
|
||||
TaskEventType.T_ATTEMPT_COMMIT_PENDING));
|
||||
assertTaskRunningState();
|
||||
}
|
||||
|
||||
private MockTaskAttemptImpl getLastAttempt() {
|
||||
return taskAttempts.get(taskAttempts.size()-1);
|
||||
}
|
||||
|
@ -279,32 +287,45 @@ public class TestTaskImpl {
|
|||
assertTaskRunningState();
|
||||
}
|
||||
|
||||
private void failRunningTaskAttempt(TaskAttemptId attemptId) {
|
||||
mockTask.handle(new TaskTAttemptEvent(attemptId,
|
||||
TaskEventType.T_ATTEMPT_FAILED));
|
||||
assertTaskRunningState();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link TaskState#NEW}
|
||||
*/
|
||||
private void assertTaskNewState() {
|
||||
assertEquals(mockTask.getState(), TaskState.NEW);
|
||||
assertEquals(TaskState.NEW, mockTask.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link TaskState#SCHEDULED}
|
||||
*/
|
||||
private void assertTaskScheduledState() {
|
||||
assertEquals(mockTask.getState(), TaskState.SCHEDULED);
|
||||
assertEquals(TaskState.SCHEDULED, mockTask.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link TaskState#RUNNING}
|
||||
*/
|
||||
private void assertTaskRunningState() {
|
||||
assertEquals(mockTask.getState(), TaskState.RUNNING);
|
||||
assertEquals(TaskState.RUNNING, mockTask.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link TaskState#KILL_WAIT}
|
||||
*/
|
||||
private void assertTaskKillWaitState() {
|
||||
assertEquals(mockTask.getState(), TaskState.KILL_WAIT);
|
||||
assertEquals(TaskState.KILL_WAIT, mockTask.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link TaskState#SUCCEEDED}
|
||||
*/
|
||||
private void assertTaskSucceededState() {
|
||||
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -409,5 +430,32 @@ public class TestTaskImpl {
|
|||
assert(mockTask.getProgress() == progress);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureDuringTaskAttemptCommit() {
|
||||
TaskId taskId = getNewTaskID();
|
||||
scheduleTaskAttempt(taskId);
|
||||
launchTaskAttempt(getLastAttempt().getAttemptId());
|
||||
updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
|
||||
commitTaskAttempt(getLastAttempt().getAttemptId());
|
||||
|
||||
// During the task attempt commit there is an exception which causes
|
||||
// the attempt to fail
|
||||
updateLastAttemptState(TaskAttemptState.FAILED);
|
||||
failRunningTaskAttempt(getLastAttempt().getAttemptId());
|
||||
|
||||
assertEquals(2, taskAttempts.size());
|
||||
updateLastAttemptState(TaskAttemptState.SUCCEEDED);
|
||||
commitTaskAttempt(getLastAttempt().getAttemptId());
|
||||
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
|
||||
TaskEventType.T_ATTEMPT_SUCCEEDED));
|
||||
|
||||
assertFalse("First attempt should not commit",
|
||||
mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
|
||||
assertTrue("Second attempt should commit",
|
||||
mockTask.canCommit(getLastAttempt().getAttemptId()));
|
||||
|
||||
assertTaskSucceededState();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue