MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while speculating. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1425223 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-12-22 01:41:26 +00:00
parent 5a408bdeac
commit 04db3ce177
3 changed files with 85 additions and 1 deletions

View File

@ -641,6 +641,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4793. Problem with adding resources when using both -files and
-file to hadoop streaming (jlowe)
MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while
speculating (jlowe)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -231,7 +231,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// Transitions from FAILED state // Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED, .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(TaskEventType.T_KILL, EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT)) TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_FAILED,
TaskEventType.T_ATTEMPT_KILLED,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state // Transitions from KILLED state
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
@ -942,6 +947,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
taskAttemptId, taskAttemptId,
TaskAttemptCompletionEventStatus.TIPFAILED); TaskAttemptCompletionEventStatus.TIPFAILED);
// issue kill to all non finished attempts
for (TaskAttempt taskAttempt : task.attempts.values()) {
task.killUnfinishedAttempt
(taskAttempt, "Task has failed. Killing attempt!");
}
task.inProgressAttempts.clear();
if (task.historyTaskStartGenerated) { if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(), TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
TaskStateInternal.FAILED, taskAttemptId); TaskStateInternal.FAILED, taskAttemptId);

View File

@ -602,4 +602,73 @@ public class TestTaskImpl {
assertTaskScheduledState(); assertTaskScheduledState();
assertEquals(3, taskAttempts.size()); assertEquals(3, taskAttempts.size());
} }
/**
 * Drives a single-attempt task into FAILED while several speculative
 * attempts are still outstanding, then replays task-level and attempt-level
 * events at it and checks after each one that the task still reports FAILED
 * and that no additional attempt has been created.
 */
@Test
public void testFailedTransitions() {
  // A task limited to one attempt: the first attempt failure fails the task.
  mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
      remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
      credentials, clock, completedTasksFromPreviousRun, startCount,
      metrics, appContext, TaskType.MAP) {
    @Override
    protected int getMaxAttempts() {
      return 1;
    }
  };

  final TaskId taskId = getNewTaskID();
  final int expectedAttemptCount = 4;
  final TaskState failed = TaskState.FAILED;

  // Original attempt: schedule it, then launch it.
  scheduleTaskAttempt(taskId);
  launchTaskAttempt(getLastAttempt().getAttemptId());

  // Three speculative attempts on top of the original, each one requested
  // against the most recent attempt and launched right away.
  for (int spec = 0; spec < 3; spec++) {
    mockTask.handle(new TaskTAttemptEvent(
        getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
  }
  assertEquals(expectedAttemptCount, taskAttempts.size());

  // Failing the first attempt must fail the task: there are no retries left.
  MockTaskAttemptImpl firstAttempt = taskAttempts.get(0);
  firstAttempt.setState(TaskAttemptState.FAILED);
  mockTask.handle(new TaskTAttemptEvent(
      firstAttempt.getAttemptId(), TaskEventType.T_ATTEMPT_FAILED));
  assertEquals(failed, mockTask.getState());

  // A kill request arriving after the failure leaves the task FAILED.
  mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
  assertEquals(failed, mockTask.getState());

  // Speculation and launch notifications must not create new attempts.
  mockTask.handle(new TaskTAttemptEvent(
      getLastAttempt().getAttemptId(),
      TaskEventType.T_ADD_SPEC_ATTEMPT));
  mockTask.handle(new TaskTAttemptEvent(
      getLastAttempt().getAttemptId(),
      TaskEventType.T_ATTEMPT_LAUNCHED));
  assertEquals(failed, mockTask.getState());
  assertEquals(expectedAttemptCount, taskAttempts.size());

  // Events from the remaining attempts must not move the task out of FAILED.
  // Second attempt: reports commit-pending, then fails.
  MockTaskAttemptImpl secondAttempt = taskAttempts.get(1);
  secondAttempt.setState(TaskAttemptState.COMMIT_PENDING);
  mockTask.handle(new TaskTAttemptEvent(
      secondAttempt.getAttemptId(),
      TaskEventType.T_ATTEMPT_COMMIT_PENDING));
  assertEquals(failed, mockTask.getState());
  secondAttempt.setState(TaskAttemptState.FAILED);
  mockTask.handle(new TaskTAttemptEvent(
      secondAttempt.getAttemptId(), TaskEventType.T_ATTEMPT_FAILED));
  assertEquals(failed, mockTask.getState());

  // Third attempt: reports success.
  MockTaskAttemptImpl thirdAttempt = taskAttempts.get(2);
  thirdAttempt.setState(TaskAttemptState.SUCCEEDED);
  mockTask.handle(new TaskTAttemptEvent(
      thirdAttempt.getAttemptId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
  assertEquals(failed, mockTask.getState());

  // Fourth attempt: reports that it was killed.
  MockTaskAttemptImpl fourthAttempt = taskAttempts.get(3);
  fourthAttempt.setState(TaskAttemptState.KILLED);
  mockTask.handle(new TaskTAttemptEvent(
      fourthAttempt.getAttemptId(), TaskEventType.T_ATTEMPT_KILLED));
  assertEquals(failed, mockTask.getState());
}
} }