MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1359747 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-07-10 16:10:14 +00:00
parent e0f96aa249
commit 6804ef32fc
3 changed files with 50 additions and 4 deletions

View File

@ -667,6 +667,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4300. OOM in AM can turn it into a zombie. (Robert Evans via
tgraves)
MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via
bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -189,7 +189,7 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
// Transitions from SUCCEEDED state
.addTransition(TaskState.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
.addTransition(TaskState.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
@ -618,7 +618,7 @@ public void handle(TaskEvent event) {
}
}
private void internalError(TaskEventType type) {
protected void internalError(TaskEventType type) {
LOG.error("Invalid event " + type + " on Task " + this.taskId);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
this.taskId.getJobId(), "Invalid event " + type +
@ -896,6 +896,16 @@ private static class MapRetroactiveFailureTransition
@Override
public TaskState transition(TaskImpl task, TaskEvent event) {
if (event instanceof TaskTAttemptEvent) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (task.getState() == TaskState.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
return TaskState.SUCCEEDED;
}
}
//verify that this occurs only for map task
//TODO: consider moving it to MapTaskImpl
if (!TaskType.MAP.equals(task.getType())) {

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -127,8 +128,13 @@ protected TaskAttemptImpl createAttempt() {
@Override
protected int getMaxAttempts() {
return 100;
}
}
@Override
protected void internalError(TaskEventType type) {
super.internalError(type);
fail("Internal error: " + type);
}
}
private class MockTaskAttemptImpl extends TaskAttemptImpl {
@ -462,5 +468,32 @@ public void testFailureDuringTaskAttemptCommit() {
assertTaskSucceededState();
}
@Test
public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
updateLastAttemptState(TaskAttemptState.RUNNING);
// Add a speculative task attempt that succeeds
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
launchTaskAttempt(getLastAttempt().getAttemptId());
commitTaskAttempt(getLastAttempt().getAttemptId());
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ATTEMPT_SUCCEEDED));
// The task should now have succeeded
assertTaskSucceededState();
// Now fail the first task attempt, after the second has succeeded
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
// The task should still be in the succeeded state
assertTaskSucceededState();
}
}