From 6a1c41111edcdc58c846fc50e53554fbba230171 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Thu, 11 Apr 2013 04:52:38 +0000 Subject: [PATCH] MAPREDUCE-5079. Changes job recovery to restore state directly from job history, instead of simulating state machine events. Contributed by Jason Lowe and Robert Parker. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466767 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 172 ++++--- .../event/JobStartEvent.java} | 27 +- .../app/job/event/TaskAttemptEventType.java | 1 + .../job/event/TaskAttemptRecoverEvent.java | 50 ++ .../v2/app/job/event/TaskEventType.java | 1 + .../v2/app/job/event/TaskRecoverEvent.java | 50 ++ .../mapreduce/v2/app/job/impl/JobImpl.java | 33 +- .../v2/app/job/impl/MapTaskImpl.java | 9 +- .../v2/app/job/impl/ReduceTaskImpl.java | 9 +- .../v2/app/job/impl/TaskAttemptImpl.java | 214 ++++-- .../mapreduce/v2/app/job/impl/TaskImpl.java | 269 ++++---- .../mapreduce/v2/app/recover/Recovery.java | 39 -- .../v2/app/recover/RecoveryService.java | 480 ------------------ .../apache/hadoop/mapreduce/v2/app/MRApp.java | 8 +- .../hadoop/mapreduce/v2/app/TestRecovery.java | 470 +++++++++++++++++ .../mapreduce/v2/app/TestStagingCleanup.java | 3 +- .../v2/app/job/impl/TestJobImpl.java | 41 +- .../v2/app/job/impl/TestTaskImpl.java | 19 +- .../v2/jobhistory/JobHistoryUtils.java | 16 + .../apache/hadoop/mapreduce/MRJobConfig.java | 1 + 21 files changed, 1146 insertions(+), 770 deletions(-) rename hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/{recover/package-info.java => job/event/JobStartEvent.java} (61%) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 72967fd6a6a..9d0a815287e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -169,6 +169,10 @@ Release 2.0.5-alpha - UNRELEASED MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via acmurthy) + MAPREDUCE-5079. Changes job recovery to restore state directly from job + history, instead of simulating state machine events.
+ (Jason Lowe and Robert Parker via sseth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index b320e4110da..45d6e9e84a9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -24,9 +24,12 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.IOUtils; @@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.EventReader; @@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; @@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; @@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; @@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import 
org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; -import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; -import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; @@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -167,7 +175,6 @@ public class MRAppMaster extends CompositeService { private AppContext context; private Dispatcher dispatcher; private ClientService clientService; - private Recovery recoveryServ; private ContainerAllocator containerAllocator; private ContainerLauncher containerLauncher; private EventHandler<CommitterEvent> committerEventHandler; @@ -180,7 +187,6 @@ private OutputCommitter committer; private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; - private boolean inRecovery = false; private SpeculatorEventDispatcher speculatorEventDispatcher; private Job job; @@ -193,6 +199,8 @@ private String shutDownMessage = null; JobStateInternal forcedState = null; + private long recoveredJobStartTime = 0; + public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, long appSubmitTime, int maxAppAttempts) { @@ -340,34 +348,9 @@ } } else { committer = createOutputCommitter(conf); - boolean recoveryEnabled = conf.getBoolean( - MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); - boolean recoverySupportedByCommitter = committer.isRecoverySupported(); - // If a shuffle secret was not provided by the job client then this app - // attempt will generate one. However that disables recovery if there - // are reducers as the shuffle secret would be app attempt specific. - boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 && - TokenCache.getShuffleSecretKey(fsTokens) != null); - - if (recoveryEnabled && recoverySupportedByCommitter - && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) { - LOG.info("Recovery is enabled.
" - + "Will try to recover from previous life on best effort basis."); - recoveryServ = createRecoveryService(context); - addIfService(recoveryServ); - dispatcher = recoveryServ.getDispatcher(); - clock = recoveryServ.getClock(); - inRecovery = true; - } else { - LOG.info("Not starting RecoveryService: recoveryEnabled: " - + recoveryEnabled + " recoverySupportedByCommitter: " - + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: " - + shuffleKeyValidForRecovery + " ApplicationAttemptID: " - + appAttemptID.getAttemptId()); - dispatcher = createDispatcher(); - addIfService(dispatcher); - } + dispatcher = createDispatcher(); + addIfService(dispatcher); //service to handle requests from JobClient clientService = createClientService(context); @@ -595,15 +578,6 @@ public class MRAppMaster extends CompositeService { return new JobFinishEventHandler(); } - /** - * Create the recovery service. - * @return an instance of the recovery service. - */ - protected Recovery createRecoveryService(AppContext appContext) { - return new RecoveryService(appContext.getApplicationAttemptId(), - appContext.getClock(), getCommitter(), isNewApiCommitter()); - } - /** Create and initialize (but don't start) a single job. * @param forcedState a state to force the job into or null for normal operation. * @param diagnostic a diagnostic message to include with the job. @@ -615,7 +589,8 @@ public class MRAppMaster extends CompositeService { Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, fsTokens, clock, - completedTasksFromPreviousRun, metrics, newApiCommitter, + completedTasksFromPreviousRun, metrics, + committer, newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos, context, forcedState, diagnostic); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); @@ -978,18 +953,8 @@ public class MRAppMaster extends CompositeService { public void start() { amInfos = new LinkedList(); - - // Pull completedTasks etc from recovery - if (inRecovery) { - completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); - amInfos = recoveryServ.getAMInfos(); - } else { - // Get the amInfos anyways irrespective of whether recovery is enabled or - // not IF this is not the first AM generation - if (appAttemptID.getAttemptId() != 1) { - amInfos.addAll(readJustAMInfos()); - } - } + completedTasksFromPreviousRun = new HashMap(); + processRecovery(); // Current an AMInfo for the current AM generation. AMInfo amInfo = @@ -1051,13 +1016,105 @@ public class MRAppMaster extends CompositeService { startJobs(); } + private void processRecovery() { + if (appAttemptID.getAttemptId() == 1) { + return; // no need to recover on the first attempt + } + + boolean recoveryEnabled = getConfig().getBoolean( + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); + boolean recoverySupportedByCommitter = + committer != null && committer.isRecoverySupported(); + + // If a shuffle secret was not provided by the job client then this app + // attempt will generate one. However that disables recovery if there + // are reducers as the shuffle secret would be app attempt specific. + int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0); + boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 && + TokenCache.getShuffleSecretKey(fsTokens) != null); + + if (recoveryEnabled && recoverySupportedByCommitter + && shuffleKeyValidForRecovery) { + LOG.info("Recovery is enabled. 
" + + "Will try to recover from previous life on best effort basis."); + try { + parsePreviousJobHistory(); + } catch (IOException e) { + LOG.warn("Unable to parse prior job history, aborting recovery", e); + // try to get just the AMInfos + amInfos.addAll(readJustAMInfos()); + } + } else { + LOG.info("Will not try to recover. recoveryEnabled: " + + recoveryEnabled + " recoverySupportedByCommitter: " + + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: " + + shuffleKeyValidForRecovery + " ApplicationAttemptID: " + + appAttemptID.getAttemptId()); + // Get the amInfos anyways whether recovery is enabled or not + amInfos.addAll(readJustAMInfos()); + } + } + + private static FSDataInputStream getPreviousJobHistoryStream( + Configuration conf, ApplicationAttemptId appAttemptId) + throws IOException { + Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf, + appAttemptId); + LOG.info("Previous history file is at " + historyFile); + return historyFile.getFileSystem(conf).open(historyFile); + } + + private void parsePreviousJobHistory() throws IOException { + FSDataInputStream in = getPreviousJobHistoryStream(getConfig(), + appAttemptID); + JobHistoryParser parser = new JobHistoryParser(in); + JobInfo jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + if (parseException != null) { + LOG.info("Got an error parsing job-history file" + + ", ignoring incomplete events.", parseException); + } + Map taskInfos = jobInfo + .getAllTasks(); + for (TaskInfo taskInfo : taskInfos.values()) { + if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { + Iterator> taskAttemptIterator = + taskInfo.getAllTaskAttempts().entrySet().iterator(); + while (taskAttemptIterator.hasNext()) { + Map.Entry currentEntry = taskAttemptIterator.next(); + if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) { + taskAttemptIterator.remove(); + } + } + completedTasksFromPreviousRun + .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo); + LOG.info("Read from history task " + + TypeConverter.toYarn(taskInfo.getTaskId())); + } + } + LOG.info("Read completed tasks from history " + + completedTasksFromPreviousRun.size()); + recoveredJobStartTime = jobInfo.getLaunchTime(); + + // recover AMInfos + List jhAmInfoList = jobInfo.getAMInfos(); + if (jhAmInfoList != null) { + for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) { + AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(), + jhAmInfo.getStartTime(), jhAmInfo.getContainerId(), + jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(), + jhAmInfo.getNodeManagerHttpPort()); + amInfos.add(amInfo); + } + } + } + private List readJustAMInfos() { List amInfos = new ArrayList(); FSDataInputStream inputStream = null; try { - inputStream = - RecoveryService.getPreviousJobHistoryFileStream(getConfig(), - appAttemptID); + inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID); EventReader jobHistoryEventReader = new EventReader(inputStream); // All AMInfos are contiguous. Track when the first AMStartedEvent @@ -1108,7 +1165,8 @@ public class MRAppMaster extends CompositeService { @SuppressWarnings("unchecked") protected void startJobs() { /** create a job-start event to get this ball rolling */ - JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START); + JobEvent startJobEvent = new JobStartEvent(job.getID(), + recoveredJobStartTime); /** send the job-start event. this triggers the job execution. 
*/ dispatcher.getEventHandler().handle(startJobEvent); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/package-info.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java similarity index 61% rename from hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/package-info.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java index 400fdfaea63..39051da000f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/package-info.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@InterfaceAudience.Private -package org.apache.hadoop.mapreduce.v2.app.recover; -import org.apache.hadoop.classification.InterfaceAudience; + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; + +public class JobStartEvent extends JobEvent { + + long recoveredJobStartTime; + + public JobStartEvent(JobId jobID) { + this(jobID, 0); + } + + public JobStartEvent(JobId jobID, long recoveredJobStartTime) { + super(jobID, JobEventType.JOB_START); + this.recoveredJobStartTime = recoveredJobStartTime; + } + + public long getRecoveredJobStartTime() { + return recoveredJobStartTime; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java index a6c684015ed..a43263264e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java @@ -26,6 +26,7 @@ public enum TaskAttemptEventType { //Producer:Task TA_SCHEDULE, TA_RESCHEDULE, + TA_RECOVER, //Producer:Client, Task TA_KILL, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java new file mode 100644 index 00000000000..19fe752fb12 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +public class TaskAttemptRecoverEvent extends TaskAttemptEvent { + + private TaskAttemptInfo taInfo; + private OutputCommitter committer; + private boolean recoverAttemptOutput; + + public TaskAttemptRecoverEvent(TaskAttemptId id, TaskAttemptInfo taInfo, + OutputCommitter committer, boolean recoverOutput) { + super(id, TaskAttemptEventType.TA_RECOVER); + this.taInfo = taInfo; + this.committer = committer; + this.recoverAttemptOutput = recoverOutput; + } + + public TaskAttemptInfo getTaskAttemptInfo() { + return taInfo; + } + + public OutputCommitter getCommitter() { + return committer; + } + + public boolean getRecoverOutput() { + return recoverAttemptOutput; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java index d385e2fc682..8ce9c9f27c5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEventType.java @@ -28,6 +28,7 @@ public enum TaskEventType { //Producer:Job T_SCHEDULE, + T_RECOVER, //Producer:Speculator T_ADD_SPEC_ATTEMPT, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java new file mode 100644 index 00000000000..b5ead5ecb4f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskRecoverEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; + +public class TaskRecoverEvent extends TaskEvent { + + private TaskInfo taskInfo; + private OutputCommitter committer; + private boolean recoverTaskOutput; + + public TaskRecoverEvent(TaskId taskID, TaskInfo taskInfo, + OutputCommitter committer, boolean recoverTaskOutput) { + super(taskID, TaskEventType.T_RECOVER); + this.taskInfo = taskInfo; + this.committer = committer; + this.recoverTaskOutput = recoverTaskOutput; + } + + public TaskInfo getTaskInfo() { + return taskInfo; + } + + public OutputCommitter getOutputCommitter() { + return committer; + } + + public boolean getRecoverTaskOutput() { + return recoverTaskOutput; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index ec15fae5b45..367b0280845 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -92,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; @@ -101,6 +103,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -159,6 +162,7 @@ 
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final Lock writeLock; private final JobId jobId; private final String jobName; + private final OutputCommitter committer; private final boolean newApiCommitter; private final org.apache.hadoop.mapreduce.JobID oldJobId; private final TaskAttemptListener taskAttemptListener; @@ -602,7 +606,7 @@ JobTokenSecretManager jobTokenSecretManager, Credentials fsTokenCredentials, Clock clock, Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics, - boolean newApiCommitter, String userName, + OutputCommitter committer, boolean newApiCommitter, String userName, long appSubmitTime, List<AMInfo> amInfos, AppContext appContext, JobStateInternal forcedState, String forcedDiagnostic) { this.applicationAttemptId = applicationAttemptId; @@ -618,6 +622,7 @@ this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default"); this.appSubmitTime = appSubmitTime; this.oldJobId = TypeConverter.fromYarn(jobId); + this.committer = committer; this.newApiCommitter = newApiCommitter; this.taskAttemptListener = taskAttemptListener; @@ -888,10 +893,16 @@ } } - protected void scheduleTasks(Set<TaskId> taskIDs) { + protected void scheduleTasks(Set<TaskId> taskIDs, + boolean recoverTaskOutput) { for (TaskId taskID : taskIDs) { - eventHandler.handle(new TaskEvent(taskID, - TaskEventType.T_SCHEDULE)); + TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID); + if (taskInfo != null) { + eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo, + committer, recoverTaskOutput)); + } else { + eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE)); + } } } @@ -1421,7 +1432,7 @@ job.conf, splits[i], job.taskAttemptListener, job.jobToken, job.fsTokens, - job.clock, job.completedTasksFromPreviousRun, + job.clock, job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext); job.addTask(task); @@ -1439,7 +1450,6 @@ job.conf, job.numMapTasks, job.taskAttemptListener, job.jobToken, job.fsTokens, job.clock, - job.completedTasksFromPreviousRun, job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext); job.addTask(task); @@ -1475,8 +1485,8 @@ @Override public void transition(JobImpl job, JobEvent event) { job.setupProgress = 1.0f; - job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps - job.scheduleTasks(job.reduceTasks); + job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0); + job.scheduleTasks(job.reduceTasks, true); // If we have no tasks, just transition to job completed if (job.numReduceTasks == 0 && job.numMapTasks == 0) { @@ -1507,7 +1517,12 @@ */ @Override public void transition(JobImpl job, JobEvent event) { - job.startTime = job.clock.getTime(); + JobStartEvent jse = (JobStartEvent) event; + if (jse.getRecoveredJobStartTime() != 0) { + job.startTime = jse.getRecoveredJobStartTime(); + } else { + job.startTime = job.clock.getTime(); + } JobInitedEvent jie = new JobInitedEvent(job.oldJobId, job.startTime, diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java index bec20aa0c81..c625f739c63 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java @@ -18,17 +18,13 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; -import java.util.Map; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImpl { TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, - Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount, - MRAppMetrics metrics, AppContext appContext) { + int appAttemptId, MRAppMetrics metrics, AppContext appContext) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, - completedTasksFromPreviousRun, startCount, metrics, appContext); + appAttemptId, metrics, appContext); this.taskSplitMetaInfo = taskSplitMetaInfo; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java index a860ad70242..0f4ea9a73bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java @@ -18,16 +18,12 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; -import java.util.Map; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -47,11 +43,10 @@ public class ReduceTaskImpl extends TaskImpl { int numMapTasks, TaskAttemptListener
taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, - Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount, - MRAppMetrics metrics, AppContext appContext) { + int appAttemptId, MRAppMetrics metrics, AppContext appContext) { super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf, taskAttemptListener, jobToken, credentials, clock, - completedTasksFromPreviousRun, startCount, metrics, appContext); + appAttemptId, metrics, appContext); this.numMapTasks = numMapTasks; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index dae9be0d322..3cb4bf913c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent; @@ -89,6 +91,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; @@ -204,6 +207,11 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_KILL, new KilledTransition()) .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) + .addTransition(TaskAttemptStateInternal.NEW, + EnumSet.of(TaskAttemptStateInternal.FAILED, + TaskAttemptStateInternal.KILLED, + TaskAttemptStateInternal.SUCCEEDED), + TaskAttemptEventType.TA_RECOVER, new RecoverTransition()) .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.NEW, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, @@ -1082,6 +1090,102 @@ this.avataar = avataar; } + @SuppressWarnings("unchecked") + public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo, + OutputCommitter committer, boolean recoverOutput) { + containerID = taInfo.getContainerId(); + containerNodeId =
ConverterUtils.toNodeId(taInfo.getHostname() + ":" + + taInfo.getPort()); + containerMgrAddress = StringInterner.weakIntern( + containerNodeId.toString()); + nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":" + + taInfo.getHttpPort()); + computeRackAndLocality(); + launchTime = taInfo.getStartTime(); + finishTime = (taInfo.getFinishTime() != -1) ? + taInfo.getFinishTime() : clock.getTime(); + shufflePort = taInfo.getShufflePort(); + trackerName = taInfo.getHostname(); + httpPort = taInfo.getHttpPort(); + sendLaunchedEvents(); + + reportedStatus.id = attemptId; + reportedStatus.progress = 1.0f; + reportedStatus.counters = taInfo.getCounters(); + reportedStatus.stateString = taInfo.getState(); + reportedStatus.phase = Phase.CLEANUP; + reportedStatus.mapFinishTime = taInfo.getMapFinishTime(); + reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime(); + reportedStatus.sortFinishTime = taInfo.getSortFinishTime(); + addDiagnosticInfo(taInfo.getError()); + + boolean needToClean = false; + String recoveredState = taInfo.getTaskStatus(); + if (recoverOutput + && TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) { + TaskAttemptContext tac = new TaskAttemptContextImpl(conf, + TypeConverter.fromYarn(attemptId)); + try { + committer.recoverTask(tac); + LOG.info("Recovered output from task attempt " + attemptId); + } catch (Exception e) { + LOG.error("Unable to recover task attempt " + attemptId, e); + LOG.info("Task attempt " + attemptId + " will be recovered as KILLED"); + recoveredState = TaskAttemptState.KILLED.toString(); + needToClean = true; + } + } + + TaskAttemptStateInternal attemptState; + if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) { + attemptState = TaskAttemptStateInternal.SUCCEEDED; + reportedStatus.taskState = TaskAttemptState.SUCCEEDED; + eventHandler.handle(createJobCounterUpdateEventTASucceeded(this)); + logAttemptFinishedEvent(attemptState); + } else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) { + attemptState = TaskAttemptStateInternal.FAILED; + reportedStatus.taskState = TaskAttemptState.FAILED; + eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false)); + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(this, + TaskAttemptStateInternal.FAILED); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce)); + } else { + if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) { + if (String.valueOf(recoveredState).isEmpty()) { + LOG.info("TaskAttempt" + attemptId + + " had not completed, recovering as KILLED"); + } else { + LOG.warn("TaskAttempt " + attemptId + " found in unexpected state " + + recoveredState + ", recovering as KILLED"); + } + addDiagnosticInfo("Killed during application recovery"); + needToClean = true; + } + attemptState = TaskAttemptStateInternal.KILLED; + reportedStatus.taskState = TaskAttemptState.KILLED; + eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false)); + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(this, + TaskAttemptStateInternal.KILLED); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce)); + } + + if (needToClean) { + TaskAttemptContext tac = new TaskAttemptContextImpl(conf, + TypeConverter.fromYarn(attemptId)); + try { + committer.abortTask(tac); + } catch (Exception e) { + LOG.warn("Task cleanup failed for attempt " + attemptId, e); + } + } + + return attemptState; + } + private 
static TaskAttemptState getExternalState( TaskAttemptStateInternal smState) { switch (smState) { @@ -1122,6 +1226,24 @@ public abstract class TaskAttemptImpl implements } } + private void computeRackAndLocality() { + nodeRackName = RackResolver.resolve( + containerNodeId.getHost()).getNetworkLocation(); + + locality = Locality.OFF_SWITCH; + if (dataLocalHosts.size() > 0) { + String cHost = resolveHost(containerNodeId.getHost()); + if (dataLocalHosts.contains(cHost)) { + locality = Locality.NODE_LOCAL; + } + } + if (locality == Locality.OFF_SWITCH) { + if (dataLocalRacks.contains(nodeRackName)) { + locality = Locality.RACK_LOCAL; + } + } + } + private static long computeSlotMillis(TaskAttemptImpl taskAttempt) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); int slotMemoryReq = @@ -1141,6 +1263,18 @@ public abstract class TaskAttemptImpl implements return slotMillisIncrement; } + private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded( + TaskAttemptImpl taskAttempt) { + long slotMillis = computeSlotMillis(taskAttempt); + TaskId taskId = taskAttempt.attemptId.getTaskId(); + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId()); + jce.addCounterUpdate( + taskId.getTaskType() == TaskType.MAP ? + JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES, + slotMillis); + return jce; + } + private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed( TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); @@ -1210,6 +1344,26 @@ public abstract class TaskAttemptImpl implements return tauce; } + @SuppressWarnings("unchecked") + private void sendLaunchedEvents() { + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId() + .getJobId()); + jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
+ JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1); + eventHandler.handle(jce); + + LOG.info("TaskAttempt: [" + attemptId + + "] using containerId: [" + containerID + " on NM: [" + + containerMgrAddress + "]"); + TaskAttemptStartedEvent tase = + new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + launchTime, trackerName, httpPort, shufflePort, containerID, + locality.toString(), avataar.toString()); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase)); + } + private WrappedProgressSplitsBlock getProgressSplitBlock() { readLock.lock(); try { @@ -1342,8 +1496,6 @@ public abstract class TaskAttemptImpl implements taskAttempt.containerNodeId.toString()); taskAttempt.nodeHttpAddress = StringInterner.weakIntern( cEvent.getContainer().getNodeHttpAddress()); - taskAttempt.nodeRackName = RackResolver.resolve( - taskAttempt.containerNodeId.getHost()).getNetworkLocation(); taskAttempt.containerToken = cEvent.getContainer().getContainerToken(); taskAttempt.assignedCapability = cEvent.getContainer().getResource(); // this is a _real_ Task (classic Hadoop mapred flavor): @@ -1354,19 +1506,7 @@ public abstract class TaskAttemptImpl implements taskAttempt.taskAttemptListener.registerPendingTask( taskAttempt.remoteTask, taskAttempt.jvmID); - taskAttempt.locality = Locality.OFF_SWITCH; - if (taskAttempt.dataLocalHosts.size() > 0) { - String cHost = taskAttempt.resolveHost( - taskAttempt.containerNodeId.getHost()); - if (taskAttempt.dataLocalHosts.contains(cHost)) { - taskAttempt.locality = Locality.NODE_LOCAL; - } - } - if (taskAttempt.locality == Locality.OFF_SWITCH) { - if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) { - taskAttempt.locality = Locality.RACK_LOCAL; - } - } + taskAttempt.computeRackAndLocality(); //launch the container //create the container object to be launched for a given Task attempt @@ -1471,27 +1611,7 @@ public abstract class TaskAttemptImpl implements // Costly? taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); taskAttempt.httpPort = nodeHttpInetAddr.getPort(); - JobCounterUpdateEvent jce = - new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId() - .getJobId()); - jce.addCounterUpdate( - taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
- JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES , 1); - taskAttempt.eventHandler.handle(jce); - - LOG.info("TaskAttempt: [" + taskAttempt.attemptId - + "] using containerId: [" + taskAttempt.containerID + " on NM: [" - + taskAttempt.containerMgrAddress + "]"); - TaskAttemptStartedEvent tase = - new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - taskAttempt.launchTime, - nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), - taskAttempt.shufflePort, taskAttempt.containerID, - taskAttempt.locality.toString(), taskAttempt.avataar.toString()); - taskAttempt.eventHandler.handle - (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase)); + taskAttempt.sendLaunchedEvents(); taskAttempt.eventHandler.handle (new SpeculatorEvent (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); @@ -1540,14 +1660,8 @@ TaskAttemptEvent event) { //set the finish time taskAttempt.setFinishTime(); - long slotMillis = computeSlotMillis(taskAttempt); - TaskId taskId = taskAttempt.attemptId.getTaskId(); - JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId()); - jce.addCounterUpdate( - taskId.getTaskType() == TaskType.MAP ? - JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES, - slotMillis); - taskAttempt.eventHandler.handle(jce); + taskAttempt.eventHandler.handle( + createJobCounterUpdateEventTASucceeded(taskAttempt)); taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, @@ -1585,6 +1699,18 @@ } } + private static class RecoverTransition implements + MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> { + + @Override + public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event; + return taskAttempt.recover(tare.getTaskAttemptInfo(), + tare.getCommitter(), tare.getRecoverOutput()); + } + } + @SuppressWarnings({ "unchecked" }) private void logAttemptFinishedEvent(TaskAttemptStateInternal state) { //Log finished events only if an attempt started.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index d01d9998aaf..6e4f1b27f62 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; @@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; @@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; @@ -152,6 +155,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { TaskEventType.T_SCHEDULE, new InitialScheduleTransition()) .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, TaskEventType.T_KILL, new KillNewTransition()) + .addTransition(TaskStateInternal.NEW, + EnumSet.of(TaskStateInternal.FAILED, + TaskStateInternal.KILLED, + TaskStateInternal.RUNNING, + TaskStateInternal.SUCCEEDED), + TaskEventType.T_RECOVER, new RecoverTransition()) // Transitions from SCHEDULED state //when the first attempt is launched, the task state is set to RUNNING @@ -250,20 +259,16 @@ // By default, the next TaskAttempt number is zero. Changes during recovery protected int nextAttemptNumber = 0; - private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration = - new ArrayList<TaskAttemptInfo>(); - private static final class RecoverdAttemptsComparator implements - Comparator<TaskAttemptInfo> { - @Override - public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) { - long diff = attempt1.getStartTime() - attempt2.getStartTime(); - return diff == 0 ? 0 : (diff < 0 ?
-1 : 1); - } - } - - private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR = - new RecoverdAttemptsComparator(); + // For sorting task attempts by completion time + private static final Comparator TA_INFO_COMPARATOR = + new Comparator() { + @Override + public int compare(TaskAttemptInfo a, TaskAttemptInfo b) { + long diff = a.getFinishTime() - b.getFinishTime(); + return diff == 0 ? 0 : (diff < 0 ? -1 : 1); + } + }; @Override public TaskState getState() { @@ -280,8 +285,7 @@ public abstract class TaskImpl implements Task, EventHandler { TaskAttemptListener taskAttemptListener, Token jobToken, Credentials credentials, Clock clock, - Map completedTasksFromPreviousRun, int startCount, - MRAppMetrics metrics, AppContext appContext) { + int appAttemptId, MRAppMetrics metrics, AppContext appContext) { this.conf = conf; this.clock = clock; this.jobFile = remoteJobConfFile; @@ -307,41 +311,15 @@ public abstract class TaskImpl implements Task, EventHandler { this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT); - // See if this is from a previous generation. - if (completedTasksFromPreviousRun != null - && completedTasksFromPreviousRun.containsKey(taskId)) { - // This task has TaskAttempts from previous generation. We have to replay - // them. - LOG.info("Task is from previous run " + taskId); - TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId); - Map allAttempts = - taskInfo.getAllTaskAttempts(); - taskAttemptsFromPreviousGeneration = new ArrayList(); - taskAttemptsFromPreviousGeneration.addAll(allAttempts.values()); - Collections.sort(taskAttemptsFromPreviousGeneration, - RECOVERED_ATTEMPTS_COMPARATOR); - } - - if (taskAttemptsFromPreviousGeneration.isEmpty()) { - // All the previous attempts are exhausted, now start with a new - // generation. - - // All the new TaskAttemptIDs are generated based on MR - // ApplicationAttemptID so that attempts from previous lives don't - // over-step the current one. This assumes that a task won't have more - // than 1000 attempts in its single generation, which is very reasonable. - // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts - // and requires serious medical attention. - nextAttemptNumber = (startCount - 1) * 1000; - } else { - // There are still some TaskAttempts from previous generation, use them - nextAttemptNumber = - taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); - } - // This "this leak" is okay because the retained pointer is in an // instance variable. stateMachine = stateMachineFactory.make(this); + + // All the new TaskAttemptIDs are generated based on MR + // ApplicationAttemptID so that attempts from previous lives don't + // over-step the current one. This assumes that a task won't have more + // than 1000 attempts in its single generation, which is very reasonable. 
+ nextAttemptNumber = (appAttemptId - 1) * 1000; } @Override @@ -600,14 +578,28 @@ public abstract class TaskImpl implements Task, EventHandler { // This is always called in the Write Lock private void addAndScheduleAttempt(Avataar avataar) { - TaskAttempt attempt = createAttempt(); - ((TaskAttemptImpl) attempt).setAvataar(avataar); + TaskAttempt attempt = addAttempt(avataar); + inProgressAttempts.add(attempt.getID()); + //schedule the nextAttemptNumber + if (failedAttempts.size() > 0) { + eventHandler.handle(new TaskAttemptEvent(attempt.getID(), + TaskAttemptEventType.TA_RESCHEDULE)); + } else { + eventHandler.handle(new TaskAttemptEvent(attempt.getID(), + TaskAttemptEventType.TA_SCHEDULE)); + } + } + + private TaskAttemptImpl addAttempt(Avataar avataar) { + TaskAttemptImpl attempt = createAttempt(); + attempt.setAvataar(avataar); if (LOG.isDebugEnabled()) { LOG.debug("Created attempt " + attempt.getID()); } switch (attempts.size()) { case 0: - attempts = Collections.singletonMap(attempt.getID(), attempt); + attempts = Collections.singletonMap(attempt.getID(), + (TaskAttempt) attempt); break; case 1: @@ -623,24 +615,8 @@ public abstract class TaskImpl implements Task, EventHandler { break; } - // Update nextATtemptNumber - if (taskAttemptsFromPreviousGeneration.isEmpty()) { - ++nextAttemptNumber; - } else { - // There are still some TaskAttempts from previous generation, use them - nextAttemptNumber = - taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); - } - - inProgressAttempts.add(attempt.getID()); - //schedule the nextAttemptNumber - if (failedAttempts.size() > 0) { - eventHandler.handle(new TaskAttemptEvent(attempt.getID(), - TaskAttemptEventType.TA_RESCHEDULE)); - } else { - eventHandler.handle(new TaskAttemptEvent(attempt.getID(), - TaskAttemptEventType.TA_SCHEDULE)); - } + ++nextAttemptNumber; + return attempt; } @Override @@ -705,6 +681,16 @@ public abstract class TaskImpl implements Task, EventHandler { } } + private void sendTaskStartedEvent() { + TaskStartedEvent tse = new TaskStartedEvent( + TypeConverter.fromYarn(taskId), getLaunchTime(), + TypeConverter.fromYarn(taskId.getTaskType()), + getSplitsAsString()); + eventHandler + .handle(new JobHistoryEvent(taskId.getJobId(), tse)); + historyTaskStartGenerated = true; + } + private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) { TaskFinishedEvent tfe = new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), @@ -740,6 +726,16 @@ public abstract class TaskImpl implements Task, EventHandler { task.successfulAttempt = null; } + private void sendTaskSucceededEvents() { + eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + LOG.info("Task succeeded with attempt " + successfulAttempt); + if (historyTaskStartGenerated) { + TaskFinishedEvent tfe = createTaskFinishedEvent(this, + TaskStateInternal.SUCCEEDED); + eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe)); + } + } + /** * @return a String representation of the splits. 
* @@ -751,6 +747,122 @@ return ""; } + /** + * Recover a completed task from a previous application attempt + * @param taskInfo recovered info about the task + * @param recoverTaskOutput whether to recover task outputs + * @return state of the task after recovery + */ + private TaskStateInternal recover(TaskInfo taskInfo, + OutputCommitter committer, boolean recoverTaskOutput) { + LOG.info("Recovering task " + taskId + + " from prior app attempt, status was " + taskInfo.getTaskStatus()); + + scheduledTime = taskInfo.getStartTime(); + sendTaskStartedEvent(); + Collection<TaskAttemptInfo> attemptInfos = + taskInfo.getAllTaskAttempts().values(); + + if (attemptInfos.size() > 0) { + metrics.launchedTask(this); + } + + // recover the attempts for this task in the order they finished + // so task attempt completion events are ordered properly + int savedNextAttemptNumber = nextAttemptNumber; + ArrayList<TaskAttemptInfo> taInfos = + new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values()); + Collections.sort(taInfos, TA_INFO_COMPARATOR); + for (TaskAttemptInfo taInfo : taInfos) { + nextAttemptNumber = taInfo.getAttemptId().getId(); + TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN); + // handle the recovery inline so attempts complete before task does + attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo, + committer, recoverTaskOutput)); + finishedAttempts.add(attempt.getID()); + TaskAttemptCompletionEventStatus taces = null; + TaskAttemptState attemptState = attempt.getState(); + switch (attemptState) { + case FAILED: + taces = TaskAttemptCompletionEventStatus.FAILED; + break; + case KILLED: + taces = TaskAttemptCompletionEventStatus.KILLED; + break; + case SUCCEEDED: + taces = TaskAttemptCompletionEventStatus.SUCCEEDED; + break; + default: + throw new IllegalStateException( + "Unexpected attempt state during recovery: " + attemptState); + } + if (attemptState == TaskAttemptState.FAILED) { + failedAttempts.add(attempt.getID()); + if (failedAttempts.size() >= maxAttempts) { + taces = TaskAttemptCompletionEventStatus.TIPFAILED; + } + } + + // don't clobber the successful attempt completion event + // TODO: this shouldn't be necessary after MAPREDUCE-4330 + if (successfulAttempt == null) { + handleTaskAttemptCompletion(attempt.getID(), taces); + if (attemptState == TaskAttemptState.SUCCEEDED) { + successfulAttempt = attempt.getID(); + } + } + } + nextAttemptNumber = savedNextAttemptNumber; + + TaskStateInternal taskState = TaskStateInternal.valueOf( + taskInfo.getTaskStatus()); + switch (taskState) { + case SUCCEEDED: + if (successfulAttempt != null) { + sendTaskSucceededEvents(); + } else { + LOG.info("Missing successful attempt for task " + taskId + + ", recovering as RUNNING"); + // there must have been a fetch failure and the retry wasn't complete + taskState = TaskStateInternal.RUNNING; + metrics.runningTask(this); + addAndScheduleAttempt(Avataar.VIRGIN); + } + break; + case FAILED: + case KILLED: + { + if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) { + metrics.endWaitingTask(this); + } + TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(), + taskInfo.getFinishTime(), taskInfo.getTaskType(), + taskInfo.getError(), taskInfo.getTaskStatus(), + taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters()); + eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe)); + eventHandler.handle( + new JobTaskEvent(taskId, getExternalState(taskState))); + break; + } + default: + throw new
java.lang.AssertionError("Unexpected recovered task state: " + + taskState); + } + + return taskState; + } + + private static class RecoverTransition + implements MultipleArcTransition { + + @Override + public TaskStateInternal transition(TaskImpl task, TaskEvent event) { + TaskRecoverEvent tre = (TaskRecoverEvent) event; + return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(), + tre.getRecoverTaskOutput()); + } + } + private static class InitialScheduleTransition implements SingleArcTransition { @@ -758,13 +870,7 @@ public abstract class TaskImpl implements Task, EventHandler { public void transition(TaskImpl task, TaskEvent event) { task.addAndScheduleAttempt(Avataar.VIRGIN); task.scheduledTime = task.clock.getTime(); - TaskStartedEvent tse = new TaskStartedEvent( - TypeConverter.fromYarn(task.taskId), task.getLaunchTime(), - TypeConverter.fromYarn(task.taskId.getTaskType()), - task.getSplitsAsString()); - task.eventHandler - .handle(new JobHistoryEvent(task.taskId.getJobId(), tse)); - task.historyTaskStartGenerated = true; + task.sendTaskStartedEvent(); } } @@ -818,16 +924,7 @@ public abstract class TaskImpl implements Task, EventHandler { task.finishedAttempts.add(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId); task.successfulAttempt = taskAttemptId; - task.eventHandler.handle(new JobTaskEvent( - task.taskId, TaskState.SUCCEEDED)); - LOG.info("Task succeeded with attempt " + task.successfulAttempt); - // issue kill to all other attempts - if (task.historyTaskStartGenerated) { - TaskFinishedEvent tfe = createTaskFinishedEvent(task, - TaskStateInternal.SUCCEEDED); - task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), - tfe)); - } + task.sendTaskSucceededEvents(); for (TaskAttempt attempt : task.attempts.values()) { if (attempt.getID() != task.successfulAttempt && // This is okay because it can only talk us out of sending a diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java deleted file mode 100644 index c7134a46bd7..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java +++ /dev/null @@ -1,39 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
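RecoverTransition above is the single entry point into the new history-driven recovery: whoever restarts the job parses the previous generation's history file and sends each task a TaskRecoverEvent, and recover() then replays the attempts inline. A hedged sketch of such a driver, using only types visible in this patch; the class name, method name, and the in/handler/recoverOutputs inputs are assumptions for illustration, not code from the patch:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TypeConverter;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
    import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
    import org.apache.hadoop.yarn.event.EventHandler;

    public class RecoveryDriverSketch {
      // Parse the prior generation's history stream and hand every task a
      // TaskRecoverEvent; TaskImpl's RecoverTransition rebuilds the attempts
      // inline from the recovered TaskInfo.
      public static void replayTasks(FSDataInputStream in,
          EventHandler<TaskRecoverEvent> handler,
          OutputCommitter committer, boolean recoverOutputs)
          throws IOException {
        JobInfo jobInfo = new JobHistoryParser(in).parse();
        for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
          handler.handle(new TaskRecoverEvent(
              TypeConverter.toYarn(taskInfo.getTaskId()),
              taskInfo, committer, recoverOutputs));
        }
      }
    }

Because the attempts are replayed synchronously inside the transition, attempt completion is fully ordered before the task reports its own recovered state, which is what the old RecoveryService (deleted below) achieved only indirectly by intercepting and re-dispatching events.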
-*/ - -package org.apache.hadoop.mapreduce.v2.app.recover; - -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; -import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.event.Dispatcher; - -public interface Recovery { - - Dispatcher getDispatcher(); - - Clock getClock(); - - Map getCompletedTasks(); - - List getAMInfos(); -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java deleted file mode 100644 index aca752721a7..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ /dev/null @@ -1,480 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.hadoop.mapreduce.v2.app.recover; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; -import org.apache.hadoop.mapreduce.v2.api.records.Phase; -import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; -import org.apache.hadoop.mapreduce.v2.api.records.TaskId; -import org.apache.hadoop.mapreduce.v2.api.records.TaskState; -import org.apache.hadoop.mapreduce.v2.app.ControlledClock; -import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent; -import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; -import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; -import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; -import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; -import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import 
org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.service.CompositeService; -import org.apache.hadoop.yarn.service.Service; -import org.apache.hadoop.yarn.util.BuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; - -/* - * Recovers the completed tasks from the previous life of Application Master. - * The completed tasks are deciphered from the history file of the previous life. - * Recovery service intercepts and replay the events for completed tasks. - * While recovery is in progress, the scheduling of new tasks are delayed by - * buffering the task schedule events. - * The recovery service controls the clock while recovery is in progress. - */ - -//TODO: -//task cleanup for all non completed tasks -public class RecoveryService extends CompositeService implements Recovery { - - private static final Log LOG = LogFactory.getLog(RecoveryService.class); - - private final ApplicationAttemptId applicationAttemptId; - private final OutputCommitter committer; - private final boolean newApiCommitter; - private final Dispatcher dispatcher; - private final ControlledClock clock; - - private JobInfo jobInfo = null; - private final Map completedTasks = - new HashMap(); - - private final List pendingTaskScheduleEvents = - new ArrayList(); - - private volatile boolean recoveryMode = false; - - public RecoveryService(ApplicationAttemptId applicationAttemptId, - Clock clock, OutputCommitter committer, boolean newApiCommitter) { - super("RecoveringDispatcher"); - this.applicationAttemptId = applicationAttemptId; - this.committer = committer; - this.newApiCommitter = newApiCommitter; - this.dispatcher = createRecoveryDispatcher(); - this.clock = new ControlledClock(clock); - addService((Service) dispatcher); - } - - @Override - public void init(Configuration conf) { - super.init(conf); - // parse the history file - try { - parse(); - } catch (Exception e) { - LOG.warn(e); - LOG.warn("Could not parse the old history file. Aborting recovery. " - + "Starting afresh.", e); - } - if (completedTasks.size() > 0) { - recoveryMode = true; - LOG.info("SETTING THE RECOVERY MODE TO TRUE. 
NO OF COMPLETED TASKS " - + "TO RECOVER " + completedTasks.size()); - LOG.info("Job launch time " + jobInfo.getLaunchTime()); - clock.setTime(jobInfo.getLaunchTime()); - } - } - - @Override - public Dispatcher getDispatcher() { - return dispatcher; - } - - @Override - public Clock getClock() { - return clock; - } - - @Override - public Map getCompletedTasks() { - return completedTasks; - } - - @Override - public List getAMInfos() { - if (jobInfo == null || jobInfo.getAMInfos() == null) { - return new LinkedList(); - } - List amInfos = new LinkedList(); - for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo - .getAMInfos()) { - AMInfo amInfo = - MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(), - jhAmInfo.getStartTime(), jhAmInfo.getContainerId(), - jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(), - jhAmInfo.getNodeManagerHttpPort()); - - amInfos.add(amInfo); - } - return amInfos; - } - - private void parse() throws IOException { - FSDataInputStream in = - getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId); - JobHistoryParser parser = new JobHistoryParser(in); - jobInfo = parser.parse(); - Exception parseException = parser.getParseException(); - if (parseException != null) { - LOG.info("Got an error parsing job-history file" + - ", ignoring incomplete events.", parseException); - } - Map taskInfos = jobInfo - .getAllTasks(); - for (TaskInfo taskInfo : taskInfos.values()) { - if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { - Iterator> taskAttemptIterator = - taskInfo.getAllTaskAttempts().entrySet().iterator(); - while (taskAttemptIterator.hasNext()) { - Map.Entry currentEntry = taskAttemptIterator.next(); - if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) { - taskAttemptIterator.remove(); - } - } - completedTasks - .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo); - LOG.info("Read from history task " - + TypeConverter.toYarn(taskInfo.getTaskId())); - } - } - LOG.info("Read completed tasks from history " - + completedTasks.size()); - } - - public static FSDataInputStream getPreviousJobHistoryFileStream( - Configuration conf, ApplicationAttemptId applicationAttemptId) - throws IOException { - FSDataInputStream in = null; - Path historyFile = null; - String jobId = - TypeConverter.fromYarn(applicationAttemptId.getApplicationId()) - .toString(); - String jobhistoryDir = - JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId); - Path histDirPath = - FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir)); - LOG.info("Trying file " + histDirPath.toString()); - FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf); - // read the previous history file - historyFile = - fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath, - jobId, (applicationAttemptId.getAttemptId() - 1))); - LOG.info("History file is at " + historyFile); - in = fc.open(historyFile); - return in; - } - - protected Dispatcher createRecoveryDispatcher() { - return new RecoveryDispatcher(); - } - - @SuppressWarnings("rawtypes") - class RecoveryDispatcher extends AsyncDispatcher { - private final EventHandler actualHandler; - private final EventHandler handler; - - RecoveryDispatcher() { - super(); - actualHandler = super.getEventHandler(); - handler = new InterceptingEventHandler(actualHandler); - } - - @Override - @SuppressWarnings("unchecked") - public void dispatch(Event event) { - if (recoveryMode) { - if (event.getType() == 
TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { - TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) - .getTaskAttemptID()); - LOG.info("Recovered Attempt start time " + attInfo.getStartTime()); - clock.setTime(attInfo.getStartTime()); - - } else if (event.getType() == TaskAttemptEventType.TA_DONE - || event.getType() == TaskAttemptEventType.TA_FAILMSG - || event.getType() == TaskAttemptEventType.TA_KILL) { - TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) - .getTaskAttemptID()); - LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime()); - clock.setTime(attInfo.getFinishTime()); - } - - else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED - || event.getType() == TaskEventType.T_ATTEMPT_KILLED - || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) { - TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event; - LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID()); - TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID() - .getTaskId()); - taskInfo.getAllTaskAttempts().remove( - TypeConverter.fromYarn(tEvent.getTaskAttemptID())); - // remove the task info from completed tasks if all attempts are - // recovered - if (taskInfo.getAllTaskAttempts().size() == 0) { - completedTasks.remove(tEvent.getTaskAttemptID().getTaskId()); - // checkForRecoveryComplete - LOG.info("CompletedTasks() " + completedTasks.size()); - if (completedTasks.size() == 0) { - recoveryMode = false; - clock.reset(); - LOG.info("Setting the recovery mode to false. " + - "Recovery is complete!"); - - // send all pending tasks schedule events - for (TaskEvent tEv : pendingTaskScheduleEvents) { - actualHandler.handle(tEv); - } - - } - } - } - } - realDispatch(event); - } - - public void realDispatch(Event event) { - super.dispatch(event); - } - - @Override - public EventHandler getEventHandler() { - return handler; - } - } - - private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) { - TaskInfo taskInfo = completedTasks.get(id.getTaskId()); - return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id)); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private class InterceptingEventHandler implements EventHandler { - EventHandler actualHandler; - - InterceptingEventHandler(EventHandler actualHandler) { - this.actualHandler = actualHandler; - } - - @Override - public void handle(Event event) { - if (!recoveryMode) { - // delegate to the dispatcher one - actualHandler.handle(event); - return; - } - - else if (event.getType() == TaskEventType.T_SCHEDULE) { - TaskEvent taskEvent = (TaskEvent) event; - // delay the scheduling of new tasks till previous ones are recovered - if (completedTasks.get(taskEvent.getTaskID()) == null) { - LOG.debug("Adding to pending task events " - + taskEvent.getTaskID()); - pendingTaskScheduleEvents.add(taskEvent); - return; - } - } - - else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { - TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID(); - TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); - LOG.debug("CONTAINER_REQ " + aId); - sendAssignedEvent(aId, attInfo); - return; - } - - else if (event.getType() == CommitterEventType.TASK_ABORT) { - TaskAttemptId aId = ((CommitterTaskAbortEvent) event).getAttemptID(); - LOG.debug("TASK_CLEAN"); - actualHandler.handle(new TaskAttemptEvent(aId, - TaskAttemptEventType.TA_CLEANUP_DONE)); - return; - } - - else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) { - TaskAttemptId aId = 
((ContainerRemoteLaunchEvent) event) - .getTaskAttemptID(); - TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); - actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, - attInfo.getShufflePort())); - // send the status update event - sendStatusUpdateEvent(aId, attInfo); - - TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus()); - switch (state) { - case SUCCEEDED: - //recover the task output - - // check the committer type and construct corresponding context - TaskAttemptContext taskContext = null; - if(newApiCommitter) { - taskContext = new TaskAttemptContextImpl(getConfig(), - attInfo.getAttemptId()); - } else { - taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()), - TypeConverter.fromYarn(aId)); - } - - try { - TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType(); - int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); - if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) { - committer.recoverTask(taskContext); - LOG.info("Recovered output from task attempt " + attInfo.getAttemptId()); - } else { - LOG.info("Will not try to recover output for " - + taskContext.getTaskAttemptID()); - } - } catch (IOException e) { - LOG.error("Caught an exception while trying to recover task "+aId, e); - actualHandler.handle(new JobDiagnosticsUpdateEvent( - aId.getTaskId().getJobId(), "Error in recovering task output " + - e.getMessage())); - actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(), - JobEventType.INTERNAL_ERROR)); - } - - // send the done event - LOG.info("Sending done event to recovered attempt " + aId); - actualHandler.handle(new TaskAttemptEvent(aId, - TaskAttemptEventType.TA_DONE)); - break; - case KILLED: - LOG.info("Sending kill event to recovered attempt " + aId); - actualHandler.handle(new TaskAttemptEvent(aId, - TaskAttemptEventType.TA_KILL)); - break; - default: - LOG.info("Sending fail event to recovered attempt " + aId); - actualHandler.handle(new TaskAttemptEvent(aId, - TaskAttemptEventType.TA_FAILMSG)); - break; - } - return; - } - - else if (event.getType() == - ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) { - TaskAttemptId aId = ((ContainerLauncherEvent) event) - .getTaskAttemptID(); - actualHandler.handle( - new TaskAttemptEvent(aId, - TaskAttemptEventType.TA_CONTAINER_CLEANED)); - return; - } - - // delegate to the actual handler - actualHandler.handle(event); - } - - private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID, - TaskAttemptInfo attemptInfo) { - LOG.info("Sending status update event to " + yarnAttemptID); - TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); - taskAttemptStatus.id = yarnAttemptID; - taskAttemptStatus.progress = 1.0f; - taskAttemptStatus.stateString = attemptInfo.getTaskStatus(); - // taskAttemptStatus.outputSize = attemptInfo.getOutputSize(); - taskAttemptStatus.phase = Phase.CLEANUP; - org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters(); - if (cntrs == null) { - taskAttemptStatus.counters = null; - } else { - taskAttemptStatus.counters = cntrs; - } - actualHandler.handle(new TaskAttemptStatusUpdateEvent( - taskAttemptStatus.id, taskAttemptStatus)); - } - - private void sendAssignedEvent(TaskAttemptId yarnAttemptID, - TaskAttemptInfo attemptInfo) { - LOG.info("Sending assigned event to " + yarnAttemptID); - ContainerId cId = attemptInfo.getContainerId(); - - NodeId nodeId = - ConverterUtils.toNodeId(attemptInfo.getHostname() + ":" - + 
attemptInfo.getPort()); - // Resource/Priority/ApplicationACLs are only needed while launching the - // container on an NM, these are already completed tasks, so setting them - // to null - Container container = BuilderUtils.newContainer(cId, nodeId, - attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(), - null, null, null); - actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID, - container, null)); - } - } - -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 4a28ab00637..4ef4d8d9f4b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster { Job newJob = new TestJob(getJobId(), getAttemptID(), conf, getDispatcher().getEventHandler(), getTaskAttemptListener(), getContext().getClock(), - isNewApiCommitter(), currentUser.getUserName(), getContext(), + getCommitter(), isNewApiCommitter(), + currentUser.getUserName(), getContext(), forcedState, diagnostic); ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); @@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster { public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Clock clock, - boolean newApiCommitter, String user, AppContext appContext, + OutputCommitter committer, boolean newApiCommitter, + String user, AppContext appContext, JobStateInternal forcedState, String diagnostic) { super(jobId, getApplicationAttemptId(applicationId, getStartCount()), conf, eventHandler, taskAttemptListener, new JobTokenSecretManager(), new Credentials(), clock, - getCompletedTaskFromPreviousRun(), metrics, + getCompletedTaskFromPreviousRun(), metrics, committer, newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(), appContext, forcedState, diagnostic); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index a0a08c91bce..e89f0374bf1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -18,10 +18,21 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; import junit.framework.Assert; @@ -31,36 +42,66 @@ 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.Event; +import org.apache.hadoop.mapreduce.jobhistory.EventType; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; +import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.ClusterInfo; +import org.apache.hadoop.yarn.SystemClock; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.BuilderUtils; import 
org.junit.Test; +import org.mockito.ArgumentCaptor; @SuppressWarnings({"unchecked", "rawtypes"}) public class TestRecovery { @@ -75,6 +116,7 @@ public class TestRecovery { private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); + /** * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt * completely disappears because of failed launch, one attempt gets killed and @@ -1011,6 +1053,423 @@ public class TestRecovery { app.verifyCompleted(); } + @Test + public void testRecoverySuccessAttempt() { + LOG.info("--- START: testRecoverySuccessAttempt ---"); + + long clusterTimestamp = System.currentTimeMillis(); + EventHandler mockEventHandler = mock(EventHandler.class); + MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, + mockEventHandler); + + TaskId taskId = recoverMapTask.getID(); + JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); + TaskID taskID = new TaskID(jobID, + org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); + + //Mock up the TaskAttempts + Map mockTaskAttempts = + new HashMap(); + + TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); + TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, + TaskAttemptState.SUCCEEDED); + mockTaskAttempts.put(taId1, mockTAinfo1); + + TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); + TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, + TaskAttemptState.FAILED); + mockTaskAttempts.put(taId2, mockTAinfo2); + + OutputCommitter mockCommitter = mock (OutputCommitter.class); + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED"); + when(mockTaskInfo.getTaskId()).thenReturn(taskID); + when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); + + recoverMapTask.handle( + new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true)); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler,atLeast(1)).handle( + (org.apache.hadoop.yarn.event.Event) arg.capture()); + + Map finalAttemptStates = + new HashMap(); + finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED); + finalAttemptStates.put(taId2, TaskAttemptState.FAILED); + + List jobHistoryEvents = new ArrayList(); + jobHistoryEvents.add(EventType.TASK_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); + jobHistoryEvents.add(EventType.TASK_FINISHED); + recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates, + arg, jobHistoryEvents, 2L, 1L); + } + + @Test + public void testRecoveryAllFailAttempts() { + LOG.info("--- START: testRecoveryAllFailAttempts ---"); + + long clusterTimestamp = System.currentTimeMillis(); + EventHandler mockEventHandler = mock(EventHandler.class); + MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, + mockEventHandler); + + TaskId taskId = recoverMapTask.getID(); + JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); + TaskID taskID = new TaskID(jobID, + org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); + + //Mock up the TaskAttempts + Map mockTaskAttempts = + new HashMap(); + + TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); + TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, + TaskAttemptState.FAILED); + mockTaskAttempts.put(taId1, mockTAinfo1); + + TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); + TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, 
+ TaskAttemptState.FAILED); + mockTaskAttempts.put(taId2, mockTAinfo2); + + OutputCommitter mockCommitter = mock (OutputCommitter.class); + + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED"); + when(mockTaskInfo.getTaskId()).thenReturn(taskID); + when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); + + recoverMapTask.handle( + new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler,atLeast(1)).handle( + (org.apache.hadoop.yarn.event.Event) arg.capture()); + + Map finalAttemptStates = + new HashMap(); + finalAttemptStates.put(taId1, TaskAttemptState.FAILED); + finalAttemptStates.put(taId2, TaskAttemptState.FAILED); + + List jobHistoryEvents = new ArrayList(); + jobHistoryEvents.add(EventType.TASK_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); + jobHistoryEvents.add(EventType.TASK_FAILED); + recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates, + arg, jobHistoryEvents, 2L, 2L); + } + + @Test + public void testRecoveryTaskSuccessAllAttemptsFail() { + LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---"); + + long clusterTimestamp = System.currentTimeMillis(); + EventHandler mockEventHandler = mock(EventHandler.class); + MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp, + mockEventHandler); + + TaskId taskId = recoverMapTask.getID(); + JobID jobID = new JobID(Long.toString(clusterTimestamp), 1); + TaskID taskID = new TaskID(jobID, + org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId()); + + //Mock up the TaskAttempts + Map mockTaskAttempts = + new HashMap(); + + TaskAttemptID taId1 = new TaskAttemptID(taskID, 2); + TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1, + TaskAttemptState.FAILED); + mockTaskAttempts.put(taId1, mockTAinfo1); + + TaskAttemptID taId2 = new TaskAttemptID(taskID, 1); + TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2, + TaskAttemptState.FAILED); + mockTaskAttempts.put(taId2, mockTAinfo2); + + OutputCommitter mockCommitter = mock (OutputCommitter.class); + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED"); + when(mockTaskInfo.getTaskId()).thenReturn(taskID); + when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts); + + recoverMapTask.handle( + new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler,atLeast(1)).handle( + (org.apache.hadoop.yarn.event.Event) arg.capture()); + + Map finalAttemptStates = + new HashMap(); + finalAttemptStates.put(taId1, TaskAttemptState.FAILED); + finalAttemptStates.put(taId2, TaskAttemptState.FAILED); + // check for one new attempt launched since successful attempt not found + TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000); + finalAttemptStates.put(taId3, TaskAttemptState.NEW); + + List jobHistoryEvents = new ArrayList(); + jobHistoryEvents.add(EventType.TASK_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED); + recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates, + arg, 
jobHistoryEvents, 2L, 2L);
+  }
+
+  @Test
+  public void testRecoveryTaskSuccessAllAttemptsSucceed() {
+    LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsSucceed ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.SUCCEEDED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, atLeast(1)).handle(
+        (org.apache.hadoop.yarn.event.Event) arg.capture());
+
+    Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
+        new HashMap<TaskAttemptID, TaskAttemptState>();
+    finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
+    finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
+
+    List<EventType> jobHistoryEvents = new ArrayList<EventType>();
+    jobHistoryEvents.add(EventType.TASK_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
+    jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
+    jobHistoryEvents.add(EventType.TASK_FINISHED);
+    recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
+        arg, jobHistoryEvents, 2L, 0L);
+  }
+
+  @Test
+  public void testRecoveryAllAttemptsKilled() {
+    LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
+
+    long clusterTimestamp = System.currentTimeMillis();
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
+        mockEventHandler);
+
+    TaskId taskId = recoverMapTask.getID();
+    JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
+    TaskID taskID = new TaskID(jobID,
+        org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
+
+    //Mock up the TaskAttempts
+    Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
+        new HashMap<TaskAttemptID, TaskAttemptInfo>();
+    TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
+    TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId1, mockTAinfo1);
+
+    TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
+    TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
+        TaskAttemptState.KILLED);
+    mockTaskAttempts.put(taId2, mockTAinfo2);
+
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    TaskInfo mockTaskInfo = mock(TaskInfo.class);
+    when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
+    when(mockTaskInfo.getTaskId()).thenReturn(taskID);
+    when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
+
+    recoverMapTask.handle(
+        new
TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true)); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler,atLeast(1)).handle( + (org.apache.hadoop.yarn.event.Event) arg.capture()); + + Map finalAttemptStates = + new HashMap(); + finalAttemptStates.put(taId1, TaskAttemptState.KILLED); + finalAttemptStates.put(taId2, TaskAttemptState.KILLED); + + List jobHistoryEvents = new ArrayList(); + jobHistoryEvents.add(EventType.TASK_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED); + jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED); + jobHistoryEvents.add(EventType.TASK_FAILED); + recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates, + arg, jobHistoryEvents, 2L, 0L); + } + + private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState, + Map finalAttemptStates, + ArgumentCaptor arg, List expectedJobHistoryEvents, + long expectedMapLaunches, long expectedFailedMaps) { + + assertEquals("Final State of Task", finalState, checkTask.getState()); + + Map recoveredAttempts = + checkTask.getAttempts(); + assertEquals("Expected Number of Task Attempts", + finalAttemptStates.size(), recoveredAttempts.size()); + for (TaskAttemptID taID : finalAttemptStates.keySet()) { + assertEquals("Expected Task Attempt State", + finalAttemptStates.get(taID), + recoveredAttempts.get(TypeConverter.toYarn(taID)).getState()); + } + + Iterator ie = arg.getAllValues().iterator(); + int eventNum = 0; + long totalLaunchedMaps = 0; + long totalFailedMaps = 0; + boolean jobTaskEventReceived = false; + + while (ie.hasNext()) { + Object current = ie.next(); + ++eventNum; + LOG.info(eventNum + " " + current.getClass().getName()); + if (current instanceof JobHistoryEvent) { + JobHistoryEvent jhe = (JobHistoryEvent) current; + LOG.info(expectedJobHistoryEvents.get(0).toString() + " " + + jhe.getHistoryEvent().getEventType().toString() + " " + + jhe.getJobID()); + assertEquals(expectedJobHistoryEvents.get(0), + jhe.getHistoryEvent().getEventType()); + expectedJobHistoryEvents.remove(0); + } else if (current instanceof JobCounterUpdateEvent) { + JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current; + + LOG.info("JobCounterUpdateEvent " + + jcue.getCounterUpdates().get(0).getCounterKey() + + " " + jcue.getCounterUpdates().get(0).getIncrementValue()); + if (jcue.getCounterUpdates().get(0).getCounterKey() == + JobCounter.NUM_FAILED_MAPS) { + totalFailedMaps += jcue.getCounterUpdates().get(0) + .getIncrementValue(); + } else if (jcue.getCounterUpdates().get(0).getCounterKey() == + JobCounter.TOTAL_LAUNCHED_MAPS) { + totalLaunchedMaps += jcue.getCounterUpdates().get(0) + .getIncrementValue(); + } + } else if (current instanceof JobTaskEvent) { + JobTaskEvent jte = (JobTaskEvent) current; + assertEquals(jte.getState(), finalState); + jobTaskEventReceived = true; + } + } + assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING)); + assertEquals("Did not process all expected JobHistoryEvents", + 0, expectedJobHistoryEvents.size()); + assertEquals("Expected Map Launches", + expectedMapLaunches, totalLaunchedMaps); + assertEquals("Expected Failed Maps", + expectedFailedMaps, totalFailedMaps); + } + + private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) { + + ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + + int partitions = 2; + + Path 
remoteJobConfFile = mock(Path.class); + JobConf conf = new JobConf(); + TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class); + Token jobToken = + (Token) mock(Token.class); + Credentials credentials = null; + Clock clock = new SystemClock(); + int appAttemptId = 3; + MRAppMetrics metrics = mock(MRAppMetrics.class); + Resource minContainerRequirements = mock(Resource.class); + when(minContainerRequirements.getMemory()).thenReturn(1000); + + ClusterInfo clusterInfo = mock(ClusterInfo.class); + when(clusterInfo.getMinContainerCapability()).thenReturn( + minContainerRequirements); + AppContext appContext = mock(AppContext.class); + when(appContext.getClusterInfo()).thenReturn(clusterInfo); + + TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); + MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions, + eh, remoteJobConfFile, conf, + taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock, + appAttemptId, metrics, appContext); + return mapTask; + } + + private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai, + TaskAttemptState tas) { + + ContainerId ci = mock(ContainerId.class); + Counters counters = mock(Counters.class); + TaskType tt = TaskType.MAP; + + long finishTime = System.currentTimeMillis(); + + TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class); + + when(mockTAinfo.getAttemptId()).thenReturn(tai); + when(mockTAinfo.getContainerId()).thenReturn(ci); + when(mockTAinfo.getCounters()).thenReturn(counters); + when(mockTAinfo.getError()).thenReturn(""); + when(mockTAinfo.getFinishTime()).thenReturn(finishTime); + when(mockTAinfo.getHostname()).thenReturn("localhost"); + when(mockTAinfo.getHttpPort()).thenReturn(23); + when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L); + when(mockTAinfo.getPort()).thenReturn(24); + when(mockTAinfo.getRackname()).thenReturn("defaultRack"); + when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L); + when(mockTAinfo.getShufflePort()).thenReturn(25); + when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L); + when(mockTAinfo.getStartTime()).thenReturn(finishTime -10000); + when(mockTAinfo.getState()).thenReturn("task in progress"); + when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString()); + when(mockTAinfo.getTaskType()).thenReturn(tt); + when(mockTAinfo.getTrackerName()).thenReturn("TrackerName"); + return mockTAinfo; + } + private void writeBadOutput(TaskAttempt attempt, Configuration conf) throws Exception { TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, @@ -1145,5 +1604,16 @@ public class TestRecovery { public static void main(String[] arg) throws Exception { TestRecovery test = new TestRecovery(); test.testCrashed(); + test.testMultipleCrashes(); + test.testOutputRecovery(); + test.testOutputRecoveryMapsOnly(); + test.testRecoveryWithOldCommiter(); + test.testSpeculative(); + test.testRecoveryWithoutShuffleSecret(); + test.testRecoverySuccessAttempt(); + test.testRecoveryAllFailAttempts(); + test.testRecoveryTaskSuccessAllAttemptsFail(); + test.testRecoveryTaskSuccessAllAttemptsSucceed(); + test.testRecoveryAllAttemptsKilled(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 3636273a9c4..10b79aba73e 100644 --- 
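The assertions in recoveryChecker above all hang off one Mockito idiom: mock the EventHandler, capture every event it saw, then classify the captures by event class. A condensed, illustrative restatement of that idiom; the class and method names are not from the patch:

    import static org.mockito.Mockito.atLeast;
    import static org.mockito.Mockito.verify;

    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
    import org.apache.hadoop.yarn.event.Event;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.mockito.ArgumentCaptor;

    @SuppressWarnings({"unchecked", "rawtypes"})
    public class CaptureSketch {
      // Capture everything the mocked handler received, then pick out the
      // event subtypes of interest and assert on their contents.
      static void assertHistoryEventsEmitted(EventHandler mockHandler) {
        ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
        verify(mockHandler, atLeast(1)).handle(captor.capture());
        for (Event e : captor.getAllValues()) {
          if (e instanceof JobHistoryEvent) {
            // inspect ((JobHistoryEvent) e).getHistoryEvent().getEventType()
          }
        }
      }
    }

This is why the tests can verify history-event ordering without a running dispatcher: recovery is handled synchronously, so the captured list reflects the exact emission order.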
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -316,7 +316,8 @@ import org.junit.Test; Job newJob = new TestJob(getJobId(), getAttemptID(), conf, getDispatcher().getEventHandler(), getTaskAttemptListener(), getContext().getClock(), - isNewApiCommitter(), currentUser.getUserName(), getContext(), + getCommitter(), isNewApiCommitter(), + currentUser.getUserName(), getContext(), forcedState, diagnostic); ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index aaaa4472b8c..88cf949bc96 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.EnumSet; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.jobhistory.EventType; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus.State; @@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -57,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; @@ -69,7 +73,6 @@ import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import 
org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -133,7 +136,7 @@ public class TestJobImpl { JobImpl job = createStubbedJob(conf, dispatcher, 0); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(job.getID(), JobEventType.JOB_START)); + job.handle(new JobStartEvent(job.getID())); assertJobState(job, JobStateInternal.SUCCEEDED); dispatcher.stop(); commitHandler.stop(); @@ -222,7 +225,7 @@ public class TestJobImpl { JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + job.handle(new JobStartEvent(jobId)); assertJobState(job, JobStateInternal.SETUP); job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); @@ -284,7 +287,7 @@ public class TestJobImpl { JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + job.handle(new JobStartEvent(jobId)); assertJobState(job, JobStateInternal.SETUP); job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL)); @@ -351,7 +354,7 @@ public class TestJobImpl { JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + job.handle(new JobStartEvent(jobId)); assertJobState(job, JobStateInternal.FAIL_ABORT); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); @@ -388,7 +391,7 @@ public class TestJobImpl { JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + job.handle(new JobStartEvent(jobId)); assertJobState(job, JobStateInternal.SETUP); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); @@ -428,7 +431,7 @@ public class TestJobImpl { // Verify access JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null, - null, null, true, null, 0, null, null, null, null); + null, null, null, true, null, 0, null, null, null, null); Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -439,7 +442,7 @@ public class TestJobImpl { // Verify access JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null, - null, null, true, null, 0, null, null, null, null); + null, null, null, true, null, 0, null, null, null, null); Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -450,7 +453,7 @@ public class TestJobImpl { // Verify access JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null, - null, null, true, null, 0, null, null, null, null); + null, null, null, true, null, 0, null, null, null, null); Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -461,7 +464,7 @@ public class TestJobImpl { // Verify access JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null, - null, null, true, null, 0, null, null, null, null); + null, null, null, true, null, 0, null, null, null, null); Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -472,7 +475,7 
@@ public class TestJobImpl { // Verify access JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null, - null, null, true, null, 0, null, null, null, null); + null, null, null, true, null, 0, null, null, null, null); Assert.assertTrue(job5.checkAccess(ugi1, null)); Assert.assertTrue(job5.checkAccess(ugi2, null)); } @@ -490,7 +493,7 @@ public class TestJobImpl { mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, new SystemClock(), null, - mrAppMetrics, true, null, 0, null, null, null, null); + mrAppMetrics, null, true, null, 0, null, null, null, null); job.handle(diagUpdateEvent); String diagnostics = job.getReport().getDiagnostics(); Assert.assertNotNull(diagnostics); @@ -501,7 +504,7 @@ public class TestJobImpl { mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, new SystemClock(), null, - mrAppMetrics, true, null, 0, null, null, null, null); + mrAppMetrics, null, true, null, 0, null, null, null, null); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); job.handle(diagUpdateEvent); diagnostics = job.getReport().getDiagnostics(); @@ -556,7 +559,7 @@ public class TestJobImpl { JobImpl job = new JobImpl(jobId, Records .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class), null, new JobTokenSecretManager(), new Credentials(), null, null, - mrAppMetrics, true, null, 0, null, null, null, null); + mrAppMetrics, null, true, null, 0, null, null, null, null); InitTransition initTransition = getInitTransition(2); JobEvent mockJobEvent = mock(JobEvent.class); initTransition.transition(job, mockJobEvent); @@ -597,7 +600,7 @@ public class TestJobImpl { JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(jobId, JobEventType.JOB_START)); + job.handle(new JobStartEvent(jobId)); assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED)); @@ -661,7 +664,7 @@ public class TestJobImpl { StubbedJob job = createStubbedJob(conf, dispatcher, numSplits); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); - job.handle(new JobEvent(job.getID(), JobEventType.JOB_START)); + job.handle(new JobStartEvent(job.getID())); assertJobState(job, JobStateInternal.RUNNING); return job; } @@ -785,9 +788,9 @@ public class TestJobImpl { boolean newApiCommitter, String user, int numSplits) { super(jobId, applicationAttemptId, conf, eventHandler, null, new JobTokenSecretManager(), new Credentials(), - new SystemClock(), null, MRAppMetrics.create(), - newApiCommitter, user, System.currentTimeMillis(), null, null, null, - null); + new SystemClock(), Collections. 
emptyMap(), + MRAppMetrics.create(), null, newApiCommitter, user, + System.currentTimeMillis(), null, null, null, null); initTransition = getInitTransition(numSplits); localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index d3297b3fb6b..9fd0fb8b1ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskCounter; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.Avataar; @@ -80,7 +78,6 @@ public class TestTaskImpl { private Path remoteJobConfFile; private Credentials credentials; private Clock clock; - private Map completedTasksFromPreviousRun; private MRAppMetrics metrics; private TaskImpl mockTask; private ApplicationId appId; @@ -104,13 +101,12 @@ public class TestTaskImpl { EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, TaskAttemptListener taskAttemptListener, Token jobToken, - Credentials credentials, Clock clock, - Map completedTasksFromPreviousRun, int startCount, + Credentials credentials, Clock clock, int startCount, MRAppMetrics metrics, AppContext appContext, TaskType taskType) { super(jobId, taskType , partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, - completedTasksFromPreviousRun, startCount, metrics, appContext); + startCount, metrics, appContext); this.taskType = taskType; } @@ -247,8 +243,7 @@ public class TestTaskImpl { return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), remoteJobConfFile, conf, taskAttemptListener, jobToken, credentials, clock, - completedTasksFromPreviousRun, startCount, - metrics, appContext, taskType); + startCount, metrics, appContext, taskType); } @After @@ -652,9 +647,7 @@ public class TestTaskImpl { public void testFailedTransitions() { mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), remoteJobConfFile, conf, taskAttemptListener, jobToken, - credentials, clock, - completedTasksFromPreviousRun, startCount, - metrics, appContext, TaskType.MAP) { + credentials, clock, startCount, metrics, appContext, TaskType.MAP) { @Override protected int getMaxAttempts() { return 1; @@ -721,9 +714,7 @@ public class TestTaskImpl { public void testCountersWithSpeculation() { mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), remoteJobConfFile, conf, taskAttemptListener, jobToken, - 
credentials, clock, - completedTasksFromPreviousRun, startCount, - metrics, appContext, TaskType.MAP) { + credentials, clock, startCount, metrics, appContext, TaskType.MAP) { @Override protected int getMaxAttempts() { return 1; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java index a2ba7f79d73..66471d367cb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java @@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import com.google.common.base.Joiner; @@ -525,4 +526,19 @@ public class JobHistoryUtils { sb.append(jobId.toString()); return sb.toString(); } + + public static Path getPreviousJobHistoryPath( + Configuration conf, ApplicationAttemptId applicationAttemptId) + throws IOException { + String jobId = + TypeConverter.fromYarn(applicationAttemptId.getApplicationId()) + .toString(); + String jobhistoryDir = + JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId); + Path histDirPath = FileContext.getFileContext(conf).makeQualified( + new Path(jobhistoryDir)); + FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf); + return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile( + histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1))); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 7a7d22ec212..6e399ee7410 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -422,6 +422,7 @@ public interface MRJobConfig { /** Enable job recovery.*/ public static final String MR_AM_JOB_RECOVERY_ENABLE = MR_AM_PREFIX + "job.recovery.enable"; + public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true; /** * Limit on the number of reducers that can be preempted to ensure that at
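The two additions above, JobHistoryUtils.getPreviousJobHistoryPath() and MR_AM_JOB_RECOVERY_ENABLE_DEFAULT, are the ingredients a restarted AM needs to decide whether to recover and where to read from. A hedged sketch of that decision; the class name, method name, and the attempt-id guard are assumptions for illustration, not code from this patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;

    public class RecoveryDecisionSketch {
      // Recovery only makes sense for a restarted attempt (attempt id > 1)
      // and only when the recovery flag is on; the prior attempt's history
      // file is located via the new JobHistoryUtils helper.
      public static Path historyFileToRecoverFrom(Configuration conf,
          ApplicationAttemptId appAttemptId) throws IOException {
        boolean enabled = conf.getBoolean(
            MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
            MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
        if (!enabled || appAttemptId.getAttemptId() <= 1) {
          return null; // nothing to recover; start fresh
        }
        return JobHistoryUtils.getPreviousJobHistoryPath(conf, appAttemptId);
      }
    }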