MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma

This commit is contained in:
Jason Lowe 2015-05-11 22:40:05 +00:00
parent 2c80a3fe19
commit 53e9973afc
22 changed files with 1061 additions and 154 deletions

View File

@ -153,6 +153,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6353. Divide by zero error in MR AM when calculating available MAPREDUCE-6353. Divide by zero error in MR AM when calculating available
containers. (Anubhav Dhoot via kasha) containers. (Anubhav Dhoot via kasha)
MAPREDUCE-5465. Tasks are often killed before they exit on their own
(Ming Ma via jlowe)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -264,7 +264,8 @@ public class LocalContainerLauncher extends AbstractService implements
context.getEventHandler().handle( context.getEventHandler().handle(
new TaskAttemptEvent(taId, new TaskAttemptEvent(taId,
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_CLEANED));
} else if (event.getType() == EventType.CONTAINER_COMPLETED) {
LOG.debug("Container completed " + event.toString());
} else { } else {
LOG.warn("Ignoring unexpected event " + event.toString()); LOG.warn("Ignoring unexpected event " + event.toString());
} }
@ -314,7 +315,14 @@ public class LocalContainerLauncher extends AbstractService implements
} }
runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
(numReduceTasks > 0), localMapFiles); (numReduceTasks > 0), localMapFiles);
// In non-uber mode, TA gets TA_CONTAINER_COMPLETED from MRAppMaster
// as part of NM -> RM -> AM notification route.
// In uber mode, given the task run inside the MRAppMaster container,
// we have to simulate the notification.
context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
} catch (RuntimeException re) { } catch (RuntimeException re) {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);

View File

@ -67,4 +67,6 @@ public interface AppContext {
boolean hasSuccessfullyUnregistered(); boolean hasSuccessfullyUnregistered();
String getNMHostname(); String getNMHostname();
TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor();
} }

View File

