diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fe75fefeac4..96b176614df 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -484,6 +484,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4793. Problem with adding resources when using both -files and
     -file to hadoop streaming (jlowe)
 
+    MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while
+    speculating (jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index e2ebeb554cf..8b3a084e925 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -231,7 +231,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(TaskEventType.T_KILL,
-                   TaskEventType.T_ADD_SPEC_ATTEMPT))
+                   TaskEventType.T_ADD_SPEC_ATTEMPT,
+                   TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+                   TaskEventType.T_ATTEMPT_FAILED,
+                   TaskEventType.T_ATTEMPT_KILLED,
+                   TaskEventType.T_ATTEMPT_LAUNCHED,
+                   TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
@@ -941,6 +946,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.handleTaskAttemptCompletion(
             taskAttemptId, 
             TaskAttemptCompletionEventStatus.TIPFAILED);
+
+        // issue kill to all non finished attempts
+        for (TaskAttempt taskAttempt : task.attempts.values()) {
+          task.killUnfinishedAttempt
+            (taskAttempt, "Task has failed. Killing attempt!");
+        }
+        task.inProgressAttempts.clear();
 
         if (task.historyTaskStartGenerated) {
           TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
index 5e416d99e83..2c3732da529 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
@@ -602,4 +602,73 @@ public class TestTaskImpl {
     assertTaskScheduledState();
     assertEquals(3, taskAttempts.size());
   }
+
+  @Test
+  public void testFailedTransitions() {
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, appContext, TaskType.MAP) {
+          @Override
+          protected int getMaxAttempts() {
+            return 1;
+          }
+    };
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+
+    // add three more speculative attempts
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    assertEquals(4, taskAttempts.size());
+
+    // have the first attempt fail, verify task failed due to no retries
+    MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+
+    // verify task can no longer be killed
+    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+
+    // verify speculative doesn't launch new tasks
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(4, taskAttempts.size());
+
+    // verify attempt events from active tasks don't knock task out of FAILED
+    taskAttempt = taskAttempts.get(1);
+    taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt = taskAttempts.get(2);
+    taskAttempt.setState(TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt = taskAttempts.get(3);
+    taskAttempt.setState(TaskAttemptState.KILLED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+  }
 }