svn merge -c 1425223 FIXES: MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while speculating. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1425224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-12-22 01:46:05 +00:00
parent 674d6ba079
commit 5b6115e3bb
3 changed files with 85 additions and 1 deletions

View File

@ -484,6 +484,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4793. Problem with adding resources when using both -files and
-file to hadoop streaming (jlowe)
MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while
speculating (jlowe)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -231,7 +231,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT))
TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_FAILED,
TaskEventType.T_ATTEMPT_KILLED,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
@ -942,6 +947,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
taskAttemptId,
TaskAttemptCompletionEventStatus.TIPFAILED);
// issue kill to all non finished attempts
for (TaskAttempt taskAttempt : task.attempts.values()) {
task.killUnfinishedAttempt
(taskAttempt, "Task has failed. Killing attempt!");
}
task.inProgressAttempts.clear();
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
TaskStateInternal.FAILED, taskAttemptId);

View File

@ -602,4 +602,73 @@ public class TestTaskImpl {
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
@Test
public void testFailedTransitions() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
}
};
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
// add three more speculative attempts
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
launchTaskAttempt(getLastAttempt().getAttemptId());
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
launchTaskAttempt(getLastAttempt().getAttemptId());
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
launchTaskAttempt(getLastAttempt().getAttemptId());
assertEquals(4, taskAttempts.size());
// have the first attempt fail, verify task failed due to no retries
MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
taskAttempt.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertEquals(TaskState.FAILED, mockTask.getState());
// verify task can no longer be killed
mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
assertEquals(TaskState.FAILED, mockTask.getState());
// verify speculative doesn't launch new tasks
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ATTEMPT_LAUNCHED));
assertEquals(TaskState.FAILED, mockTask.getState());
assertEquals(4, taskAttempts.size());
// verify attempt events from active tasks don't knock task out of FAILED
taskAttempt = taskAttempts.get(1);
taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_COMMIT_PENDING));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt = taskAttempts.get(2);
taskAttempt.setState(TaskAttemptState.SUCCEEDED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_SUCCEEDED));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt = taskAttempts.get(3);
taskAttempt.setState(TaskAttemptState.KILLED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_KILLED));
assertEquals(TaskState.FAILED, mockTask.getState());
}
}