@ -204,6 +204,14 @@ public class MRAppMaster extends CompositeService {
private JobHistoryEventHandler jobHistoryEventHandler; private JobHistoryEventHandler jobHistoryEventHandler;
private SpeculatorEventDispatcher speculatorEventDispatcher; private SpeculatorEventDispatcher speculatorEventDispatcher;
// After a task attempt completes from TaskUmbilicalProtocol's point of view,
// it will be transitioned to finishing state.
// taskAttemptFinishingMonitor is just a timer for attempts in finishing
// state. If the attempt stays in finishing state for too long,
// taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT
// event.
private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
private Job job; private Job job;
private Credentials jobCredentials = new Credentials(); // Filled during init private Credentials jobCredentials = new Credentials(); // Filled during init
protected UserGroupInformation currentUser; // Will be setup during init protected UserGroupInformation currentUser; // Will be setup during init
@ -246,6 +254,12 @@ public class MRAppMaster extends CompositeService {
logSyncer = TaskLog.createLogSyncer(); logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId); LOG.info("Created MRAppMaster for application " + applicationAttemptId);
} }
protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor(
EventHandler eventHandler) {
TaskAttemptFinishingMonitor monitor =
new TaskAttemptFinishingMonitor(eventHandler);
return monitor;
}
@Override @Override
protected void serviceInit(final Configuration conf) throws Exception { protected void serviceInit(final Configuration conf) throws Exception {
@ -256,7 +270,11 @@ public class MRAppMaster extends CompositeService {
initJobCredentialsAndUGI(conf); initJobCredentialsAndUGI(conf);
context = new RunningAppContext(conf); dispatcher = createDispatcher();
addIfService(dispatcher);
taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler());
addIfService(taskAttemptFinishingMonitor);
context = new RunningAppContext(conf, taskAttemptFinishingMonitor);
// Job name is the same as the app name util we support DAG of jobs // Job name is the same as the app name util we support DAG of jobs
// for an app later // for an app later
@ -323,9 +341,6 @@ public class MRAppMaster extends CompositeService {
} }
if (errorHappenedShutDown) { if (errorHappenedShutDown) {
dispatcher = createDispatcher();
addIfService(dispatcher);
NoopEventHandler eater = new NoopEventHandler(); NoopEventHandler eater = new NoopEventHandler();
//We do not have a JobEventDispatcher in this path //We do not have a JobEventDispatcher in this path
dispatcher.register(JobEventType.class, eater); dispatcher.register(JobEventType.class, eater);
@ -372,9 +387,6 @@ public class MRAppMaster extends CompositeService {
} else { } else {
committer = createOutputCommitter(conf); committer = createOutputCommitter(conf);
dispatcher = createDispatcher();
addIfService(dispatcher);
//service to handle requests from JobClient //service to handle requests from JobClient
clientService = createClientService(context); clientService = createClientService(context);
// Init ClientService separately so that we stop it separately, since this // Init ClientService separately so that we stop it separately, since this
@ -946,10 +958,14 @@ public class MRAppMaster extends CompositeService {
private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
public RunningAppContext(Configuration config) { private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
public RunningAppContext(Configuration config,
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) {
this.conf = config; this.conf = config;
this.clientToAMTokenSecretManager = this.clientToAMTokenSecretManager =
new ClientToAMTokenSecretManager(appAttemptID, null); new ClientToAMTokenSecretManager(appAttemptID, null);
this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
} }
@Override @Override
@ -1034,6 +1050,12 @@ public class MRAppMaster extends CompositeService {
public String getNMHostname() { public String getNMHostname() {
return nmHost; return nmHost;
} }
@Override
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return taskAttemptFinishingMonitor;
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* This class generates TA_TIMED_OUT if the task attempt stays in FINISHING
* state for too long.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class TaskAttemptFinishingMonitor extends
AbstractLivelinessMonitor<TaskAttemptId> {
private EventHandler eventHandler;
public TaskAttemptFinishingMonitor(EventHandler eventHandler) {
super("TaskAttemptFinishingMonitor", new SystemClock());
this.eventHandler = eventHandler;
}
public void init(Configuration conf) {
super.init(conf);
int expireIntvl = conf.getInt(MRJobConfig.TASK_EXIT_TIMEOUT,
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
int checkIntvl = conf.getInt(
MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS,
MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT);
setExpireInterval(expireIntvl);
setMonitorInterval(checkIntvl);
}
@Override
protected void expire(TaskAttemptId id) {
eventHandler.handle(
new TaskAttemptEvent(id,
TaskAttemptEventType.TA_TIMED_OUT));
}
}

View File

@ -370,7 +370,7 @@ public class MRClientService extends AbstractService implements ClientService {
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptId, new TaskAttemptEvent(taskAttemptId,
TaskAttemptEventType.TA_FAILMSG)); TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
FailTaskAttemptResponse response = recordFactory. FailTaskAttemptResponse response = recordFactory.
newRecordInstance(FailTaskAttemptResponse.class); newRecordInstance(FailTaskAttemptResponse.class);
return response; return response;

View File

@ -30,9 +30,42 @@ public enum TaskAttemptStateInternal {
UNASSIGNED, UNASSIGNED,
ASSIGNED, ASSIGNED,
RUNNING, RUNNING,
COMMIT_PENDING, COMMIT_PENDING,
SUCCESS_CONTAINER_CLEANUP,
SUCCEEDED, // Transition into SUCCESS_FINISHING_CONTAINER
// After the attempt finishes successfully from
// TaskUmbilicalProtocol's point of view, it will transition to
// SUCCESS_FINISHING_CONTAINER state. That will give a chance for the
// container to exit by itself. In the transition,
// the attempt will notify the task via T_ATTEMPT_SUCCEEDED so that
// from job point of view, the task is considered succeeded.
// Transition out of SUCCESS_FINISHING_CONTAINER
// The attempt will transition from SUCCESS_FINISHING_CONTAINER to
// SUCCESS_CONTAINER_CLEANUP if it doesn't receive container exit
// notification within TASK_EXIT_TIMEOUT;
// Or it will transition to SUCCEEDED if it receives container exit
// notification from YARN.
SUCCESS_FINISHING_CONTAINER,
// Transition into FAIL_FINISHING_CONTAINER
// After the attempt fails from
// TaskUmbilicalProtocol's point of view, it will transition to
// FAIL_FINISHING_CONTAINER state. That will give a chance for the container
// to exit by itself. In the transition,
// the attempt will notify the task via T_ATTEMPT_FAILED so that
// from job point of view, the task is considered failed.
// Transition out of FAIL_FINISHING_CONTAINER
// The attempt will transition from FAIL_FINISHING_CONTAINER to
// FAIL_CONTAINER_CLEANUP if it doesn't receive container exit
// notification within TASK_EXIT_TIMEOUT;
// Or it will transition to FAILED if it receives container exit
// notification from YARN.
FAIL_FINISHING_CONTAINER,
SUCCESS_CONTAINER_CLEANUP,
SUCCEEDED,
FAIL_CONTAINER_CLEANUP, FAIL_CONTAINER_CLEANUP,
FAIL_TASK_CLEANUP, FAIL_TASK_CLEANUP,
FAILED, FAILED,

View File

@ -48,6 +48,9 @@ public enum TaskAttemptEventType {
TA_UPDATE, TA_UPDATE,
TA_TIMED_OUT, TA_TIMED_OUT,
//Producer:Client
TA_FAILMSG_BY_CLIENT,
//Producer:TaskCleaner //Producer:TaskCleaner
TA_CLEANUP_DONE, TA_CLEANUP_DONE,

View File

@ -184,8 +184,20 @@ public abstract class TaskAttemptImpl implements
private Locality locality; private Locality locality;
private Avataar avataar; private Avataar avataar;
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = private static final CleanupContainerTransition
new CleanupContainerTransition(); CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
private static final MoveContainerToSucceededFinishingTransition
SUCCEEDED_FINISHING_TRANSITION =
new MoveContainerToSucceededFinishingTransition();
private static final MoveContainerToFailedFinishingTransition
FAILED_FINISHING_TRANSITION =
new MoveContainerToFailedFinishingTransition();
private static final ExitFinishingOnTimeoutTransition
FINISHING_ON_TIMEOUT_TRANSITION =
new ExitFinishingOnTimeoutTransition();
private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION =
new FinalizeFailedTransition();
private static final DiagnosticInformationUpdater private static final DiagnosticInformationUpdater
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION
@ -204,6 +216,8 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE); TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
private static final StateMachineFactory private static final StateMachineFactory
@ -221,16 +235,16 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new KilledTransition()) TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW, .addTransition(TaskAttemptStateInternal.NEW,
EnumSet.of(TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED), TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_RECOVER, new RecoverTransition()) TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
.addTransition(TaskAttemptStateInternal.NEW, .addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the UNASSIGNED state. // Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED, .addTransition(TaskAttemptStateInternal.UNASSIGNED,
@ -238,14 +252,14 @@ public abstract class TaskAttemptImpl implements
new ContainerAssignedTransition()) new ContainerAssignedTransition())
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
TaskAttemptStateInternal.KILLED, true)) TaskAttemptStateInternal.KILLED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED, .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition(
TaskAttemptStateInternal.FAILED, true)) TaskAttemptStateInternal.FAILED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED, .addTransition(TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the ASSIGNED state. // Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
@ -258,15 +272,19 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false)) new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
.addTransition(TaskAttemptStateInternal.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION) FINALIZE_FAILED_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
CLEANUP_CONTAINER_TRANSITION)
// Transitions from RUNNING state. // Transitions from RUNNING state.
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
@ -274,23 +292,27 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// If no commit is required, task directly goes to success // If no commit is required, task goes to finishing state
// This will give a chance for the container to exit by itself
.addTransition(TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
// If commit is required, task goes through commit pending state. // If commit is required, task goes through commit pending state.
.addTransition(TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
// Failure handling while RUNNING // Failure handling while RUNNING
.addTransition(TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
//for handling container exit without sending the done or fail msg
.addTransition(TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION)
//for handling container exit without sending the done or fail msg
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION) FINALIZE_FAILED_TRANSITION)
// Timeout handling while RUNNING // Timeout handling while RUNNING
.addTransition(TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
@ -301,9 +323,94 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling // Kill handling
.addTransition(TaskAttemptStateInternal.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
// Transitions from SUCCESS_FINISHING_CONTAINER state
// When the container exits by itself, the notification of container
// completed event will be routed via NM -> RM -> AM.
// After MRAppMaster gets notification from RM, it will generate
// TA_CONTAINER_COMPLETED event.
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
new ExitFinishingOnContainerCompletedTransition())
// Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to
// SUCCESS_FINISHING_CONTAINER, it is possible to receive the event
// TA_CONTAINER_CLEANED in the following scenario.
// 1. It is the last task for the job.
// 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job.
// 3. Job will be marked completed.
// 4. As part of MRAppMaster's shutdown, all containers will be killed.
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
new ExitFinishingOnContainerCleanedupTransition())
// The client wants to kill the task. Given the task is in finishing
// state, it could go to succeeded state or killed state. If it is a
// reducer, it will go to succeeded state;
// otherwise, it goes to killed state.
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP),
TaskAttemptEventType.TA_KILL,
new KilledAfterSucceededFinishingTransition())
// The attempt stays in finishing state for too long
// Let us clean up the container
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// ignore-able events
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
EnumSet.of(TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
// Transitions from FAIL_FINISHING_CONTAINER state
// When the container exits by itself, the notification of container
// completed event will be routed via NM -> RM -> AM.
// After MRAppMaster gets notification from RM, it will generate
// TA_CONTAINER_COMPLETED event.
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
new ExitFinishingOnContainerCompletedTransition())
// Given TA notifies task T_ATTEMPT_FAILED when it transitions to
// FAIL_FINISHING_CONTAINER, it is possible to receive the event
// TA_CONTAINER_CLEANED in the following scenario.
// 1. It is the last task attempt for the task.
// 2. After the task receives T_ATTEMPT_FAILED, it will notify job.
// 3. Job will be marked failed.
// 4. As part of MRAppMaster's shutdown, all containers will be killed.
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
new ExitFinishingOnContainerCleanedupTransition())
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// ignore-able events
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
// Transitions from COMMIT_PENDING state // Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
@ -313,22 +420,27 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down // if container killed by AM shutting down
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION) FINALIZE_FAILED_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
@ -345,8 +457,8 @@ public abstract class TaskAttemptImpl implements
// Transitions from SUCCESS_CONTAINER_CLEANUP state // Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container // kill and cleanup the container
.addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptStateInternal.SUCCEEDED,
new SucceededTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED)
.addTransition( .addTransition(
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
@ -357,6 +469,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_COMPLETED)) TaskAttemptEventType.TA_CONTAINER_COMPLETED))
@ -380,6 +493,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT)) TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from KILL_CONTAINER_CLEANUP // Transitions from KILL_CONTAINER_CLEANUP
@ -402,6 +516,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT)) TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from FAIL_TASK_CLEANUP // Transitions from FAIL_TASK_CLEANUP
@ -422,6 +537,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late // Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
@ -444,6 +560,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late // Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
@ -456,7 +573,7 @@ public abstract class TaskAttemptImpl implements
new TooManyFetchFailureTransition()) new TooManyFetchFailureTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, .addTransition(TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED), EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_KILL,
new KilledAfterSuccessTransition()) new KilledAfterSuccessTransition())
.addTransition( .addTransition(
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
@ -466,6 +583,10 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.SUCCEEDED, .addTransition(TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_FAILMSG, EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
// TaskAttemptFinishingMonitor might time out the attempt right
// after the attempt receives TA_CONTAINER_COMPLETED.
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED)) TaskAttemptEventType.TA_CONTAINER_COMPLETED))
@ -1220,21 +1341,21 @@ public abstract class TaskAttemptImpl implements
return TaskAttemptState.STARTING; return TaskAttemptState.STARTING;
case COMMIT_PENDING: case COMMIT_PENDING:
return TaskAttemptState.COMMIT_PENDING; return TaskAttemptState.COMMIT_PENDING;
case FAILED:
return TaskAttemptState.FAILED;
case KILLED:
return TaskAttemptState.KILLED;
// All CLEANUP states considered as RUNNING since events have not gone out
// to the Task yet. May be possible to consider them as a Finished state.
case FAIL_CONTAINER_CLEANUP: case FAIL_CONTAINER_CLEANUP:
case FAIL_TASK_CLEANUP: case FAIL_TASK_CLEANUP:
case FAIL_FINISHING_CONTAINER:
case FAILED:
return TaskAttemptState.FAILED;
case KILL_CONTAINER_CLEANUP: case KILL_CONTAINER_CLEANUP:
case KILL_TASK_CLEANUP: case KILL_TASK_CLEANUP:
case SUCCESS_CONTAINER_CLEANUP: case KILLED:
return TaskAttemptState.KILLED;
case RUNNING: case RUNNING:
return TaskAttemptState.RUNNING; return TaskAttemptState.RUNNING;
case NEW: case NEW:
return TaskAttemptState.NEW; return TaskAttemptState.NEW;
case SUCCESS_CONTAINER_CLEANUP:
case SUCCESS_FINISHING_CONTAINER:
case SUCCEEDED: case SUCCEEDED:
return TaskAttemptState.SUCCEEDED; return TaskAttemptState.SUCCEEDED;
default: default:
@ -1436,6 +1557,15 @@ public abstract class TaskAttemptImpl implements
} }
} }
private static void finalizeProgress(TaskAttemptImpl taskAttempt) {
// unregister it to TaskAttemptListener so that it stops listening
taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID);
taskAttempt.reportedStatus.progress = 1.0f;
taskAttempt.updateProgressSplits();
}
static class RequestContainerTransition implements static class RequestContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final boolean rescheduled; private final boolean rescheduled;
@ -1668,53 +1798,66 @@ public abstract class TaskAttemptImpl implements
} }
} }
private static class SucceededTransition implements /**
* Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
* state upon receiving TA_CONTAINER_COMPLETED event
*/
private static class ExitFinishingOnContainerCompletedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void transition(TaskAttemptImpl taskAttempt, public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
sendContainerCompleted(taskAttempt);
}
}
private static class ExitFinishingOnContainerCleanedupTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent event) {
//set the finish time taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.setFinishTime(); taskAttempt.attemptId);
taskAttempt.eventHandler.handle( }
createJobCounterUpdateEventTASucceeded(taskAttempt));
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED));
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
}
} }
private static class FailedTransition implements private static class FailedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// set the finish time // set the finish time
taskAttempt.setFinishTime(); taskAttempt.setFinishTime();
notifyTaskAttemptFailed(taskAttempt);
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
} }
} }
private static class FinalizeFailedTransition extends FailedTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
finalizeProgress(taskAttempt);
sendContainerCompleted(taskAttempt);
super.transition(taskAttempt, event);
}
}
@SuppressWarnings("unchecked")
private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) {
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_COMPLETED));
}
private static class RecoverTransition implements private static class RecoverTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@ -1839,6 +1982,35 @@ public abstract class TaskAttemptImpl implements
} }
} }
private static class KilledAfterSucceededFinishingTransition
implements MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
TaskAttemptStateInternal> {
@SuppressWarnings("unchecked")
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
sendContainerCleanup(taskAttempt, event);
if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
// after a reduce task has succeeded, its outputs are in safe in HDFS.
// logically such a task should not be killed. we only come here when
// there is a race condition in the event queue. E.g. some logic sends
// a kill request to this attempt when the successful completion event
// for this task is already in the event queue. so the kill event will
// get executed immediately after the attempt is marked successful and
// result in this transition being exercised.
// ignore this for reduce tasks
LOG.info("Ignoring killed event for successful reduce task attempt" +
taskAttempt.getID().toString());
return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
} else {
return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
}
}
}
private static class KilledTransition implements private static class KilledTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@ -1873,6 +2045,31 @@ public abstract class TaskAttemptImpl implements
} }
} }
/**
* Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
* state upon receiving TA_TIMED_OUT event
*/
private static class ExitFinishingOnTimeoutTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
// The attempt stays in finishing state for too long
String msg = "Task attempt " + taskAttempt.getID() + " is done from " +
"TaskUmbilicalProtocol's point of view. However, it stays in " +
"finishing state for too long";
LOG.warn(msg);
taskAttempt.addDiagnosticInfo(msg);
sendContainerCleanup(taskAttempt, event);
}
}
/**
* Finish and clean up the container
*/
private static class CleanupContainerTransition implements private static class CleanupContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -1880,27 +2077,103 @@ public abstract class TaskAttemptImpl implements
public void transition(TaskAttemptImpl taskAttempt, public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent event) {
// unregister it to TaskAttemptListener so that it stops listening // unregister it to TaskAttemptListener so that it stops listening
// for it // for it.
taskAttempt.taskAttemptListener.unregister( finalizeProgress(taskAttempt);
taskAttempt.attemptId, taskAttempt.jvmID); sendContainerCleanup(taskAttempt, event);
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
taskAttempt.reportedStatus.progress = 1.0f;
taskAttempt.updateProgressSplits();
//send the cleanup event to containerLauncher
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
} }
} }
@SuppressWarnings("unchecked")
private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
//send the cleanup event to containerLauncher
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
}
/**
* Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event
*/
private static class MoveContainerToSucceededFinishingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
finalizeProgress(taskAttempt);
// register it to finishing state
taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
taskAttempt.attemptId);
// set the finish time
taskAttempt.setFinishTime();
// notify job history
taskAttempt.eventHandler.handle(
createJobCounterUpdateEventTASucceeded(taskAttempt));
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
//notify the task even though the container might not have exited yet.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED));
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
}
}
/**
* Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event
*/
private static class MoveContainerToFailedFinishingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
finalizeProgress(taskAttempt);
// register it to finishing state
taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
taskAttempt.attemptId);
notifyTaskAttemptFailed(taskAttempt);
}
}
@SuppressWarnings("unchecked")
private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
// set the finish time
taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
}
private void addDiagnosticInfo(String diag) { private void addDiagnosticInfo(String diag) {
if (diag != null && !diag.equals("")) { if (diag != null && !diag.equals("")) {
diagnostics.add(diag); diagnostics.add(diag);

View File

@ -27,7 +27,13 @@ public interface ContainerLauncher
enum EventType { enum EventType {
CONTAINER_REMOTE_LAUNCH, CONTAINER_REMOTE_LAUNCH,
CONTAINER_REMOTE_CLEANUP CONTAINER_REMOTE_CLEANUP,
// When TaskAttempt receives TA_CONTAINER_COMPLETED,
// it will notify ContainerLauncher so that the container can be removed
// from ContainerLauncher's launched containers list
// Otherwise, ContainerLauncher will try to stop the containers as part of
// serviceStop.
CONTAINER_COMPLETED
} }
} }

View File

@ -121,7 +121,11 @@ public class ContainerLauncherImpl extends AbstractService implements
public synchronized boolean isCompletelyDone() { public synchronized boolean isCompletelyDone() {
return state == ContainerState.DONE || state == ContainerState.FAILED; return state == ContainerState.DONE || state == ContainerState.FAILED;
} }
public synchronized void done() {
state = ContainerState.DONE;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) { public synchronized void launch(ContainerRemoteLaunchEvent event) {
LOG.info("Launching " + taskAttemptID); LOG.info("Launching " + taskAttemptID);
@ -378,6 +382,11 @@ public class ContainerLauncherImpl extends AbstractService implements
case CONTAINER_REMOTE_CLEANUP: case CONTAINER_REMOTE_CLEANUP:
c.kill(); c.kill();
break; break;
case CONTAINER_COMPLETED:
c.done();
break;
} }
removeContainerIfDone(containerID); removeContainerIfDone(containerID);
} }

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestTaskAttemptFinishingMonitor {
@Test
public void testFinshingAttemptTimeout()
throws IOException, InterruptedException {
SystemClock clock = new SystemClock();
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
new TaskAttemptFinishingMonitor(eventHandler);
taskAttemptFinishingMonitor.init(conf);
taskAttemptFinishingMonitor.start();
when(appCtx.getEventHandler()).thenReturn(eventHandler);
when(appCtx.getNMHostname()).thenReturn("0.0.0.0");
when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(
taskAttemptFinishingMonitor);
when(appCtx.getClock()).thenReturn(clock);
TaskAttemptListenerImpl listener =
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler);
listener.init(conf);
listener.start();
JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
appCtx.getTaskAttemptFinishingMonitor().register(attemptId);
int check = 0;
while ( !eventHandler.timedOut && check++ < 10 ) {
Thread.sleep(100);
}
taskAttemptFinishingMonitor.stop();
assertTrue("Finishing attempt didn't time out.", eventHandler.timedOut);
}
public static class MockEventHandler implements EventHandler {
public boolean timedOut = false;
@Override
public void handle(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent attemptEvent = ((TaskAttemptEvent) event);
if (TaskAttemptEventType.TA_TIMED_OUT == attemptEvent.getType()) {
timedOut = true;
}
}
}
};
}

View File

@ -481,7 +481,21 @@ public class MRApp extends MRAppMaster {
} }
@Override @Override
protected TaskAttemptListener createTaskAttemptListener(AppContext context) { protected TaskAttemptFinishingMonitor
createTaskAttemptFinishingMonitor(
EventHandler eventHandler) {
return new TaskAttemptFinishingMonitor(eventHandler) {
@Override
public synchronized void register(TaskAttemptId attemptID) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
}
};
}
@Override
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
return new TaskAttemptListener(){ return new TaskAttemptListener(){
@Override @Override
public InetSocketAddress getAddress() { public InetSocketAddress getAddress() {
@ -539,6 +553,8 @@ public class MRApp extends MRAppMaster {
new TaskAttemptEvent(event.getTaskAttemptID(), new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_CLEANED));
break; break;
case CONTAINER_COMPLETED:
break;
} }
} }
} }

View File

@ -148,4 +148,10 @@ public class MockAppContext implements AppContext {
// bogus - Not Required // bogus - Not Required
return null; return null;
} }
@Override
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null;
}
} }

View File

@ -222,6 +222,8 @@ public class TestFail {
new TaskAttemptEvent(event.getTaskAttemptID(), new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_CLEANED));
break; break;
case CONTAINER_COMPLETED:
super.handle(event);
} }
} }

