svn merge -c 1408314 FIXES: MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1408316 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-11-12 15:19:41 +00:00
parent d488a59497
commit 74656788e5
5 changed files with 208 additions and 48 deletions

View File

@ -503,6 +503,8 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
state (jlowe via bobby)
MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
Release 0.23.4 - UNRELEASED

View File

@ -712,7 +712,10 @@ protected void scheduleTasks(Set<TaskId> taskIDs) {
* The only entry point to change the Job.
*/
public void handle(JobEvent event) {
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getJobId() + " of type "
+ event.getType());
}
try {
writeLock.lock();
JobStateInternal oldState = getInternalState();

View File

@ -22,9 +22,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -118,9 +120,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
//should be set to one which comes first
//saying COMMIT_PENDING
private TaskAttemptId commitAttempt;
private TaskAttemptId successfulAttempt;
private final Set<TaskAttemptId> failedAttempts;
// Track the finished attempts - successful, failed and killed
private final Set<TaskAttemptId> finishedAttempts;
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
private int numberUncompletedAttempts = 0;
private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
@ -182,6 +193,14 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition())
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_SUCCEEDED,
new KillWaitAttemptSucceededTransition())
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_FAILED,
new KillWaitAttemptFailedTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.KILL_WAIT,
@ -189,8 +208,6 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_FAILED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
@ -242,15 +259,6 @@ public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
new RecoverdAttemptsComparator();
//should be set to one which comes first
//saying COMMIT_PENDING
private TaskAttemptId commitAttempt;
private TaskAttemptId successfulAttempt;
private int failedAttempts;
private int finishedAttempts;//finish are total of success, failed and killed
@Override
public TaskState getState() {
readLock.lock();
@ -275,6 +283,9 @@ public TaskImpl(JobId jobId, TaskType taskType, int partition,
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
this.attempts = Collections.emptyMap();
this.finishedAttempts = new HashSet<TaskAttemptId>(2);
this.failedAttempts = new HashSet<TaskAttemptId>(2);
this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
// This overridable method call is okay in a constructor because we
// have a convention that none of the overrides depends on any
// fields that need initialization.
@ -611,9 +622,9 @@ private void addAndScheduleAttempt() {
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
}
++numberUncompletedAttempts;
inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
if (failedAttempts > 0) {
if (failedAttempts.size() > 0) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
@ -788,12 +799,14 @@ private static class AttemptSucceededTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.SUCCEEDED);
task.finishedAttempts++;
--task.numberUncompletedAttempts;
task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
task.successfulAttempt = taskAttemptId;
task.eventHandler.handle(new JobTaskEvent(
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@ -824,11 +837,13 @@ private static class AttemptKilledTransition implements
SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskAttemptId taskAttemptId =
((TaskTAttemptEvent) event).getTaskAttemptID();
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.KILLED);
task.finishedAttempts++;
--task.numberUncompletedAttempts;
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
if (task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
@ -840,15 +855,25 @@ private static class KillWaitAttemptKilledTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
protected TaskStateInternal finalState = TaskStateInternal.KILLED;
protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
public KillWaitAttemptKilledTransition() {
this(TaskAttemptCompletionEventStatus.KILLED);
}
public KillWaitAttemptKilledTransition(
TaskAttemptCompletionEventStatus taCompletionEventStatus) {
this.taCompletionEventStatus = taCompletionEventStatus;
}
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
TaskAttemptCompletionEventStatus.KILLED);
task.finishedAttempts++;
TaskAttemptId taskAttemptId =
((TaskTAttemptEvent) event).getTaskAttemptID();
task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
task.finishedAttempts.add(taskAttemptId);
// check whether all attempts are finished
if (task.finishedAttempts == task.attempts.size()) {
if (task.finishedAttempts.size() == task.attempts.size()) {
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
finalState, null); // TODO JH verify failedAttempt null
@ -867,43 +892,57 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
}
}
private static class KillWaitAttemptSucceededTransition extends
KillWaitAttemptKilledTransition {
public KillWaitAttemptSucceededTransition() {
super(TaskAttemptCompletionEventStatus.SUCCEEDED);
}
}
private static class KillWaitAttemptFailedTransition extends
KillWaitAttemptKilledTransition {
public KillWaitAttemptFailedTransition() {
super(TaskAttemptCompletionEventStatus.FAILED);
}
}
private static class AttemptFailedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
task.failedAttempts.add(taskAttemptId);
if (taskAttemptId.equals(task.commitAttempt)) {
task.commitAttempt = null;
}
TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
TaskAttempt attempt = task.attempts.get(taskAttemptId);
if (attempt.getAssignedContainerMgrAddress() != null) {
//container was assigned
task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
attempt.getAssignedContainerMgrAddress()));
}
task.finishedAttempts++;
if (task.failedAttempts < task.maxAttempts) {
task.finishedAttempts.add(taskAttemptId);
if (task.failedAttempts.size() < task.maxAttempts) {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.FAILED);
// we don't need a new event if we already have a spare
if (--task.numberUncompletedAttempts == 0
task.inProgressAttempts.remove(taskAttemptId);
if (task.inProgressAttempts.size() == 0
&& task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
} else {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.TIPFAILED);
TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
TaskAttemptId taId = ev.getTaskAttemptID();
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
TaskStateInternal.FAILED, taId);
TaskStateInternal.FAILED, taskAttemptId);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent));
} else {
@ -927,14 +966,12 @@ private static class RetroactiveFailureTransition
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (event instanceof TaskTAttemptEvent) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
return TaskStateInternal.SUCCEEDED;
}
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
return TaskStateInternal.SUCCEEDED;
}
// a successful REDUCE task should not be overridden
@ -953,7 +990,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
// believe that there's no redundancy.
unSucceed(task);
// fake increase in Uncomplete attempts for super.transition
++task.numberUncompletedAttempts;
task.inProgressAttempts.add(castEvent.getTaskAttemptID());
return super.transition(task, event);
}
@ -1045,7 +1082,7 @@ public void transition(TaskImpl task, TaskEvent event) {
(attempt, "Task KILL is received. Killing attempt!");
}
task.numberUncompletedAttempts = 0;
task.inProgressAttempts.clear();
}
}

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@ -63,6 +64,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@ -243,6 +245,39 @@ public Job submit(Configuration conf) throws Exception {
return job;
}
public void waitForInternalState(JobImpl job,
JobStateInternal finalState) throws Exception {
int timeoutSecs = 0;
JobStateInternal iState = job.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("Job Internal State is : " + iState
+ " Waiting for Internal state : " + finalState);
Thread.sleep(500);
iState = job.getInternalState();
}
System.out.println("Task Internal State is : " + iState);
Assert.assertEquals("Task Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForInternalState(TaskImpl task,
TaskStateInternal finalState) throws Exception {
int timeoutSecs = 0;
TaskReport report = task.getReport();
TaskStateInternal iState = task.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("Task Internal State is : " + iState
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = task.getReport();
iState = task.getInternalState();
}
System.out.println("Task Internal State is : " + iState);
Assert.assertEquals("Task Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0;

View File

@ -25,12 +25,15 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@ -39,12 +42,18 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.junit.Test;
/**
* Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
*
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestKill {
@Test
@ -131,6 +140,80 @@ public void testKillTask() throws Exception {
iter.next().getReport().getTaskAttemptState());
}
@Test
public void testKillTaskWait() throws Exception {
final Dispatcher dispatcher = new AsyncDispatcher() {
private TaskAttemptEvent cachedKillEvent;
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptId taID = killEvent.getTaskAttemptID();
if (taID.getTaskId().getTaskType() == TaskType.REDUCE
&& taID.getTaskId().getId() == 0 && taID.getId() == 0) {
// Task is asking the reduce TA to kill itself. 'Create' a race
// condition. Make the task succeed and then inform the task that
// TA has succeeded. Once Task gets the TA succeeded event at
// KILL_WAIT, then relay the actual kill signal to TA
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_DONE));
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
super.dispatch(new TaskTAttemptEvent(taID,
TaskEventType.T_ATTEMPT_SUCCEEDED));
this.cachedKillEvent = killEvent;
return;
}
}
} else if (event instanceof TaskEvent) {
TaskEvent taskEvent = (TaskEvent) event;
if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
&& this.cachedKillEvent != null) {
// When the TA comes and reports that it is done, send the
// cachedKillEvent
super.dispatch(this.cachedKillEvent);
return;
}
}
super.dispatch(event);
}
};
MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
@Override
public Dispatcher createDispatcher() {
return dispatcher;
}
};
Job job = app.submit(new Configuration());
JobId jobId = app.getJobId();
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
Task reduceTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
// Finish map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapAttempt.getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(mapTask, TaskState.SUCCEEDED);
// Now kill the job
app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
}
@Test
public void testKillTaskAttempt() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);