MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1408314 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
42f4691ead
commit
dd72ca3536
|
@ -651,6 +651,8 @@ Release 0.23.5 - UNRELEASED
|
|||
MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
|
||||
state (jlowe via bobby)
|
||||
|
||||
MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -712,7 +712,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
* The only entry point to change the Job.
|
||||
*/
|
||||
public void handle(JobEvent event) {
|
||||
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing " + event.getJobId() + " of type "
|
||||
+ event.getType());
|
||||
}
|
||||
try {
|
||||
writeLock.lock();
|
||||
JobStateInternal oldState = getInternalState();
|
||||
|
|
|
@ -22,9 +22,11 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -118,9 +120,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
protected Credentials credentials;
|
||||
protected Token<JobTokenIdentifier> jobToken;
|
||||
|
||||
//should be set to one which comes first
|
||||
//saying COMMIT_PENDING
|
||||
private TaskAttemptId commitAttempt;
|
||||
|
||||
private TaskAttemptId successfulAttempt;
|
||||
|
||||
private final Set<TaskAttemptId> failedAttempts;
|
||||
// Track the finished attempts - successful, failed and killed
|
||||
private final Set<TaskAttemptId> finishedAttempts;
|
||||
// counts the number of attempts that are either running or in a state where
|
||||
// they will come to be running when they get a Container
|
||||
private int numberUncompletedAttempts = 0;
|
||||
private final Set<TaskAttemptId> inProgressAttempts;
|
||||
|
||||
private boolean historyTaskStartGenerated = false;
|
||||
|
||||
|
@ -182,6 +193,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
|
||||
TaskEventType.T_ATTEMPT_KILLED,
|
||||
new KillWaitAttemptKilledTransition())
|
||||
.addTransition(TaskStateInternal.KILL_WAIT,
|
||||
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
|
||||
TaskEventType.T_ATTEMPT_SUCCEEDED,
|
||||
new KillWaitAttemptSucceededTransition())
|
||||
.addTransition(TaskStateInternal.KILL_WAIT,
|
||||
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
|
||||
TaskEventType.T_ATTEMPT_FAILED,
|
||||
new KillWaitAttemptFailedTransition())
|
||||
// Ignore-able transitions.
|
||||
.addTransition(
|
||||
TaskStateInternal.KILL_WAIT,
|
||||
|
@ -189,8 +208,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
EnumSet.of(TaskEventType.T_KILL,
|
||||
TaskEventType.T_ATTEMPT_LAUNCHED,
|
||||
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
|
||||
TaskEventType.T_ATTEMPT_FAILED,
|
||||
TaskEventType.T_ATTEMPT_SUCCEEDED,
|
||||
TaskEventType.T_ADD_SPEC_ATTEMPT))
|
||||
|
||||
// Transitions from SUCCEEDED state
|
||||
|
@ -242,15 +259,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
|
||||
new RecoverdAttemptsComparator();
|
||||
|
||||
//should be set to one which comes first
|
||||
//saying COMMIT_PENDING
|
||||
private TaskAttemptId commitAttempt;
|
||||
|
||||
private TaskAttemptId successfulAttempt;
|
||||
|
||||
private int failedAttempts;
|
||||
private int finishedAttempts;//finish are total of success, failed and killed
|
||||
|
||||
@Override
|
||||
public TaskState getState() {
|
||||
readLock.lock();
|
||||
|
@ -275,6 +283,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
readLock = readWriteLock.readLock();
|
||||
writeLock = readWriteLock.writeLock();
|
||||
this.attempts = Collections.emptyMap();
|
||||
this.finishedAttempts = new HashSet<TaskAttemptId>(2);
|
||||
this.failedAttempts = new HashSet<TaskAttemptId>(2);
|
||||
this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
|
||||
// This overridable method call is okay in a constructor because we
|
||||
// have a convention that none of the overrides depends on any
|
||||
// fields that need initialization.
|
||||
|
@ -611,9 +622,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
||||
}
|
||||
|
||||
++numberUncompletedAttempts;
|
||||
inProgressAttempts.add(attempt.getID());
|
||||
//schedule the nextAttemptNumber
|
||||
if (failedAttempts > 0) {
|
||||
if (failedAttempts.size() > 0) {
|
||||
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
|
||||
TaskAttemptEventType.TA_RESCHEDULE));
|
||||
} else {
|
||||
|
@ -788,12 +799,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
implements SingleArcTransition<TaskImpl, TaskEvent> {
|
||||
@Override
|
||||
public void transition(TaskImpl task, TaskEvent event) {
|
||||
TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
|
||||
TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
|
||||
task.handleTaskAttemptCompletion(
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID(),
|
||||
taskAttemptId,
|
||||
TaskAttemptCompletionEventStatus.SUCCEEDED);
|
||||
task.finishedAttempts++;
|
||||
--task.numberUncompletedAttempts;
|
||||
task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
|
||||
task.finishedAttempts.add(taskAttemptId);
|
||||
task.inProgressAttempts.remove(taskAttemptId);
|
||||
task.successfulAttempt = taskAttemptId;
|
||||
task.eventHandler.handle(new JobTaskEvent(
|
||||
task.taskId, TaskState.SUCCEEDED));
|
||||
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
|
||||
|
@ -824,11 +837,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
SingleArcTransition<TaskImpl, TaskEvent> {
|
||||
@Override
|
||||
public void transition(TaskImpl task, TaskEvent event) {
|
||||
TaskAttemptId taskAttemptId =
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID();
|
||||
task.handleTaskAttemptCompletion(
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID(),
|
||||
taskAttemptId,
|
||||
TaskAttemptCompletionEventStatus.KILLED);
|
||||
task.finishedAttempts++;
|
||||
--task.numberUncompletedAttempts;
|
||||
task.finishedAttempts.add(taskAttemptId);
|
||||
task.inProgressAttempts.remove(taskAttemptId);
|
||||
if (task.successfulAttempt == null) {
|
||||
task.addAndScheduleAttempt();
|
||||
}
|
||||
|
@ -840,15 +855,25 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
|
||||
|
||||
protected TaskStateInternal finalState = TaskStateInternal.KILLED;
|
||||
protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
|
||||
|
||||
public KillWaitAttemptKilledTransition() {
|
||||
this(TaskAttemptCompletionEventStatus.KILLED);
|
||||
}
|
||||
|
||||
public KillWaitAttemptKilledTransition(
|
||||
TaskAttemptCompletionEventStatus taCompletionEventStatus) {
|
||||
this.taCompletionEventStatus = taCompletionEventStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
|
||||
task.handleTaskAttemptCompletion(
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID(),
|
||||
TaskAttemptCompletionEventStatus.KILLED);
|
||||
task.finishedAttempts++;
|
||||
TaskAttemptId taskAttemptId =
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID();
|
||||
task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
|
||||
task.finishedAttempts.add(taskAttemptId);
|
||||
// check whether all attempts are finished
|
||||
if (task.finishedAttempts == task.attempts.size()) {
|
||||
if (task.finishedAttempts.size() == task.attempts.size()) {
|
||||
if (task.historyTaskStartGenerated) {
|
||||
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
|
||||
finalState, null); // TODO JH verify failedAttempt null
|
||||
|
@ -867,43 +892,57 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
private static class KillWaitAttemptSucceededTransition extends
|
||||
KillWaitAttemptKilledTransition {
|
||||
public KillWaitAttemptSucceededTransition() {
|
||||
super(TaskAttemptCompletionEventStatus.SUCCEEDED);
|
||||
}
|
||||
}
|
||||
|
||||
private static class KillWaitAttemptFailedTransition extends
|
||||
KillWaitAttemptKilledTransition {
|
||||
public KillWaitAttemptFailedTransition() {
|
||||
super(TaskAttemptCompletionEventStatus.FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
private static class AttemptFailedTransition implements
|
||||
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
|
||||
|
||||
@Override
|
||||
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
|
||||
task.failedAttempts++;
|
||||
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
|
||||
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
|
||||
TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
|
||||
task.failedAttempts.add(taskAttemptId);
|
||||
if (taskAttemptId.equals(task.commitAttempt)) {
|
||||
task.commitAttempt = null;
|
||||
}
|
||||
TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
|
||||
TaskAttempt attempt = task.attempts.get(taskAttemptId);
|
||||
if (attempt.getAssignedContainerMgrAddress() != null) {
|
||||
//container was assigned
|
||||
task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
|
||||
attempt.getAssignedContainerMgrAddress()));
|
||||
}
|
||||
|
||||
task.finishedAttempts++;
|
||||
if (task.failedAttempts < task.maxAttempts) {
|
||||
task.finishedAttempts.add(taskAttemptId);
|
||||
if (task.failedAttempts.size() < task.maxAttempts) {
|
||||
task.handleTaskAttemptCompletion(
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID(),
|
||||
taskAttemptId,
|
||||
TaskAttemptCompletionEventStatus.FAILED);
|
||||
// we don't need a new event if we already have a spare
|
||||
if (--task.numberUncompletedAttempts == 0
|
||||
task.inProgressAttempts.remove(taskAttemptId);
|
||||
if (task.inProgressAttempts.size() == 0
|
||||
&& task.successfulAttempt == null) {
|
||||
task.addAndScheduleAttempt();
|
||||
}
|
||||
} else {
|
||||
task.handleTaskAttemptCompletion(
|
||||
((TaskTAttemptEvent) event).getTaskAttemptID(),
|
||||
taskAttemptId,
|
||||
TaskAttemptCompletionEventStatus.TIPFAILED);
|
||||
TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
|
||||
TaskAttemptId taId = ev.getTaskAttemptID();
|
||||
|
||||
if (task.historyTaskStartGenerated) {
|
||||
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
|
||||
TaskStateInternal.FAILED, taId);
|
||||
TaskStateInternal.FAILED, taskAttemptId);
|
||||
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
||||
taskFailedEvent));
|
||||
} else {
|
||||
|
@ -927,14 +966,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
|
||||
@Override
|
||||
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
|
||||
if (event instanceof TaskTAttemptEvent) {
|
||||
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
|
||||
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
|
||||
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
|
||||
// don't allow a different task attempt to override a previous
|
||||
// succeeded state
|
||||
return TaskStateInternal.SUCCEEDED;
|
||||
}
|
||||
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
|
||||
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
|
||||
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
|
||||
// don't allow a different task attempt to override a previous
|
||||
// succeeded state
|
||||
return TaskStateInternal.SUCCEEDED;
|
||||
}
|
||||
|
||||
// a successful REDUCE task should not be overridden
|
||||
|
@ -953,7 +990,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
// believe that there's no redundancy.
|
||||
unSucceed(task);
|
||||
// fake increase in Uncomplete attempts for super.transition
|
||||
++task.numberUncompletedAttempts;
|
||||
task.inProgressAttempts.add(castEvent.getTaskAttemptID());
|
||||
return super.transition(task, event);
|
||||
}
|
||||
|
||||
|
@ -1045,7 +1082,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
(attempt, "Task KILL is received. Killing attempt!");
|
||||
}
|
||||
|
||||
task.numberUncompletedAttempts = 0;
|
||||
task.inProgressAttempts.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
|
@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
|
@ -243,6 +245,39 @@ public class MRApp extends MRAppMaster {
|
|||
return job;
|
||||
}
|
||||
|
||||
public void waitForInternalState(JobImpl job,
|
||||
JobStateInternal finalState) throws Exception {
|
||||
int timeoutSecs = 0;
|
||||
JobStateInternal iState = job.getInternalState();
|
||||
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
|
||||
System.out.println("Job Internal State is : " + iState
|
||||
+ " Waiting for Internal state : " + finalState);
|
||||
Thread.sleep(500);
|
||||
iState = job.getInternalState();
|
||||
}
|
||||
System.out.println("Task Internal State is : " + iState);
|
||||
Assert.assertEquals("Task Internal state is not correct (timedout)",
|
||||
finalState, iState);
|
||||
}
|
||||
|
||||
public void waitForInternalState(TaskImpl task,
|
||||
TaskStateInternal finalState) throws Exception {
|
||||
int timeoutSecs = 0;
|
||||
TaskReport report = task.getReport();
|
||||
TaskStateInternal iState = task.getInternalState();
|
||||
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
|
||||
System.out.println("Task Internal State is : " + iState
|
||||
+ " Waiting for Internal state : " + finalState + " progress : "
|
||||
+ report.getProgress());
|
||||
Thread.sleep(500);
|
||||
report = task.getReport();
|
||||
iState = task.getInternalState();
|
||||
}
|
||||
System.out.println("Task Internal State is : " + iState);
|
||||
Assert.assertEquals("Task Internal state is not correct (timedout)",
|
||||
finalState, iState);
|
||||
}
|
||||
|
||||
public void waitForInternalState(TaskAttemptImpl attempt,
|
||||
TaskAttemptStateInternal finalState) throws Exception {
|
||||
int timeoutSecs = 0;
|
||||
|
|
|
@ -25,12 +25,15 @@ import java.util.concurrent.CountDownLatch;
|
|||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
|
@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class TestKill {
|
||||
|
||||
@Test
|
||||
|
@ -131,6 +140,80 @@ public class TestKill {
|
|||
iter.next().getReport().getTaskAttemptState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKillTaskWait() throws Exception {
|
||||
final Dispatcher dispatcher = new AsyncDispatcher() {
|
||||
private TaskAttemptEvent cachedKillEvent;
|
||||
@Override
|
||||
protected void dispatch(Event event) {
|
||||
if (event instanceof TaskAttemptEvent) {
|
||||
TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
|
||||
if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
|
||||
TaskAttemptId taID = killEvent.getTaskAttemptID();
|
||||
if (taID.getTaskId().getTaskType() == TaskType.REDUCE
|
||||
&& taID.getTaskId().getId() == 0 && taID.getId() == 0) {
|
||||
// Task is asking the reduce TA to kill itself. 'Create' a race
|
||||
// condition. Make the task succeed and then inform the task that
|
||||
// TA has succeeded. Once Task gets the TA succeeded event at
|
||||
// KILL_WAIT, then relay the actual kill signal to TA
|
||||
super.dispatch(new TaskAttemptEvent(taID,
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
super.dispatch(new TaskAttemptEvent(taID,
|
||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||
super.dispatch(new TaskTAttemptEvent(taID,
|
||||
TaskEventType.T_ATTEMPT_SUCCEEDED));
|
||||
this.cachedKillEvent = killEvent;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else if (event instanceof TaskEvent) {
|
||||
TaskEvent taskEvent = (TaskEvent) event;
|
||||
if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
|
||||
&& this.cachedKillEvent != null) {
|
||||
// When the TA comes and reports that it is done, send the
|
||||
// cachedKillEvent
|
||||
super.dispatch(this.cachedKillEvent);
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
super.dispatch(event);
|
||||
}
|
||||
};
|
||||
MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
|
||||
@Override
|
||||
public Dispatcher createDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
};
|
||||
Job job = app.submit(new Configuration());
|
||||
JobId jobId = app.getJobId();
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task mapTask = it.next();
|
||||
Task reduceTask = it.next();
|
||||
app.waitForState(mapTask, TaskState.RUNNING);
|
||||
app.waitForState(reduceTask, TaskState.RUNNING);
|
||||
TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
|
||||
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
|
||||
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
|
||||
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
|
||||
|
||||
// Finish map
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
mapAttempt.getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
app.waitForState(mapTask, TaskState.SUCCEEDED);
|
||||
|
||||
// Now kill the job
|
||||
app.getContext().getEventHandler()
|
||||
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
||||
|
||||
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKillTaskAttempt() throws Exception {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
|
Loading…
Reference in New Issue