From 6eadd0201d0fc03442bcf391d8ec58f2fed2a0cf Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 14 Feb 2012 00:11:54 +0000 Subject: [PATCH] MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then the recovery. (vinodkv) svn merge --ignore-ancestry -c 1243752 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1243755 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../jobhistory/JobHistoryEventHandler.java | 8 +- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 9 +- .../mapreduce/v2/app/job/impl/JobImpl.java | 5 +- .../v2/app/job/impl/MapTaskImpl.java | 7 +- .../v2/app/job/impl/ReduceTaskImpl.java | 7 +- .../mapreduce/v2/app/job/impl/TaskImpl.java | 96 ++++++++++++++----- .../mapreduce/v2/app/recover/Recovery.java | 5 +- .../v2/app/recover/RecoveryService.java | 19 ++-- .../hadoop/mapreduce/v2/app/TestRecovery.java | 82 ++++++++++++---- .../v2/app/job/impl/TestTaskImpl.java | 7 +- .../hadoop/mapreduce/TypeConverter.java | 5 +- 12 files changed, 180 insertions(+), 73 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d084f61278e..9e1b92f03bc 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -759,6 +759,9 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3843. Job summary log file found missing on the RM host (Anupam Seth via tgraves) + MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then + the recovery. 
(vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 7e8c3163bc3..212c86cf77e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -244,7 +244,7 @@ public class JobHistoryEventHandler extends AbstractService while (!stopped && !Thread.currentThread().isInterrupted()) { // Log the size of the history-event-queue every so often. - if (eventCounter % 1000 == 0) { + if (eventCounter != 0 && eventCounter % 1000 == 0) { eventCounter = 0; LOG.info("Size of the JobHistory event queue is " + eventQueue.size()); @@ -464,8 +464,10 @@ public class JobHistoryEventHandler extends AbstractService } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - LOG.info("In HistoryEventHandler " - + event.getHistoryEvent().getEventType()); + if (LOG.isDebugEnabled()) { + LOG.debug("In HistoryEventHandler " + + event.getHistoryEvent().getEventType()); + } } catch (IOException e) { LOG.error("Error writing History Event: " + event.getHistoryEvent(), e); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6097e377d18..6c45574b7dc 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -26,7 +26,6 @@ import java.security.PrivilegedExceptionAction; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; @@ -123,7 +123,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; * The information is shared across different components using AppContext. 
*/ -@SuppressWarnings("deprecation") +@SuppressWarnings("rawtypes") public class MRAppMaster extends CompositeService { private static final Log LOG = LogFactory.getLog(MRAppMaster.class); @@ -138,7 +138,7 @@ public class MRAppMaster extends CompositeService { private final int nmPort; private final int nmHttpPort; protected final MRAppMetrics metrics; - private Set completedTasksFromPreviousRun; + private Map completedTasksFromPreviousRun; private List amInfos; private AppContext context; private Dispatcher dispatcher; @@ -596,7 +596,7 @@ public class MRAppMaster extends CompositeService { return dispatcher; } - public Set getCompletedTaskFromPreviousRun() { + public Map getCompletedTaskFromPreviousRun() { return completedTasksFromPreviousRun; } @@ -737,7 +737,6 @@ public class MRAppMaster extends CompositeService { return jobs; } - @SuppressWarnings("rawtypes") @Override public EventHandler getEventHandler() { return dispatcher.getEventHandler(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index e647dc31c97..cd357a23da1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import 
org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; @@ -133,7 +134,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float cleanupWeight = 0.05f; private float mapWeight = 0.0f; private float reduceWeight = 0.0f; - private final Set completedTasksFromPreviousRun; + private final Map completedTasksFromPreviousRun; private final List amInfos; private final Lock readLock; private final Lock writeLock; @@ -376,7 +377,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TaskAttemptListener taskAttemptListener, JobTokenSecretManager jobTokenSecretManager, Credentials fsTokenCredentials, Clock clock, - Set completedTasksFromPreviousRun, MRAppMetrics metrics, + Map completedTasksFromPreviousRun, MRAppMetrics metrics, OutputCommitter committer, boolean newApiCommitter, String userName, long appSubmitTime, List amInfos) { this.applicationAttemptId = applicationAttemptId; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java index 5bf3d94c877..5b0901eba2d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java @@ -19,13 +19,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; -import java.util.Set; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -38,7 +39,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes" }) public class MapTaskImpl extends TaskImpl { private final TaskSplitMetaInfo taskSplitMetaInfo; @@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImpl { TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, fsTokens, clock, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java index a2f386aaab0..4258fdfbc31 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java @@ -19,13 +19,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; -import 
java.util.Set; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; @@ -37,7 +38,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes" }) public class ReduceTaskImpl extends TaskImpl { private final int numMapTasks; @@ -47,7 +48,7 @@ public class ReduceTaskImpl extends TaskImpl { int numMapTasks, TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf, taskAttemptListener, committer, jobToken, fsTokens, clock, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 9dc135dc1be..e472e99cd21 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -18,13 +18,14 @@ 
package org.apache.hadoop.mapreduce.v2.app.job.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; @@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; @@ -208,8 +213,23 @@ public abstract class TaskImpl implements Task, EventHandler { private final StateMachine stateMachine; - - protected int nextAttemptNumber; + + // By default, the next TaskAttempt number is zero. 
Changes during recovery + protected int nextAttemptNumber = 0; + private List taskAttemptsFromPreviousGeneration = + new ArrayList(); + + private static final class RecoverdAttemptsComparator implements + Comparator { + @Override + public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) { + long diff = attempt1.getStartTime() - attempt2.getStartTime(); + return diff == 0 ? 0 : (diff < 0 ? -1 : 1); + } + } + + private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR = + new RecoverdAttemptsComparator(); //should be set to one which comes first //saying COMMIT_PENDING @@ -230,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler { TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { this.conf = conf; this.clock = clock; @@ -243,10 +263,7 @@ public abstract class TaskImpl implements Task, EventHandler { // have a convention that none of the overrides depends on any // fields that need initialization. maxAttempts = getMaxAttempts(); - taskId = recordFactory.newRecordInstance(TaskId.class); - taskId.setJobId(jobId); - taskId.setId(partition); - taskId.setTaskType(taskType); + taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType); this.partition = partition; this.taskAttemptListener = taskAttemptListener; this.eventHandler = eventHandler; @@ -255,17 +272,37 @@ public abstract class TaskImpl implements Task, EventHandler { this.jobToken = jobToken; this.metrics = metrics; + // See if this is from a previous generation. if (completedTasksFromPreviousRun != null - && completedTasksFromPreviousRun.contains(taskId)) { + && completedTasksFromPreviousRun.containsKey(taskId)) { + // This task has TaskAttempts from previous generation. We have to replay + // them. 
LOG.info("Task is from previous run " + taskId); - startCount = startCount - 1; + TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId); + Map allAttempts = + taskInfo.getAllTaskAttempts(); + taskAttemptsFromPreviousGeneration = new ArrayList(); + taskAttemptsFromPreviousGeneration.addAll(allAttempts.values()); + Collections.sort(taskAttemptsFromPreviousGeneration, + RECOVERED_ATTEMPTS_COMPARATOR); } - //attempt ids are generated based on MR app startCount so that attempts - //from previous lives don't overstep the current one. - //this assumes that a task won't have more than 1000 attempts in its single - //life - nextAttemptNumber = (startCount - 1) * 1000; + if (taskAttemptsFromPreviousGeneration.isEmpty()) { + // All the previous attempts are exhausted, now start with a new + // generation. + + // All the new TaskAttemptIDs are generated based on MR + // ApplicationAttemptID so that attempts from previous lives don't + // over-step the current one. This assumes that a task won't have more + // than 1000 attempts in its single generation, which is very reasonable. + // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts + // and requires serious medical attention. + nextAttemptNumber = (startCount - 1) * 1000; + } else { + // There are still some TaskAttempts from previous generation, use them + nextAttemptNumber = + taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); + } // This "this leak" is okay because the retained pointer is in an // instance variable. 
@@ -390,17 +427,23 @@ public abstract class TaskImpl implements Task, EventHandler { //this is always called in read/write lock private long getLaunchTime() { - long launchTime = 0; + long taskLaunchTime = 0; + boolean launchTimeSet = false; for (TaskAttempt at : attempts.values()) { - //select the least launch time of all attempts - if (launchTime == 0 || launchTime > at.getLaunchTime()) { - launchTime = at.getLaunchTime(); + // select the least launch time of all attempts + long attemptLaunchTime = at.getLaunchTime(); + if (attemptLaunchTime != 0 && !launchTimeSet) { + // For the first non-zero launch time + launchTimeSet = true; + taskLaunchTime = attemptLaunchTime; + } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) { + taskLaunchTime = attemptLaunchTime; } } - if (launchTime == 0) { + if (!launchTimeSet) { return this.scheduledTime; } - return launchTime; + return taskLaunchTime; } //this is always called in read/write lock @@ -525,7 +568,16 @@ public abstract class TaskImpl implements Task, EventHandler { attempts.put(attempt.getID(), attempt); break; } - ++nextAttemptNumber; + + // Update nextAttemptNumber + if (taskAttemptsFromPreviousGeneration.isEmpty()) { + ++nextAttemptNumber; + } else { + // There are still some TaskAttempts from previous generation, use them + nextAttemptNumber = + taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); + } + ++numberUncompletedAttempts; //schedule the nextAttemptNumber if (failedAttempts > 0) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java index 95c4919d224..c7134a46bd7 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java @@ -19,8 +19,9 @@ package org.apache.hadoop.mapreduce.v2.app.recover; import java.util.List; -import java.util.Set; +import java.util.Map; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.yarn.Clock; @@ -32,7 +33,7 @@ public interface Recovery { Clock getClock(); - Set getCompletedTasks(); + Map getCompletedTasks(); List getAMInfos(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index e6831f83557..3bf6e075849 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import 
org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; @@ -153,8 +153,8 @@ public class RecoveryService extends CompositeService implements Recovery { } @Override - public Set getCompletedTasks() { - return completedTasks.keySet(); + public Map getCompletedTasks() { + return completedTasks; } @Override @@ -189,7 +189,8 @@ public class RecoveryService extends CompositeService implements Recovery { getConfig()); //read the previous history file historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile( - histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); + histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); + LOG.info("History file is at " + historyFile); in = fc.open(historyFile); JobHistoryParser parser = new JobHistoryParser(in); jobInfo = parser.parse(); @@ -242,7 +243,7 @@ public class RecoveryService extends CompositeService implements Recovery { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) .getTaskAttemptID()); - LOG.info("Attempt start time " + attInfo.getStartTime()); + LOG.info("Recovered Attempt start time " + attInfo.getStartTime()); clock.setTime(attInfo.getStartTime()); } else if (event.getType() == TaskAttemptEventType.TA_DONE @@ -250,7 +251,7 @@ public class RecoveryService extends CompositeService implements Recovery { || event.getType() == TaskAttemptEventType.TA_KILL) { TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) .getTaskAttemptID()); - LOG.info("Attempt finish time " + attInfo.getFinishTime()); + LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime()); clock.setTime(attInfo.getFinishTime()); } @@ -380,17 +381,17 @@ public class RecoveryService extends CompositeService implements Recovery { } // send the done event - LOG.info("Sending done event to " + aId); + 
LOG.info("Sending done event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_DONE)); break; case KILLED: - LOG.info("Sending kill event to " + aId); + LOG.info("Sending kill event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_KILL)); break; default: - LOG.info("Sending fail event to " + aId); + LOG.info("Sending fail event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_FAILMSG)); break; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 376227b51ef..3f4fa4070cd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; 
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; @@ -74,7 +76,14 @@ public class TestRecovery { private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); - + /** + * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt + * completely disappears because of failed launch, one attempt gets killed and + * one attempt succeeds. AM crashes after the first task finishes and + * recovers completely and succeeds in the second generation. + * + * @throws Exception + */ @Test public void testCrashed() throws Exception { @@ -112,7 +121,8 @@ public class TestRecovery { // reduces must be in NEW state Assert.assertEquals("Reduce Task state not correct", TaskState.RUNNING, reduceTask.getReport().getTaskState()); - + + /////////// Play some games with the TaskAttempts of the first task ////// //send the fail signal to the 1st map task attempt app.getContext().getEventHandler().handle( new TaskAttemptEvent( @@ -120,29 +130,31 @@ public class TestRecovery { TaskAttemptEventType.TA_FAILMSG)); app.waitForState(task1Attempt1, TaskAttemptState.FAILED); - - while (mapTask1.getAttempts().size() != 2) { + + int timeOut = 0; + while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) { Thread.sleep(2000); LOG.info("Waiting for next attempt to start"); } + Assert.assertEquals(2, mapTask1.getAttempts().size()); Iterator itr = mapTask1.getAttempts().values().iterator(); itr.next(); TaskAttempt task1Attempt2 = itr.next(); - app.waitForState(task1Attempt2, TaskAttemptState.RUNNING); - - //send the kill signal to the 1st map 2nd attempt + // This attempt will automatically fail because of the way ContainerLauncher + // is setup + // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846 app.getContext().getEventHandler().handle( - new 
TaskAttemptEvent( - task1Attempt2.getID(), - TaskAttemptEventType.TA_KILL)); - - app.waitForState(task1Attempt2, TaskAttemptState.KILLED); - - while (mapTask1.getAttempts().size() != 3) { + new TaskAttemptEvent(task1Attempt2.getID(), + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); + app.waitForState(task1Attempt2, TaskAttemptState.FAILED); + + timeOut = 0; + while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) { Thread.sleep(2000); LOG.info("Waiting for next attempt to start"); } + Assert.assertEquals(3, mapTask1.getAttempts().size()); itr = mapTask1.getAttempts().values().iterator(); itr.next(); itr.next(); @@ -150,12 +162,36 @@ public class TestRecovery { app.waitForState(task1Attempt3, TaskAttemptState.RUNNING); - //send the done signal to the 1st map 3rd attempt + //send the kill signal to the 1st map 3rd attempt app.getContext().getEventHandler().handle( new TaskAttemptEvent( task1Attempt3.getID(), + TaskAttemptEventType.TA_KILL)); + + app.waitForState(task1Attempt3, TaskAttemptState.KILLED); + + timeOut = 0; + while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) { + Thread.sleep(2000); + LOG.info("Waiting for next attempt to start"); + } + Assert.assertEquals(4, mapTask1.getAttempts().size()); + itr = mapTask1.getAttempts().values().iterator(); + itr.next(); + itr.next(); + itr.next(); + TaskAttempt task1Attempt4 = itr.next(); + + app.waitForState(task1Attempt4, TaskAttemptState.RUNNING); + + //send the done signal to the 1st map 4th attempt + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt4.getID(), TaskAttemptEventType.TA_DONE)); + /////////// End of games with the TaskAttempts of the first task ////// + //wait for first map task to complete app.waitForState(mapTask1, TaskState.SUCCEEDED); long task1StartTime = mapTask1.getReport().getStartTime(); @@ -552,7 +588,7 @@ public class TestRecovery { } - class MRAppWithHistory extends MRApp { + static class MRAppWithHistory extends MRApp { public 
MRAppWithHistory(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); @@ -567,7 +603,17 @@ public class TestRecovery { @Override protected ContainerLauncher createContainerLauncher(AppContext context) { - MockContainerLauncher launcher = new MockContainerLauncher(); + MockContainerLauncher launcher = new MockContainerLauncher() { + @Override + public void handle(ContainerLauncherEvent event) { + TaskAttemptId taskAttemptID = event.getTaskAttemptID(); + // Pass everything except the 2nd attempt of the first task. + if (taskAttemptID.getId() != 1 + || taskAttemptID.getTaskId().getId() != 0) { + super.handle(event); + } + } + }; launcher.shufflePort = 5467; return launcher; } @@ -581,7 +627,7 @@ public class TestRecovery { } } - class RecoveryServiceWithCustomDispatcher extends RecoveryService { + static class RecoveryServiceWithCustomDispatcher extends RecoveryService { public RecoveryServiceWithCustomDispatcher( ApplicationAttemptId applicationAttemptId, Clock clock, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 4dcb96a561c..dcc9b07cc38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -25,7 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Set; +import java.util.Map; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; @@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -72,7 +73,7 @@ public class TestTaskImpl { private Path remoteJobConfFile; private Collection> fsTokens; private Clock clock; - private Set completedTasksFromPreviousRun; + private Map completedTasksFromPreviousRun; private MRAppMetrics metrics; private TaskImpl mockTask; private ApplicationId appId; @@ -96,7 +97,7 @@ public class TestTaskImpl { TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, taskType , partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, committer, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index ebdb4160ee8..ddabb4c52f6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.factories.RecordFactory; import 
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -@SuppressWarnings("deprecation") public class TypeConverter { private static RecordFactory recordFactory; @@ -116,8 +115,8 @@ public class TypeConverter { } public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) { - return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()), - id.getId()); + return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), + fromYarn(id.getTaskType()), id.getId()); } public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {