From 74656788e5cda75bf87ed82b4bb555d0e5c2418f Mon Sep 17 00:00:00 2001
From: Robert Joseph Evans
Date: Mon, 12 Nov 2012 15:19:41 +0000
Subject: [PATCH] svn merge -c 1408314 FIXES: MAPREDUCE-4751. AM stuck in
 KILL_WAIT for days (vinodkv via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1408316 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   2 +
 .../mapreduce/v2/app/job/impl/JobImpl.java    |   5 +-
 .../mapreduce/v2/app/job/impl/TaskImpl.java   | 131 +++++++++++-------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java |  35 +++++
 .../hadoop/mapreduce/v2/app/TestKill.java     |  83 +++++++++++
 5 files changed, 208 insertions(+), 48 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 32696f2462b..83abcb3125f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -503,6 +503,8 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
     state (jlowe via bobby)
+
+    MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
 
 Release 0.23.4 - UNRELEASED
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 19488b2e81e..b46ee466ae2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -712,7 +712,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
    * The only entry point to change the Job.
    */
   public void handle(JobEvent event) {
-    LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getJobId() + " of type "
+          + event.getType());
+    }
     try {
       writeLock.lock();
       JobStateInternal oldState = getInternalState();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index e9be7d18fc2..b05dd439a5a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -118,9 +120,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
 
+  //should be set to one which comes first
+  //saying COMMIT_PENDING
+  private TaskAttemptId commitAttempt;
+
+  private TaskAttemptId successfulAttempt;
+
+  private final Set<TaskAttemptId> failedAttempts;
+  // Track the finished attempts - successful, failed and killed
+  private final Set<TaskAttemptId> finishedAttempts;
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
-  private int numberUncompletedAttempts = 0;
+  private final Set<TaskAttemptId> inProgressAttempts;
 
   private boolean historyTaskStartGenerated = false;
 
@@ -182,6 +193,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new KillWaitAttemptSucceededTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_FAILED,
+        new KillWaitAttemptFailedTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.KILL_WAIT,
@@ -189,8 +208,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-            TaskEventType.T_ATTEMPT_FAILED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
@@ -242,15 +259,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
       new RecoverdAttemptsComparator();
 
-  //should be set to one which comes first
-  //saying COMMIT_PENDING
-  private TaskAttemptId commitAttempt;
-
-  private TaskAttemptId successfulAttempt;
-
-  private int failedAttempts;
-  private int finishedAttempts;//finish are total of success, failed and killed
-
   @Override
   public TaskState getState() {
     readLock.lock();
@@ -275,6 +283,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
+    this.finishedAttempts = new HashSet<TaskAttemptId>(2);
+    this.failedAttempts = new HashSet<TaskAttemptId>(2);
+    this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
     // This overridable method call is okay in a constructor because we
     //  have a convention that none of the overrides depends on any
     //  fields that need initialization.
@@ -611,9 +622,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
           taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
     }
 
-    ++numberUncompletedAttempts;
+    inProgressAttempts.add(attempt.getID());
     //schedule the nextAttemptNumber
-    if (failedAttempts > 0) {
+    if (failedAttempts.size() > 0) {
       eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
           TaskAttemptEventType.TA_RESCHEDULE));
     } else {
@@ -788,12 +799,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
+      TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(),
+          taskAttemptId,
           TaskAttemptCompletionEventStatus.SUCCEEDED);
-      task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
-      task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
+      task.successfulAttempt = taskAttemptId;
       task.eventHandler.handle(new JobTaskEvent(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@@ -824,11 +837,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(),
+          taskAttemptId,
           TaskAttemptCompletionEventStatus.KILLED);
-      task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
       }
@@ -840,15 +855,25 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     protected TaskStateInternal finalState = TaskStateInternal.KILLED;
+    protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
+
+    public KillWaitAttemptKilledTransition() {
+      this(TaskAttemptCompletionEventStatus.KILLED);
+    }
+
+    public KillWaitAttemptKilledTransition(
+        TaskAttemptCompletionEventStatus taCompletionEventStatus) {
+      this.taCompletionEventStatus = taCompletionEventStatus;
+    }
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(),
-          TaskAttemptCompletionEventStatus.KILLED);
-      task.finishedAttempts++;
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
+      task.finishedAttempts.add(taskAttemptId);
       // check whether all attempts are finished
-      if (task.finishedAttempts == task.attempts.size()) {
+      if (task.finishedAttempts.size() == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
           TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
                 finalState, null); // TODO JH verify failedAttempt null
@@ -867,43 +892,57 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  private static class KillWaitAttemptSucceededTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptSucceededTransition() {
+      super(TaskAttemptCompletionEventStatus.SUCCEEDED);
+    }
+  }
+
+  private static class KillWaitAttemptFailedTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptFailedTransition() {
+      super(TaskAttemptCompletionEventStatus.FAILED);
+    }
+  }
+
   private static class AttemptFailedTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+      TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
+      task.failedAttempts.add(taskAttemptId);
+      if (taskAttemptId.equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
-      TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+      TaskAttempt attempt = task.attempts.get(taskAttemptId);
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
         task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
             attempt.getAssignedContainerMgrAddress()));
       }
 
-      task.finishedAttempts++;
-      if (task.failedAttempts < task.maxAttempts) {
+      task.finishedAttempts.add(taskAttemptId);
+      if (task.failedAttempts.size() < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(),
+            taskAttemptId,
             TaskAttemptCompletionEventStatus.FAILED);
         // we don't need a new event if we already have a spare
-        if (--task.numberUncompletedAttempts == 0
+        task.inProgressAttempts.remove(taskAttemptId);
+        if (task.inProgressAttempts.size() == 0
            && task.successfulAttempt == null) {
          task.addAndScheduleAttempt();
        }
      } else {
        task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(),
+            taskAttemptId,
            TaskAttemptCompletionEventStatus.TIPFAILED);
-        TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
-        TaskAttemptId taId = ev.getTaskAttemptID();
 
        if (task.historyTaskStartGenerated) {
        TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskStateInternal.FAILED, taId);
+            TaskStateInternal.FAILED, taskAttemptId);
        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
            taskFailedEvent));
        } else {
@@ -927,14 +966,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      if (event instanceof TaskTAttemptEvent) {
-        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
-            !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
-          // don't allow a different task attempt to override a previous
-          // succeeded state
-          return TaskStateInternal.SUCCEEDED;
-        }
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+          !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+        // don't allow a different task attempt to override a previous
+        // succeeded state
+        return TaskStateInternal.SUCCEEDED;
       }
 
       // a successful REDUCE task should not be overridden
@@ -953,7 +990,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       //  believe that there's no redundancy.
       unSucceed(task);
       // fake increase in Uncomplete attempts for super.transition
-      ++task.numberUncompletedAttempts;
+      task.inProgressAttempts.add(castEvent.getTaskAttemptID());
       return super.transition(task, event);
     }
 
@@ -1045,7 +1082,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
             (attempt, "Task KILL is received. Killing attempt!");
       }
 
-      task.numberUncompletedAttempts = 0;
+      task.inProgressAttempts.clear();
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 2fe3dcf1547..67400846426 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -243,6 +245,39 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(JobImpl job,
+      JobStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    JobStateInternal iState = job.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Job Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState);
+      Thread.sleep(500);
+      iState = job.getInternalState();
+    }
+    System.out.println("Job Internal State is : " + iState);
+    Assert.assertEquals("Job Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
+  public void waitForInternalState(TaskImpl task,
+      TaskStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskReport report = task.getReport();
+    TaskStateInternal iState = task.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Task Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState
+          + " progress : " + report.getProgress());
+      Thread.sleep(500);
+      report = task.getReport();
+      iState = task.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
   public void waitForInternalState(TaskAttemptImpl attempt,
       TaskAttemptStateInternal finalState) throws Exception {
     int timeoutSecs = 0;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
index 3533c285300..f9bc0338705 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
@@ -25,12 +25,15 @@ import java.util.concurrent.CountDownLatch;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.junit.Test;
 
 /**
  * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
 *
 */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestKill {
 
   @Test
@@ -131,6 +140,80 @@ public class TestKill {
           iter.next().getReport().getTaskAttemptState());
   }
 
+  @Test
+  public void testKillTaskWait() throws Exception {
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      private TaskAttemptEvent cachedKillEvent;
+      @Override
+      protected void dispatch(Event event) {
+        if (event instanceof TaskAttemptEvent) {
+          TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
+          if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
+            TaskAttemptId taID = killEvent.getTaskAttemptID();
+            if (taID.getTaskId().getTaskType() == TaskType.REDUCE
+                && taID.getTaskId().getId() == 0 && taID.getId() == 0) {
+              // Task is asking the reduce TA to kill itself. 'Create' a race
+              // condition. Make the task succeed and then inform the task that
+              // TA has succeeded. Once Task gets the TA succeeded event at
+              // KILL_WAIT, then relay the actual kill signal to TA
+              super.dispatch(new TaskAttemptEvent(taID,
+                  TaskAttemptEventType.TA_DONE));
+              super.dispatch(new TaskAttemptEvent(taID,
+                  TaskAttemptEventType.TA_CONTAINER_CLEANED));
+              super.dispatch(new TaskTAttemptEvent(taID,
+                  TaskEventType.T_ATTEMPT_SUCCEEDED));
+              this.cachedKillEvent = killEvent;
+              return;
+            }
+          }
+        } else if (event instanceof TaskEvent) {
+          TaskEvent taskEvent = (TaskEvent) event;
+          if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
+              && this.cachedKillEvent != null) {
+            // When the TA comes and reports that it is done, send the
+            // cachedKillEvent
+            super.dispatch(this.cachedKillEvent);
+            return;
+          }
+
+        }
+        super.dispatch(event);
+      }
+    };
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
   @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);
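
Note (not part of the patch): the hang fixed here comes from the KILL_WAIT
bookkeeping. Before this change, T_ATTEMPT_SUCCEEDED and T_ATTEMPT_FAILED were
listed as ignore-able in KILL_WAIT, so an attempt that happened to succeed or
fail while the task was being killed was never counted as finished;
finishedAttempts could then never reach attempts.size(), and the task sat in
KILL_WAIT indefinitely. The patch routes both events through the same
accounting as T_ATTEMPT_KILLED and replaces the int counters with
Set<TaskAttemptId>. The sketch below, which is illustrative only and uses
made-up attempt ids rather than Hadoop types, shows why set membership is more
robust than a counter for the "all attempts finished" check:

    import java.util.HashSet;
    import java.util.Set;

    public class FinishedAttemptsSketch {
      public static void main(String[] args) {
        int totalAttempts = 1;

        // Counter-based accounting: an event that is dropped (the old
        // ignore-able transition) never increments the count, so the
        // equality check below can never become true.
        int finishedCounter = 0; // the SUCCEEDED event was ignored
        System.out.println(finishedCounter == totalAttempts); // false: stuck

        // Set-based accounting: recording the attempt id is idempotent,
        // so late or duplicate completion events cannot skew the check.
        Set<String> finishedAttempts = new HashSet<String>(2);
        finishedAttempts.add("attempt_0"); // SUCCEEDED handled in KILL_WAIT
        finishedAttempts.add("attempt_0"); // a duplicate event is a no-op
        System.out.println(finishedAttempts.size() == totalAttempts); // true
      }
    }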