MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1408360 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a45b7e357
commit
c493d06b1b
|
@ -654,6 +654,9 @@ Release 0.23.5 - UNRELEASED
|
|||
MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
|
||||
|
||||
MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
|
||||
|
||||
MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe
|
||||
via bobby)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
|
|
|
@ -217,13 +217,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
.addTransition(TaskStateInternal.SUCCEEDED,
|
||||
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
|
||||
TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
|
||||
.addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
|
||||
TaskEventType.T_ATTEMPT_SUCCEEDED,
|
||||
new AttemptSucceededAtSucceededTransition())
|
||||
// Ignore-able transitions.
|
||||
.addTransition(
|
||||
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
|
||||
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
|
||||
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
|
||||
TaskEventType.T_ATTEMPT_LAUNCHED,
|
||||
TaskEventType.T_ATTEMPT_SUCCEEDED,
|
||||
TaskEventType.T_KILL))
|
||||
|
||||
// Transitions from FAILED state
|
||||
|
@ -971,6 +973,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
|
||||
// don't allow a different task attempt to override a previous
|
||||
// succeeded state
|
||||
task.finishedAttempts.add(castEvent.getTaskAttemptID());
|
||||
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
|
||||
return TaskStateInternal.SUCCEEDED;
|
||||
}
|
||||
|
||||
|
@ -1013,6 +1017,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
!attemptId.equals(task.successfulAttempt)) {
|
||||
// don't allow a different task attempt to override a previous
|
||||
// succeeded state
|
||||
task.finishedAttempts.add(castEvent.getTaskAttemptID());
|
||||
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
|
||||
return TaskStateInternal.SUCCEEDED;
|
||||
}
|
||||
}
|
||||
|
@ -1043,6 +1049,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
private static class AttemptSucceededAtSucceededTransition
|
||||
implements SingleArcTransition<TaskImpl, TaskEvent> {
|
||||
@Override
|
||||
public void transition(TaskImpl task, TaskEvent event) {
|
||||
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
|
||||
task.finishedAttempts.add(castEvent.getTaskAttemptID());
|
||||
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
|
||||
}
|
||||
}
|
||||
|
||||
private static class KillNewTransition
|
||||
implements SingleArcTransition<TaskImpl, TaskEvent> {
|
||||
@Override
|
||||
|
|
|
@ -141,7 +141,6 @@ public class TestTaskImpl {
|
|||
|
||||
private float progress = 0;
|
||||
private TaskAttemptState state = TaskAttemptState.NEW;
|
||||
private TaskAttemptId attemptId;
|
||||
private TaskType taskType;
|
||||
|
||||
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
|
||||
|
@ -152,14 +151,11 @@ public class TestTaskImpl {
|
|||
AppContext appContext, TaskType taskType) {
|
||||
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
|
||||
dataLocations, committer, jobToken, credentials, clock, appContext);
|
||||
attemptId = Records.newRecord(TaskAttemptId.class);
|
||||
attemptId.setId(id);
|
||||
attemptId.setTaskId(taskId);
|
||||
this.taskType = taskType;
|
||||
}
|
||||
|
||||
public TaskAttemptId getAttemptId() {
|
||||
return attemptId;
|
||||
return getID();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -561,4 +557,49 @@ public class TestTaskImpl {
|
|||
mockTask = createMockTask(TaskType.REDUCE);
|
||||
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpeculativeMapFetchFailure() {
|
||||
// Setup a scenario where speculative task wins, first attempt killed
|
||||
mockTask = createMockTask(TaskType.MAP);
|
||||
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
|
||||
assertEquals(2, taskAttempts.size());
|
||||
|
||||
// speculative attempt retroactively fails from fetch failures
|
||||
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
|
||||
TaskEventType.T_ATTEMPT_FAILED));
|
||||
|
||||
assertTaskScheduledState();
|
||||
assertEquals(3, taskAttempts.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpeculativeMapMultipleSucceedFetchFailure() {
|
||||
// Setup a scenario where speculative task wins, first attempt succeeds
|
||||
mockTask = createMockTask(TaskType.MAP);
|
||||
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
|
||||
assertEquals(2, taskAttempts.size());
|
||||
|
||||
// speculative attempt retroactively fails from fetch failures
|
||||
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
|
||||
TaskEventType.T_ATTEMPT_FAILED));
|
||||
|
||||
assertTaskScheduledState();
|
||||
assertEquals(3, taskAttempts.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpeculativeMapFailedFetchFailure() {
|
||||
// Setup a scenario where speculative task wins, first attempt succeeds
|
||||
mockTask = createMockTask(TaskType.MAP);
|
||||
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
|
||||
assertEquals(2, taskAttempts.size());
|
||||
|
||||
// speculative attempt retroactively fails from fetch failures
|
||||
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
|
||||
TaskEventType.T_ATTEMPT_FAILED));
|
||||
|
||||
assertTaskScheduledState();
|
||||
assertEquals(3, taskAttempts.size());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue