diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 40ecdaa8513..30483ec00c7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -153,6 +153,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6353. Divide by zero error in MR AM when calculating available containers. (Anubhav Dhoot via kasha) + MAPREDUCE-5465. Tasks are often killed before they exit on their own + (Ming Ma via jlowe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index ffc53263378..52b349795bb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -264,7 +264,8 @@ public class LocalContainerLauncher extends AbstractService implements context.getEventHandler().handle( new TaskAttemptEvent(taId, TaskAttemptEventType.TA_CONTAINER_CLEANED)); - + } else if (event.getType() == EventType.CONTAINER_COMPLETED) { + LOG.debug("Container completed " + event.toString()); } else { LOG.warn("Ignoring unexpected event " + event.toString()); } @@ -314,7 +315,14 @@ public class LocalContainerLauncher extends AbstractService implements } runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, (numReduceTasks > 0), localMapFiles); - + + // In non-uber mode, TA gets TA_CONTAINER_COMPLETED from MRAppMaster + // as part of NM -> RM -> AM notification route. + // In uber mode, given the task run inside the MRAppMaster container, + // we have to simulate the notification. + context.getEventHandler().handle(new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + } catch (RuntimeException re) { JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId()); jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index 31e282a63e9..4af11c371e8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -67,4 +67,6 @@ public interface AppContext { boolean hasSuccessfullyUnregistered(); String getNMHostname(); + + TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 8074e17be78..2ac948a6f3f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -204,6 +204,14 @@ public class MRAppMaster extends CompositeService { private JobHistoryEventHandler jobHistoryEventHandler; private SpeculatorEventDispatcher speculatorEventDispatcher; + // After a task attempt completes from TaskUmbilicalProtocol's point of view, + // it will be transitioned to finishing state. + // taskAttemptFinishingMonitor is just a timer for attempts in finishing + // state. If the attempt stays in finishing state for too long, + // taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT + // event. + private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; + private Job job; private Credentials jobCredentials = new Credentials(); // Filled during init protected UserGroupInformation currentUser; // Will be setup during init @@ -246,6 +254,12 @@ public class MRAppMaster extends CompositeService { logSyncer = TaskLog.createLogSyncer(); LOG.info("Created MRAppMaster for application " + applicationAttemptId); } + protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor( + EventHandler eventHandler) { + TaskAttemptFinishingMonitor monitor = + new TaskAttemptFinishingMonitor(eventHandler); + return monitor; + } @Override protected void serviceInit(final Configuration conf) throws Exception { @@ -256,7 +270,11 @@ public class MRAppMaster extends CompositeService { initJobCredentialsAndUGI(conf); - context = new RunningAppContext(conf); + dispatcher = createDispatcher(); + addIfService(dispatcher); + taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler()); + addIfService(taskAttemptFinishingMonitor); + context = new RunningAppContext(conf, taskAttemptFinishingMonitor); // Job name is the same as the app name util we support DAG of jobs // for an app later @@ -323,9 +341,6 @@ public class MRAppMaster extends CompositeService { } if (errorHappenedShutDown) { - dispatcher = createDispatcher(); - addIfService(dispatcher); - NoopEventHandler eater = new NoopEventHandler(); //We do not have a JobEventDispatcher in this path dispatcher.register(JobEventType.class, eater); @@ -372,9 +387,6 @@ public class MRAppMaster extends CompositeService { } else { committer = createOutputCommitter(conf); - dispatcher = createDispatcher(); - addIfService(dispatcher); - //service to handle requests from JobClient clientService = createClientService(context); // Init ClientService separately so that we stop it separately, since this @@ -946,10 +958,14 @@ public class MRAppMaster extends CompositeService { private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; - public RunningAppContext(Configuration config) { + private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; + + public RunningAppContext(Configuration config, + TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) { this.conf = config; this.clientToAMTokenSecretManager = new ClientToAMTokenSecretManager(appAttemptID, null); + this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor; } @Override @@ -1034,6 +1050,12 @@ public class MRAppMaster extends CompositeService { public String getNMHostname() { return nmHost; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return taskAttemptFinishingMonitor; + } + } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java new file mode 100644 index 00000000000..f6033988a4b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.SystemClock; + +/** + * This class generates TA_TIMED_OUT if the task attempt stays in FINISHING + * state for too long. + */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class TaskAttemptFinishingMonitor extends + AbstractLivelinessMonitor { + + private EventHandler eventHandler; + + public TaskAttemptFinishingMonitor(EventHandler eventHandler) { + super("TaskAttemptFinishingMonitor", new SystemClock()); + this.eventHandler = eventHandler; + } + + public void init(Configuration conf) { + super.init(conf); + int expireIntvl = conf.getInt(MRJobConfig.TASK_EXIT_TIMEOUT, + MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); + int checkIntvl = conf.getInt( + MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, + MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT); + + setExpireInterval(expireIntvl); + setMonitorInterval(checkIntvl); + } + + @Override + protected void expire(TaskAttemptId id) { + eventHandler.handle( + new TaskAttemptEvent(id, + TaskAttemptEventType.TA_TIMED_OUT)); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index ceb1dbf34cf..d378b0a7c02 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -370,7 +370,7 @@ public class MRClientService extends AbstractService implements ClientService { new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); appContext.getEventHandler().handle( new TaskAttemptEvent(taskAttemptId, - TaskAttemptEventType.TA_FAILMSG)); + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)); FailTaskAttemptResponse response = recordFactory. newRecordInstance(FailTaskAttemptResponse.class); return response; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java index f6c3e57244c..5f1765100df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java @@ -30,9 +30,42 @@ public enum TaskAttemptStateInternal { UNASSIGNED, ASSIGNED, RUNNING, - COMMIT_PENDING, - SUCCESS_CONTAINER_CLEANUP, - SUCCEEDED, + COMMIT_PENDING, + + // Transition into SUCCESS_FINISHING_CONTAINER + // After the attempt finishes successfully from + // TaskUmbilicalProtocol's point of view, it will transition to + // SUCCESS_FINISHING_CONTAINER state. That will give a chance for the + // container to exit by itself. In the transition, + // the attempt will notify the task via T_ATTEMPT_SUCCEEDED so that + // from job point of view, the task is considered succeeded. + + // Transition out of SUCCESS_FINISHING_CONTAINER + // The attempt will transition from SUCCESS_FINISHING_CONTAINER to + // SUCCESS_CONTAINER_CLEANUP if it doesn't receive container exit + // notification within TASK_EXIT_TIMEOUT; + // Or it will transition to SUCCEEDED if it receives container exit + // notification from YARN. + SUCCESS_FINISHING_CONTAINER, + + // Transition into FAIL_FINISHING_CONTAINER + // After the attempt fails from + // TaskUmbilicalProtocol's point of view, it will transition to + // FAIL_FINISHING_CONTAINER state. That will give a chance for the container + // to exit by itself. In the transition, + // the attempt will notify the task via T_ATTEMPT_FAILED so that + // from job point of view, the task is considered failed. + + // Transition out of FAIL_FINISHING_CONTAINER + // The attempt will transition from FAIL_FINISHING_CONTAINER to + // FAIL_CONTAINER_CLEANUP if it doesn't receive container exit + // notification within TASK_EXIT_TIMEOUT; + // Or it will transition to FAILED if it receives container exit + // notification from YARN. + FAIL_FINISHING_CONTAINER, + + SUCCESS_CONTAINER_CLEANUP, + SUCCEEDED, FAIL_CONTAINER_CLEANUP, FAIL_TASK_CLEANUP, FAILED, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java index a43263264e9..afac40ebd37 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java @@ -48,6 +48,9 @@ public enum TaskAttemptEventType { TA_UPDATE, TA_TIMED_OUT, + //Producer:Client + TA_FAILMSG_BY_CLIENT, + //Producer:TaskCleaner TA_CLEANUP_DONE, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 5a8a6585bd9..12e5e775033 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -184,8 +184,20 @@ public abstract class TaskAttemptImpl implements private Locality locality; private Avataar avataar; - private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = - new CleanupContainerTransition(); + private static final CleanupContainerTransition + CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); + private static final MoveContainerToSucceededFinishingTransition + SUCCEEDED_FINISHING_TRANSITION = + new MoveContainerToSucceededFinishingTransition(); + private static final MoveContainerToFailedFinishingTransition + FAILED_FINISHING_TRANSITION = + new MoveContainerToFailedFinishingTransition(); + private static final ExitFinishingOnTimeoutTransition + FINISHING_ON_TIMEOUT_TRANSITION = + new ExitFinishingOnTimeoutTransition(); + + private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION = + new FinalizeFailedTransition(); private static final DiagnosticInformationUpdater DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION @@ -204,6 +216,8 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE); private static final StateMachineFactory @@ -221,16 +235,16 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new KilledTransition()) .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, - TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition()) .addTransition(TaskAttemptStateInternal.NEW, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_RECOVER, new RecoverTransition()) .addTransition(TaskAttemptStateInternal.NEW, - TaskAttemptStateInternal.NEW, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, - DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + TaskAttemptStateInternal.NEW, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Transitions from the UNASSIGNED state. .addTransition(TaskAttemptStateInternal.UNASSIGNED, @@ -238,14 +252,14 @@ public abstract class TaskAttemptImpl implements new ContainerAssignedTransition()) .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( - TaskAttemptStateInternal.KILLED, true)) + TaskAttemptStateInternal.KILLED, true)) .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED, - TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition( TaskAttemptStateInternal.FAILED, true)) .addTransition(TaskAttemptStateInternal.UNASSIGNED, - TaskAttemptStateInternal.UNASSIGNED, - TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, - DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + TaskAttemptStateInternal.UNASSIGNED, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Transitions from the ASSIGNED state. .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING, @@ -258,15 +272,19 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false)) .addTransition(TaskAttemptStateInternal.ASSIGNED, - TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, - CLEANUP_CONTAINER_TRANSITION) + FINALIZE_FAILED_TRANSITION) .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) - .addTransition(TaskAttemptStateInternal.ASSIGNED, + .addTransition(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION) + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + CLEANUP_CONTAINER_TRANSITION) // Transitions from RUNNING state. .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, @@ -274,23 +292,27 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) - // If no commit is required, task directly goes to success + // If no commit is required, task goes to finishing state + // This will give a chance for the container to exit by itself .addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION) // If commit is required, task goes through commit pending state. .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) // Failure handling while RUNNING .addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) - //for handling container exit without sending the done or fail msg + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION) + //for handling container exit without sending the done or fail msg + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, - CLEANUP_CONTAINER_TRANSITION) + FINALIZE_FAILED_TRANSITION) // Timeout handling while RUNNING .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, @@ -301,9 +323,94 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) // Kill handling .addTransition(TaskAttemptStateInternal.RUNNING, - TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) + // Transitions from SUCCESS_FINISHING_CONTAINER state + // When the container exits by itself, the notification of container + // completed event will be routed via NM -> RM -> AM. + // After MRAppMaster gets notification from RM, it will generate + // TA_CONTAINER_COMPLETED event. + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_COMPLETED, + new ExitFinishingOnContainerCompletedTransition()) + // Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to + // SUCCESS_FINISHING_CONTAINER, it is possible to receive the event + // TA_CONTAINER_CLEANED in the following scenario. + // 1. It is the last task for the job. + // 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job. + // 3. Job will be marked completed. + // 4. As part of MRAppMaster's shutdown, all containers will be killed. + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, + new ExitFinishingOnContainerCleanedupTransition()) + // The client wants to kill the task. Given the task is in finishing + // state, it could go to succeeded state or killed state. If it is a + // reducer, it will go to succeeded state; + // otherwise, it goes to killed state. + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP), + TaskAttemptEventType.TA_KILL, + new KilledAfterSucceededFinishingTransition()) + // The attempt stays in finishing state for too long + // Let us clean up the container + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION) + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + // ignore-able events + .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + EnumSet.of(TaskAttemptEventType.TA_UPDATE, + TaskAttemptEventType.TA_DONE, + TaskAttemptEventType.TA_COMMIT_PENDING, + TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)) + + // Transitions from FAIL_FINISHING_CONTAINER state + // When the container exits by itself, the notification of container + // completed event will be routed via NM -> RM -> AM. + // After MRAppMaster gets notification from RM, it will generate + // TA_CONTAINER_COMPLETED event. + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_CONTAINER_COMPLETED, + new ExitFinishingOnContainerCompletedTransition()) + // Given TA notifies task T_ATTEMPT_FAILED when it transitions to + // FAIL_FINISHING_CONTAINER, it is possible to receive the event + // TA_CONTAINER_CLEANED in the following scenario. + // 1. It is the last task attempt for the task. + // 2. After the task receives T_ATTEMPT_FAILED, it will notify job. + // 3. Job will be marked failed. + // 4. As part of MRAppMaster's shutdown, all containers will be killed. + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAILED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, + new ExitFinishingOnContainerCleanedupTransition()) + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION) + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) + // ignore-able events + .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + EnumSet.of(TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_UPDATE, + TaskAttemptEventType.TA_DONE, + TaskAttemptEventType.TA_COMMIT_PENDING, + TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)) + // Transitions from COMMIT_PENDING state .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, @@ -313,22 +420,27 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, - TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER, + TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, - TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) // if container killed by AM shutting down .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, - TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, - TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER, + TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + CLEANUP_CONTAINER_TRANSITION) + .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, + TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, - CLEANUP_CONTAINER_TRANSITION) + FINALIZE_FAILED_TRANSITION) .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) @@ -345,8 +457,8 @@ public abstract class TaskAttemptImpl implements // Transitions from SUCCESS_CONTAINER_CLEANUP state // kill and cleanup the container .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, - TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, - new SucceededTransition()) + TaskAttemptStateInternal.SUCCEEDED, + TaskAttemptEventType.TA_CONTAINER_CLEANED) .addTransition( TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, @@ -357,6 +469,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) @@ -380,6 +493,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_TIMED_OUT)) // Transitions from KILL_CONTAINER_CLEANUP @@ -402,6 +516,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_TIMED_OUT)) // Transitions from FAIL_TASK_CLEANUP @@ -422,6 +537,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_CONTAINER_CLEANED, // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, @@ -444,6 +560,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, TaskAttemptEventType.TA_CONTAINER_CLEANED, // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, @@ -456,7 +573,7 @@ public abstract class TaskAttemptImpl implements new TooManyFetchFailureTransition()) .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED), - TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_KILL, new KilledAfterSuccessTransition()) .addTransition( TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, @@ -466,6 +583,10 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, + // TaskAttemptFinishingMonitor might time out the attempt right + // after the attempt receives TA_CONTAINER_COMPLETED. + TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) @@ -1220,21 +1341,21 @@ public abstract class TaskAttemptImpl implements return TaskAttemptState.STARTING; case COMMIT_PENDING: return TaskAttemptState.COMMIT_PENDING; - case FAILED: - return TaskAttemptState.FAILED; - case KILLED: - return TaskAttemptState.KILLED; - // All CLEANUP states considered as RUNNING since events have not gone out - // to the Task yet. May be possible to consider them as a Finished state. case FAIL_CONTAINER_CLEANUP: case FAIL_TASK_CLEANUP: + case FAIL_FINISHING_CONTAINER: + case FAILED: + return TaskAttemptState.FAILED; case KILL_CONTAINER_CLEANUP: case KILL_TASK_CLEANUP: - case SUCCESS_CONTAINER_CLEANUP: + case KILLED: + return TaskAttemptState.KILLED; case RUNNING: return TaskAttemptState.RUNNING; case NEW: return TaskAttemptState.NEW; + case SUCCESS_CONTAINER_CLEANUP: + case SUCCESS_FINISHING_CONTAINER: case SUCCEEDED: return TaskAttemptState.SUCCEEDED; default: @@ -1436,6 +1557,15 @@ public abstract class TaskAttemptImpl implements } } + private static void finalizeProgress(TaskAttemptImpl taskAttempt) { + // unregister it to TaskAttemptListener so that it stops listening + taskAttempt.taskAttemptListener.unregister( + taskAttempt.attemptId, taskAttempt.jvmID); + taskAttempt.reportedStatus.progress = 1.0f; + taskAttempt.updateProgressSplits(); + } + + static class RequestContainerTransition implements SingleArcTransition { private final boolean rescheduled; @@ -1668,53 +1798,66 @@ public abstract class TaskAttemptImpl implements } } - private static class SucceededTransition implements + /** + * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER + * state upon receiving TA_CONTAINER_COMPLETED event + */ + private static class ExitFinishingOnContainerCompletedTransition implements SingleArcTransition { @SuppressWarnings("unchecked") @Override - public void transition(TaskAttemptImpl taskAttempt, + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + sendContainerCompleted(taskAttempt); + } + } + + private static class ExitFinishingOnContainerCleanedupTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { - //set the finish time - taskAttempt.setFinishTime(); - taskAttempt.eventHandler.handle( - createJobCounterUpdateEventTASucceeded(taskAttempt)); - taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_SUCCEEDED)); - taskAttempt.eventHandler.handle - (new SpeculatorEvent - (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); - } + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + } } private static class FailedTransition implements SingleArcTransition { @SuppressWarnings("unchecked") @Override - public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { // set the finish time taskAttempt.setFinishTime(); - - if (taskAttempt.getLaunchTime() != 0) { - taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); - TaskAttemptUnsuccessfulCompletionEvent tauce = - createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptStateInternal.FAILED); - taskAttempt.eventHandler.handle(new JobHistoryEvent( - taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not - // handling failed map/reduce events. - }else { - LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); - } - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + notifyTaskAttemptFailed(taskAttempt); } } + private static class FinalizeFailedTransition extends FailedTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + finalizeProgress(taskAttempt); + sendContainerCompleted(taskAttempt); + super.transition(taskAttempt, event); + } + } + + @SuppressWarnings("unchecked") + private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) { + taskAttempt.eventHandler.handle(new ContainerLauncherEvent( + taskAttempt.attemptId, + taskAttempt.container.getId(), StringInterner + .weakIntern(taskAttempt.container.getNodeId().toString()), + taskAttempt.container.getContainerToken(), + ContainerLauncher.EventType.CONTAINER_COMPLETED)); + } + private static class RecoverTransition implements MultipleArcTransition { @@ -1839,6 +1982,35 @@ public abstract class TaskAttemptImpl implements } } + private static class KilledAfterSucceededFinishingTransition + implements MultipleArcTransition { + + @SuppressWarnings("unchecked") + @Override + public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + sendContainerCleanup(taskAttempt, event); + if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) { + // after a reduce task has succeeded, its outputs are in safe in HDFS. + // logically such a task should not be killed. we only come here when + // there is a race condition in the event queue. E.g. some logic sends + // a kill request to this attempt when the successful completion event + // for this task is already in the event queue. so the kill event will + // get executed immediately after the attempt is marked successful and + // result in this transition being exercised. + // ignore this for reduce tasks + LOG.info("Ignoring killed event for successful reduce task attempt" + + taskAttempt.getID().toString()); + return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP; + } else { + return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP; + } + } + } + private static class KilledTransition implements SingleArcTransition { @@ -1873,6 +2045,31 @@ public abstract class TaskAttemptImpl implements } } + /** + * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER + * state upon receiving TA_TIMED_OUT event + */ + private static class ExitFinishingOnTimeoutTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister( + taskAttempt.attemptId); + // The attempt stays in finishing state for too long + String msg = "Task attempt " + taskAttempt.getID() + " is done from " + + "TaskUmbilicalProtocol's point of view. However, it stays in " + + "finishing state for too long"; + LOG.warn(msg); + taskAttempt.addDiagnosticInfo(msg); + sendContainerCleanup(taskAttempt, event); + } + } + + /** + * Finish and clean up the container + */ private static class CleanupContainerTransition implements SingleArcTransition { @SuppressWarnings("unchecked") @@ -1880,27 +2077,103 @@ public abstract class TaskAttemptImpl implements public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // unregister it to TaskAttemptListener so that it stops listening - // for it - taskAttempt.taskAttemptListener.unregister( - taskAttempt.attemptId, taskAttempt.jvmID); - - if (event instanceof TaskAttemptKillEvent) { - taskAttempt.addDiagnosticInfo( - ((TaskAttemptKillEvent) event).getMessage()); - } - - taskAttempt.reportedStatus.progress = 1.0f; - taskAttempt.updateProgressSplits(); - //send the cleanup event to containerLauncher - taskAttempt.eventHandler.handle(new ContainerLauncherEvent( - taskAttempt.attemptId, - taskAttempt.container.getId(), StringInterner - .weakIntern(taskAttempt.container.getNodeId().toString()), - taskAttempt.container.getContainerToken(), - ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); + // for it. + finalizeProgress(taskAttempt); + sendContainerCleanup(taskAttempt, event); } } + @SuppressWarnings("unchecked") + private static void sendContainerCleanup(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.addDiagnosticInfo( + ((TaskAttemptKillEvent) event).getMessage()); + } + //send the cleanup event to containerLauncher + taskAttempt.eventHandler.handle(new ContainerLauncherEvent( + taskAttempt.attemptId, + taskAttempt.container.getId(), StringInterner + .weakIntern(taskAttempt.container.getNodeId().toString()), + taskAttempt.container.getContainerToken(), + ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); + } + + /** + * Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event + */ + private static class MoveContainerToSucceededFinishingTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + finalizeProgress(taskAttempt); + + // register it to finishing state + taskAttempt.appContext.getTaskAttemptFinishingMonitor().register( + taskAttempt.attemptId); + + // set the finish time + taskAttempt.setFinishTime(); + + // notify job history + taskAttempt.eventHandler.handle( + createJobCounterUpdateEventTASucceeded(taskAttempt)); + taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); + + //notify the task even though the container might not have exited yet. + taskAttempt.eventHandler.handle(new TaskTAttemptEvent( + taskAttempt.attemptId, + TaskEventType.T_ATTEMPT_SUCCEEDED)); + taskAttempt.eventHandler.handle + (new SpeculatorEvent + (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); + + } + } + + /** + * Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event + */ + private static class MoveContainerToFailedFinishingTransition implements + SingleArcTransition { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + finalizeProgress(taskAttempt); + // register it to finishing state + taskAttempt.appContext.getTaskAttemptFinishingMonitor().register( + taskAttempt.attemptId); + notifyTaskAttemptFailed(taskAttempt); + } + } + + @SuppressWarnings("unchecked") + private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) { + // set the finish time + taskAttempt.setFinishTime(); + + if (taskAttempt.getLaunchTime() != 0) { + taskAttempt.eventHandler + .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, + TaskAttemptStateInternal.FAILED); + taskAttempt.eventHandler.handle(new JobHistoryEvent( + taskAttempt.attemptId.getTaskId().getJobId(), tauce)); + // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not + // handling failed map/reduce events. + }else { + LOG.debug("Not generating HistoryFinish event since start event not " + + "generated for taskAttempt: " + taskAttempt.getID()); + } + taskAttempt.eventHandler.handle(new TaskTAttemptEvent( + taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + + } + private void addDiagnosticInfo(String diag) { if (diag != null && !diag.equals("")) { diagnostics.add(diag); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java index 40ecdb2b3b9..82360f08d45 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java @@ -27,7 +27,13 @@ public interface ContainerLauncher enum EventType { CONTAINER_REMOTE_LAUNCH, - CONTAINER_REMOTE_CLEANUP + CONTAINER_REMOTE_CLEANUP, + // When TaskAttempt receives TA_CONTAINER_COMPLETED, + // it will notify ContainerLauncher so that the container can be removed + // from ContainerLauncher's launched containers list + // Otherwise, ContainerLauncher will try to stop the containers as part of + // serviceStop. + CONTAINER_COMPLETED } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 9c1125d4ec2..a7e966cd362 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -121,7 +121,11 @@ public class ContainerLauncherImpl extends AbstractService implements public synchronized boolean isCompletelyDone() { return state == ContainerState.DONE || state == ContainerState.FAILED; } - + + public synchronized void done() { + state = ContainerState.DONE; + } + @SuppressWarnings("unchecked") public synchronized void launch(ContainerRemoteLaunchEvent event) { LOG.info("Launching " + taskAttemptID); @@ -378,6 +382,11 @@ public class ContainerLauncherImpl extends AbstractService implements case CONTAINER_REMOTE_CLEANUP: c.kill(); break; + + case CONTAINER_COMPLETED: + c.done(); + break; + } removeContainerIfDone(containerID); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java new file mode 100644 index 00000000000..02656bee77f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java @@ -0,0 +1,105 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapred; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.SystemClock; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestTaskAttemptFinishingMonitor { + + @Test + public void testFinshingAttemptTimeout() + throws IOException, InterruptedException { + SystemClock clock = new SystemClock(); + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100); + conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); + + AppContext appCtx = mock(AppContext.class); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = + new TaskAttemptFinishingMonitor(eventHandler); + taskAttemptFinishingMonitor.init(conf); + taskAttemptFinishingMonitor.start(); + + when(appCtx.getEventHandler()).thenReturn(eventHandler); + when(appCtx.getNMHostname()).thenReturn("0.0.0.0"); + when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn( + taskAttemptFinishingMonitor); + when(appCtx.getClock()).thenReturn(clock); + + TaskAttemptListenerImpl listener = + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler); + + listener.init(conf); + listener.start(); + + JobId jid = MRBuilderUtils.newJobId(12345, 1, 1); + TaskId tid = MRBuilderUtils.newTaskId(jid, 0, + org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0); + appCtx.getTaskAttemptFinishingMonitor().register(attemptId); + int check = 0; + while ( !eventHandler.timedOut && check++ < 10 ) { + Thread.sleep(100); + } + taskAttemptFinishingMonitor.stop(); + + assertTrue("Finishing attempt didn't time out.", eventHandler.timedOut); + + } + + public static class MockEventHandler implements EventHandler { + public boolean timedOut = false; + + @Override + public void handle(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent attemptEvent = ((TaskAttemptEvent) event); + if (TaskAttemptEventType.TA_TIMED_OUT == attemptEvent.getType()) { + timedOut = true; + } + } + } + }; + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 3100d12ce14..14b4c2da7f6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -481,7 +481,21 @@ public class MRApp extends MRAppMaster { } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptFinishingMonitor + createTaskAttemptFinishingMonitor( + EventHandler eventHandler) { + return new TaskAttemptFinishingMonitor(eventHandler) { + @Override + public synchronized void register(TaskAttemptId attemptID) { + getContext().getEventHandler().handle( + new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + } + }; + } + + @Override + protected TaskAttemptListener createTaskAttemptListener(AppContext context) { return new TaskAttemptListener(){ @Override public InetSocketAddress getAddress() { @@ -539,6 +553,8 @@ public class MRApp extends MRAppMaster { new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); break; + case CONTAINER_COMPLETED: + break; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index a900241464c..e690f3f0bc9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -148,4 +148,10 @@ public class MockAppContext implements AppContext { // bogus - Not Required return null; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return null; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 3b845f9b264..a4395d5163f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -222,6 +222,8 @@ public class TestFail { new TaskAttemptEvent(event.getTaskAttemptID(), TaskAttemptEventType.TA_CONTAINER_CLEANED)); break; + case CONTAINER_COMPLETED: + super.handle(event); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index c33bd4d173d..aae591e33d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -159,7 +159,7 @@ public class TestKill { super.dispatch(new TaskAttemptEvent(taID, TaskAttemptEventType.TA_DONE)); super.dispatch(new TaskAttemptEvent(taID, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); super.dispatch(new TaskTAttemptEvent(taID, TaskEventType.T_ATTEMPT_SUCCEEDED)); this.cachedKillEvent = killEvent; @@ -211,40 +211,9 @@ public class TestKill { app.getContext().getEventHandler() .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); - app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); + app.waitForInternalState((JobImpl) job, JobStateInternal.KILLED); } - static class MyAsyncDispatch extends AsyncDispatcher { - private CountDownLatch latch; - private TaskAttemptEventType attemptEventTypeToWait; - MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) { - super(); - this.latch = latch; - this.attemptEventTypeToWait = attemptEventTypeToWait; - } - - @Override - protected void dispatch(Event event) { - if (event instanceof TaskAttemptEvent) { - TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event; - TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID(); - if (attemptEvent.getType() == this.attemptEventTypeToWait - && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) { - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - super.dispatch(event); - } - } - - // This is to test a race condition where JobEventType.JOB_KILL is generated - // right after TaskAttemptEventType.TA_DONE is generated. - // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED - // and T_ATTEMPT_KILLED from the same attempt. @Test public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { CountDownLatch latch = new CountDownLatch(1); @@ -269,15 +238,12 @@ public class TestKill { TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); - // The order in the dispatch event queue, from the oldest to the newest + // The order in the dispatch event queue, from first to last // TA_DONE - // JOB_KILL - // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling ) - // T_KILL ( from JOB_KILL's handling ) - // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling ) - // TA_KILL ( from T_KILL's handling ) - // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling ) - // T_ATTEMPT_KILLED ( from TA_KILL's handling ) + // JobEventType.JOB_KILL + // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling ) + // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling ) + // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling ) // Finish map app.getContext().getEventHandler().handle( @@ -295,6 +261,100 @@ public class TestKill { app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); } + + @Test + public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL); + MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { + @Override + public Dispatcher createDispatcher() { + return dispatcher; + } + }; + Job job = app.submit(new Configuration()); + JobId jobId = app.getJobId(); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask = it.next(); + Task reduceTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + app.waitForState(reduceTask, TaskState.RUNNING); + TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); + app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); + + // The order in the dispatch event queue, from first to last + // JobEventType.JOB_KILL + // TA_DONE + // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling ) + // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling ) + // TaskAttemptEventType.TA_KILL ( from TaskEventType.T_KILL handling ) + // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling ) + // TaskEventType.T_ATTEMPT_KILLED ( from TA_KILL handling ) + + // Now kill the job + app.getContext().getEventHandler() + .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + + // Finish map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapAttempt.getID(), + TaskAttemptEventType.TA_DONE)); + + //unblock + latch.countDown(); + + app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); + } + + static class MyAsyncDispatch extends AsyncDispatcher { + private CountDownLatch latch; + private TaskAttemptEventType attemptEventTypeToWait; + private JobEventType jobEventTypeToWait; + MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) { + super(); + this.latch = latch; + this.attemptEventTypeToWait = attemptEventTypeToWait; + } + + MyAsyncDispatch(CountDownLatch latch, JobEventType jobEventTypeToWait) { + super(); + this.latch = latch; + this.jobEventTypeToWait = jobEventTypeToWait; + } + + @Override + protected void dispatch(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event; + TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID(); + if (attemptEvent.getType() == this.attemptEventTypeToWait + && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } else if ( event instanceof JobEvent) { + JobEvent jobEvent = (JobEvent) event; + if (jobEvent.getType() == this.jobEventTypeToWait) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + super.dispatch(event); + } + } + @Test public void testKillTaskAttempt() throws Exception { final CountDownLatch latch = new CountDownLatch(1); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 69f27091442..475cd1f5e8a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -884,5 +884,10 @@ public class TestRuntimeEstimators { // bogus - Not Required return null; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return null; + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 1807c1c3e09..79b88d846d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -407,6 +408,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -464,6 +466,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -524,6 +527,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -546,7 +550,7 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); @@ -593,6 +597,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, @@ -641,6 +646,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -663,7 +669,7 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); @@ -708,6 +714,7 @@ public class TestTaskAttempt{ Resource resource = mock(Resource.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); when(resource.getMemory()).thenReturn(1024); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splits, jobConf, taListener, @@ -753,6 +760,7 @@ public class TestTaskAttempt{ AppContext appCtx = mock(AppContext.class); ClusterInfo clusterInfo = mock(ClusterInfo.class); when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, @@ -774,7 +782,7 @@ public class TestTaskAttempt{ taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE)); taImpl.handle(new TaskAttemptEvent(attemptId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); assertEquals("Task attempt is not in succeeded state", taImpl.getState(), TaskAttemptState.SUCCEEDED); @@ -967,6 +975,255 @@ public class TestTaskAttempt{ taImpl.getInternalState()); } + + @Test + public void testKillMapTaskWhileSuccessFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + // If the map task is killed when it is in SUCCESS_FINISHING_CONTAINER + // state, the state will move to KILL_CONTAINER_CLEANUP + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_KILL)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + assertEquals("Task attempt's internal state is not KILL_CONTAINER_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt's internal state is not KILL_TASK_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.KILL_TASK_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CLEANUP_DONE)); + + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testKillMapTaskWhileFailFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_FAILMSG)); + + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); + + // If the map task is killed when it is in FAIL_FINISHING_CONTAINER state, + // the state will stay in FAIL_FINISHING_CONTAINER. + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_KILL)); + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_TASK_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CLEANUP_DONE)); + + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.FAILED); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testFailMapTaskByClient() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_FAILMSG_BY_CLIENT)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_CONTAINER_CLEANUP", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertEquals("Task attempt's internal state is not FAIL_TASK_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_TASK_CLEANUP); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CLEANUP_DONE)); + + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.FAILED); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testTaskAttemptDiagnosticEventOnFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + // TA_DIAGNOSTICS_UPDATE doesn't change state + taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(taImpl.getID(), + "Task got updated")); + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testTimeoutWhileSuccessFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + // If the task stays in SUCCESS_FINISHING_CONTAINER for too long, + // TaskAttemptListenerImpl will time out the attempt. + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_CONTAINER_CLEANUP", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + @Test + public void testTimeoutWhileFailFinishing() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_FAILMSG)); + + assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), + TaskAttemptState.FAILED); + assertEquals("Task attempt's internal state is not " + + "FAIL_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER); + + // If the task stays in FAIL_FINISHING_CONTAINER for too long, + // TaskAttemptListenerImpl will time out the attempt. + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_TIMED_OUT)); + assertEquals("Task attempt's internal state is not FAIL_CONTAINER_CLEANUP", + taImpl.getInternalState(), + TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP); + + assertFalse("InternalError occurred", eventHandler.internalError); + } + + private void setupTaskAttemptFinishingMonitor( + EventHandler eventHandler, JobConf jobConf, AppContext appCtx) { + TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = + new TaskAttemptFinishingMonitor(eventHandler); + taskAttemptFinishingMonitor.init(jobConf); + when(appCtx.getTaskAttemptFinishingMonitor()). + thenReturn(taskAttemptFinishingMonitor); + } + + private TaskAttemptImpl createTaskAttemptImpl( + MockEventHandler eventHandler) { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + return taImpl; + } + public static class MockEventHandler implements EventHandler { public boolean internalError; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index d408d80dc1c..451ace82d33 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -227,7 +227,15 @@ public interface MRJobConfig { public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms"; - + + public static final String TASK_EXIT_TIMEOUT = "mapreduce.task.exit.timeout"; + + public static final int TASK_EXIT_TIMEOUT_DEFAULT = 60 * 1000; + + public static final String TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.exit.timeout.check-interval-ms"; + + public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000; + public static final String TASK_ID = "mapreduce.task.id"; public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 9f1717d4f63..8608bcde0a0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1664,4 +1664,24 @@ app master. + + + mapreduce.task.exit.timeout + 60000 + The number of milliseconds before a task will be + terminated if it stays in finishing state for too long. + After a task attempt completes from TaskUmbilicalProtocol's point of view, + it will be transitioned to finishing state. That will give a chance for the + task to exit by itself. + + + + + mapreduce.task.exit.timeout.check-interval-ms + 20000 + The interval in milliseconds between which the MR framework + checks if task attempts stay in finishing state for too long. + + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 194b85a5a29..41bc90a99ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -399,4 +400,9 @@ public class JobHistory extends AbstractService implements HistoryContext { // bogus - Not Required return null; } + + @Override + public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() { + return null; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index d2edd192df5..5ce27612ba7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -102,7 +102,7 @@ public class TestSpeculativeExecutionWithMRApp { appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); } } @@ -170,7 +170,7 @@ public class TestSpeculativeExecutionWithMRApp { appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); numTasksToFinish--; app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); } else { @@ -228,7 +228,7 @@ public class TestSpeculativeExecutionWithMRApp { appEventHandler.handle( new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE)); appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), - TaskAttemptEventType.TA_CONTAINER_CLEANED)); + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); return ta; }