svn merge -c 1408360 FIXES: MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1408361 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-11-12 17:19:58 +00:00
parent b3c28a5753
commit ec861244a2
3 changed files with 66 additions and 6 deletions

View File

@ -507,6 +507,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe
via bobby)
Release 0.23.4 - UNRELEASED

View File

@ -217,13 +217,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
.addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededAtSucceededTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
TaskEventType.T_KILL))
// Transitions from FAILED state
@ -971,6 +973,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
task.finishedAttempts.add(castEvent.getTaskAttemptID());
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
return TaskStateInternal.SUCCEEDED;
}
@ -1013,6 +1017,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
!attemptId.equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
task.finishedAttempts.add(castEvent.getTaskAttemptID());
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
return TaskStateInternal.SUCCEEDED;
}
}
@ -1043,6 +1049,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
private static class AttemptSucceededAtSucceededTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
task.finishedAttempts.add(castEvent.getTaskAttemptID());
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
}
}
private static class KillNewTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override

View File

@ -141,7 +141,6 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
private TaskAttemptId attemptId;
private TaskType taskType;
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
@ -152,14 +151,11 @@ public class TestTaskImpl {
AppContext appContext, TaskType taskType) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
dataLocations, committer, jobToken, credentials, clock, appContext);
attemptId = Records.newRecord(TaskAttemptId.class);
attemptId.setId(id);
attemptId.setTaskId(taskId);
this.taskType = taskType;
}
public TaskAttemptId getAttemptId() {
return attemptId;
return getID();
}
@Override
@ -561,4 +557,49 @@ public class TestTaskImpl {
mockTask = createMockTask(TaskType.REDUCE);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
}
@Test
public void testSpeculativeMapFetchFailure() {
// Setup a scenario where speculative task wins, first attempt killed
mockTask = createMockTask(TaskType.MAP);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
@Test
public void testSpeculativeMapMultipleSucceedFetchFailure() {
// Setup a scenario where speculative task wins, first attempt succeeds
mockTask = createMockTask(TaskType.MAP);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
@Test
public void testSpeculativeMapFailedFetchFailure() {
// Setup a scenario where speculative task wins, first attempt succeeds
mockTask = createMockTask(TaskType.MAP);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
}