View File

@ -159,7 +159,7 @@ public class TestKill {
super.dispatch(new TaskAttemptEvent(taID, super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
super.dispatch(new TaskAttemptEvent(taID, super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
super.dispatch(new TaskTAttemptEvent(taID, super.dispatch(new TaskTAttemptEvent(taID,
TaskEventType.T_ATTEMPT_SUCCEEDED)); TaskEventType.T_ATTEMPT_SUCCEEDED));
this.cachedKillEvent = killEvent; this.cachedKillEvent = killEvent;
@ -211,40 +211,9 @@ public class TestKill {
app.getContext().getEventHandler() app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); app.waitForInternalState((JobImpl) job, JobStateInternal.KILLED);
} }
static class MyAsyncDispatch extends AsyncDispatcher {
private CountDownLatch latch;
private TaskAttemptEventType attemptEventTypeToWait;
MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
super();
this.latch = latch;
this.attemptEventTypeToWait = attemptEventTypeToWait;
}
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
if (attemptEvent.getType() == this.attemptEventTypeToWait
&& attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
super.dispatch(event);
}
}
// This is to test a race condition where JobEventType.JOB_KILL is generated
// right after TaskAttemptEventType.TA_DONE is generated.
// TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
// and T_ATTEMPT_KILLED from the same attempt.
@Test @Test
public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -269,15 +238,12 @@ public class TestKill {
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
// The order in the dispatch event queue, from the oldest to the newest // The order in the dispatch event queue, from first to last
// TA_DONE // TA_DONE
// JOB_KILL // JobEventType.JOB_KILL
// CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling ) // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
// T_KILL ( from JOB_KILL's handling ) // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
// TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling ) // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
// TA_KILL ( from T_KILL's handling )
// T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
// T_ATTEMPT_KILLED ( from TA_KILL's handling )
// Finish map // Finish map
app.getContext().getEventHandler().handle( app.getContext().getEventHandler().handle(
@ -295,6 +261,100 @@ public class TestKill {
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
} }
@Test
public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL);
MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
@Override
public Dispatcher createDispatcher() {
return dispatcher;
}
};
Job job = app.submit(new Configuration());
JobId jobId = app.getJobId();
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
Task reduceTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
// The order in the dispatch event queue, from first to last
// JobEventType.JOB_KILL
// TA_DONE
// TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
// TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
// TaskAttemptEventType.TA_KILL ( from TaskEventType.T_KILL handling )
// TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
// TaskEventType.T_ATTEMPT_KILLED ( from TA_KILL handling )
// Now kill the job
app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
// Finish map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapAttempt.getID(),
TaskAttemptEventType.TA_DONE));
//unblock
latch.countDown();
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
}
static class MyAsyncDispatch extends AsyncDispatcher {
private CountDownLatch latch;
private TaskAttemptEventType attemptEventTypeToWait;
private JobEventType jobEventTypeToWait;
MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
super();
this.latch = latch;
this.attemptEventTypeToWait = attemptEventTypeToWait;
}
MyAsyncDispatch(CountDownLatch latch, JobEventType jobEventTypeToWait) {
super();
this.latch = latch;
this.jobEventTypeToWait = jobEventTypeToWait;
}
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
if (attemptEvent.getType() == this.attemptEventTypeToWait
&& attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else if ( event instanceof JobEvent) {
JobEvent jobEvent = (JobEvent) event;
if (jobEvent.getType() == this.jobEventTypeToWait) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
super.dispatch(event);
}
}
@Test @Test
public void testKillTaskAttempt() throws Exception { public void testKillTaskAttempt() throws Exception {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);

View File

@ -884,5 +884,10 @@ public class TestRuntimeEstimators {
// bogus - Not Required // bogus - Not Required
return null; return null;
} }
@Override
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null;
}
} }
} }

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
@ -407,6 +408,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class); Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024); when(resource.getMemory()).thenReturn(1024);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@ -464,6 +466,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class); Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024); when(resource.getMemory()).thenReturn(1024);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@ -524,6 +527,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class); Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024); when(resource.getMemory()).thenReturn(1024);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@ -546,7 +550,7 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId, taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId, taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(), assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED); TaskAttemptState.SUCCEEDED);
@ -593,6 +597,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class); Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024); when(resource.getMemory()).thenReturn(1024);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
@ -641,6 +646,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class); Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024); when(resource.getMemory()).thenReturn(1024);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@ -663,7 +669,7 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId, taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId, taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(), assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED); TaskAttemptState.SUCCEEDED);
@ -708,6 +714,7 @@ public class TestTaskAttempt{
Resource resource = mock(Resource.class); Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024); when(resource.getMemory()).thenReturn(1024);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
@ -753,6 +760,7 @@ public class TestTaskAttempt{
AppContext appCtx = mock(AppContext.class); AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class); ClusterInfo clusterInfo = mock(ClusterInfo.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
@ -774,7 +782,7 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId, taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
taImpl.handle(new TaskAttemptEvent(attemptId, taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(), assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED); TaskAttemptState.SUCCEEDED);
@ -967,6 +975,255 @@ public class TestTaskAttempt{
taImpl.getInternalState()); taImpl.getInternalState());
} }
@Test
public void testKillMapTaskWhileSuccessFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
assertEquals("Task attempt's internal state is not " +
"SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
// If the map task is killed when it is in SUCCESS_FINISHING_CONTAINER
// state, the state will move to KILL_CONTAINER_CLEANUP
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_KILL));
assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
TaskAttemptState.KILLED);
assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP",
taImpl.getInternalState(),
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP",
taImpl.getInternalState(),
TaskAttemptStateInternal.KILL_TASK_CLEANUP);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
TaskAttemptState.KILLED);
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testKillMapTaskWhileFailFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_FAILMSG));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
assertEquals("Task attempt's internal state is not " +
"FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
// If the map task is killed when it is in FAIL_FINISHING_CONTAINER state,
// the state will stay in FAIL_FINISHING_CONTAINER.
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_KILL));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.FAILED);
assertEquals("Task attempt's internal state is not " +
"FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_TIMED_OUT));
assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_TASK_CLEANUP);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
TaskAttemptState.FAILED);
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testFailMapTaskByClient() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.FAILED);
assertEquals("Task attempt's internal state is not " +
"FAIL_CONTAINER_CLEANUP", taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP",
taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_TASK_CLEANUP);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
TaskAttemptState.FAILED);
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
assertEquals("Task attempt's internal state is not " +
"SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
// TA_DIAGNOSTICS_UPDATE doesn't change state
taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(),
"Task got updated"));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
assertEquals("Task attempt's internal state is not " +
"SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testTimeoutWhileSuccessFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
assertEquals("Task attempt's internal state is not " +
"SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER);
// If the task stays in SUCCESS_FINISHING_CONTAINER for too long,
// TaskAttemptListenerImpl will time out the attempt.
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_TIMED_OUT));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
assertEquals("Task attempt's internal state is not " +
"SUCCESS_CONTAINER_CLEANUP", taImpl.getInternalState(),
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP);
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testTimeoutWhileFailFinishing() throws Exception {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_FAILMSG));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.FAILED);
assertEquals("Task attempt's internal state is not " +
"FAIL_FINISHING_CONTAINER", taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER);
// If the task stays in FAIL_FINISHING_CONTAINER for too long,
// TaskAttemptListenerImpl will time out the attempt.
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_TIMED_OUT));
assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP",
taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP);
assertFalse("InternalError occurred", eventHandler.internalError);
}
private void setupTaskAttemptFinishingMonitor(
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
new TaskAttemptFinishingMonitor(eventHandler);
taskAttemptFinishingMonitor.init(jobConf);
when(appCtx.getTaskAttemptFinishingMonitor()).
thenReturn(taskAttemptFinishingMonitor);
}
private TaskAttemptImpl createTaskAttemptImpl(
MockEventHandler eventHandler) {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
container, mock(Map.class)));
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
return taImpl;
}
public static class MockEventHandler implements EventHandler { public static class MockEventHandler implements EventHandler {
public boolean internalError; public boolean internalError;

View File

@ -227,7 +227,15 @@ public interface MRJobConfig {
public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms"; public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
public static final String TASK_EXIT_TIMEOUT = "mapreduce.task.exit.timeout";
public static final int TASK_EXIT_TIMEOUT_DEFAULT = 60 * 1000;
public static final String TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.exit.timeout.check-interval-ms";
public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
public static final String TASK_ID = "mapreduce.task.id"; public static final String TASK_ID = "mapreduce.task.id";
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir"; public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";

View File

@ -1664,4 +1664,24 @@
app master. app master.
</description> </description>
</property> </property>
<property>
<name>mapreduce.task.exit.timeout</name>
<value>60000</value>
<description>The number of milliseconds before a task will be
terminated if it stays in finishing state for too long.
After a task attempt completes from TaskUmbilicalProtocol's point of view,
it will be transitioned to finishing state. That will give a chance for the
task to exit by itself.
</description>
</property>
<property>
<name>mapreduce.task.exit.timeout.check-interval-ms</name>
<value>20000</value>
<description>The interval in milliseconds between which the MR framework
checks if task attempts stay in finishing state for too long.
</description>
</property>
</configuration> </configuration>

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@ -399,4 +400,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
// bogus - Not Required // bogus - Not Required
return null; return null;
} }
@Override
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null;
}
} }

View File

@ -102,7 +102,7 @@ public class TestSpeculativeExecutionWithMRApp {
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
} }
} }
@ -170,7 +170,7 @@ public class TestSpeculativeExecutionWithMRApp {
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
TaskAttemptEventType.TA_DONE)); TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
numTasksToFinish--; numTasksToFinish--;
app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
} else { } else {
@ -228,7 +228,7 @@ public class TestSpeculativeExecutionWithMRApp {
appEventHandler.handle( appEventHandler.handle(
new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE)); new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));
return ta; return ta;
} }