MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then the recovery. (vinodkv)

svn merge --ignore-ancestry -c 1243752 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1243755 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2012-02-14 00:11:54 +00:00
parent 10c7f0eb15
commit 6eadd0201d
12 changed files with 180 additions and 73 deletions

CHANGES.txt

@@ -759,6 +759,9 @@ Release 0.23.1 - 2012-02-08
     MAPREDUCE-3843. Job summary log file found missing on the RM host
     (Anupam Seth via tgraves)
 
+    MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
+    the recovery. (vinodkv)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

JobHistoryEventHandler.java

@@ -244,7 +244,7 @@ public class JobHistoryEventHandler extends AbstractService
       while (!stopped && !Thread.currentThread().isInterrupted()) {
 
         // Log the size of the history-event-queue every so often.
-        if (eventCounter % 1000 == 0) {
+        if (eventCounter != 0 && eventCounter % 1000 == 0) {
           eventCounter = 0;
           LOG.info("Size of the JobHistory event queue is "
               + eventQueue.size());
@@ -464,8 +464,10 @@ public class JobHistoryEventHandler extends AbstractService
       }
       processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
           event.getJobID());
-      LOG.info("In HistoryEventHandler "
-          + event.getHistoryEvent().getEventType());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("In HistoryEventHandler "
+            + event.getHistoryEvent().getEventType());
+      }
     } catch (IOException e) {
       LOG.error("Error writing History Event: " + event.getHistoryEvent(),
           e);

MRAppMaster.java

@@ -26,7 +26,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@@ -123,7 +123,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
  * The information is shared across different components using AppContext.
  */
 
-@SuppressWarnings("deprecation")
+@SuppressWarnings("rawtypes")
 public class MRAppMaster extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
@@ -138,7 +138,7 @@ public class MRAppMaster extends CompositeService {
   private final int nmPort;
   private final int nmHttpPort;
   protected final MRAppMetrics metrics;
-  private Set<TaskId> completedTasksFromPreviousRun;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private List<AMInfo> amInfos;
   private AppContext context;
   private Dispatcher dispatcher;
@@ -596,7 +596,7 @@ public class MRAppMaster extends CompositeService {
     return dispatcher;
   }
 
-  public Set<TaskId> getCompletedTaskFromPreviousRun() {
+  public Map<TaskId, TaskInfo> getCompletedTaskFromPreviousRun() {
     return completedTasksFromPreviousRun;
   }
 
@@ -737,7 +737,6 @@ public class MRAppMaster extends CompositeService {
     return jobs;
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
   public EventHandler getEventHandler() {
     return dispatcher.getEventHandler();

JobImpl.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@@ -133,7 +134,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float cleanupWeight = 0.05f;
   private float mapWeight = 0.0f;
   private float reduceWeight = 0.0f;
-  private final Set<TaskId> completedTasksFromPreviousRun;
+  private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private final List<AMInfo> amInfos;
   private final Lock readLock;
   private final Lock writeLock;
@@ -376,7 +377,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
       OutputCommitter committer, boolean newApiCommitter, String userName,
       long appSubmitTime, List<AMInfo> amInfos) {
     this.applicationAttemptId = applicationAttemptId;

MapTaskImpl.java

@@ -19,13 +19,14 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -38,7 +39,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
 public class MapTaskImpl extends TaskImpl {
 
   private final TaskSplitMetaInfo taskSplitMetaInfo;
@@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImpl {
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
         conf, taskAttemptListener, committer, jobToken, fsTokens, clock,

ReduceTaskImpl.java

@@ -19,13 +19,14 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -37,7 +38,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
 public class ReduceTaskImpl extends TaskImpl {
 
   private final int numMapTasks;
@@ -47,7 +48,7 @@ public class ReduceTaskImpl extends TaskImpl {
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
      MRAppMetrics metrics) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
         taskAttemptListener, committer, jobToken, fsTokens, clock,

TaskImpl.java

@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
@@ -209,7 +214,22 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final StateMachine<TaskState, TaskEventType, TaskEvent>
     stateMachine;
 
-  protected int nextAttemptNumber;
+  // By default, the next TaskAttempt number is zero. Changes during recovery
+  protected int nextAttemptNumber = 0;
+  private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
+      new ArrayList<TaskAttemptInfo>();
+
+  private static final class RecoverdAttemptsComparator implements
+      Comparator<TaskAttemptInfo> {
+    @Override
+    public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
+      long diff = attempt1.getStartTime() - attempt2.getStartTime();
+      return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
+    }
+  }
+
+  private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
+      new RecoverdAttemptsComparator();
 
   //should be set to one which comes first
   //saying COMMIT_PENDING
@@ -230,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
       MRAppMetrics metrics) {
     this.conf = conf;
     this.clock = clock;
@@ -243,10 +263,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     // have a convention that none of the overrides depends on any
     // fields that need initialization.
     maxAttempts = getMaxAttempts();
-    taskId = recordFactory.newRecordInstance(TaskId.class);
-    taskId.setJobId(jobId);
-    taskId.setId(partition);
-    taskId.setTaskType(taskType);
+    taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType);
     this.partition = partition;
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
@@ -255,17 +272,37 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.jobToken = jobToken;
     this.metrics = metrics;
 
+    // See if this is from a previous generation.
     if (completedTasksFromPreviousRun != null
-        && completedTasksFromPreviousRun.contains(taskId)) {
+        && completedTasksFromPreviousRun.containsKey(taskId)) {
+      // This task has TaskAttempts from previous generation. We have to replay
+      // them.
       LOG.info("Task is from previous run " + taskId);
-      startCount = startCount - 1;
+      TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
+      Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
+          taskInfo.getAllTaskAttempts();
+      taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
+      taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
+      Collections.sort(taskAttemptsFromPreviousGeneration,
+          RECOVERED_ATTEMPTS_COMPARATOR);
     }
 
-    //attempt ids are generated based on MR app startCount so that attempts
-    //from previous lives don't overstep the current one.
-    //this assumes that a task won't have more than 1000 attempts in its single
-    //life
-    nextAttemptNumber = (startCount - 1) * 1000;
+    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+      // All the previous attempts are exhausted, now start with a new
+      // generation.
+
+      // All the new TaskAttemptIDs are generated based on MR
+      // ApplicationAttemptID so that attempts from previous lives don't
+      // over-step the current one. This assumes that a task won't have more
+      // than 1000 attempts in its single generation, which is very reasonable.
+      // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
+      // and requires serious medical attention.
+      nextAttemptNumber = (startCount - 1) * 1000;
+    } else {
+      // There are still some TaskAttempts from previous generation, use them
+      nextAttemptNumber =
+          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
+    }
 
     // This "this leak" is okay because the retained pointer is in an
     // instance variable.
@@ -390,17 +427,23 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   //this is always called in read/write lock
   private long getLaunchTime() {
-    long launchTime = 0;
+    long taskLaunchTime = 0;
+    boolean launchTimeSet = false;
     for (TaskAttempt at : attempts.values()) {
-      //select the least launch time of all attempts
-      if (launchTime == 0 || launchTime > at.getLaunchTime()) {
-        launchTime = at.getLaunchTime();
+      // select the least launch time of all attempts
+      long attemptLaunchTime = at.getLaunchTime();
+      if (attemptLaunchTime != 0 && !launchTimeSet) {
+        // For the first non-zero launch time
+        launchTimeSet = true;
+        taskLaunchTime = attemptLaunchTime;
+      } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) {
+        taskLaunchTime = attemptLaunchTime;
       }
     }
-    if (launchTime == 0) {
+    if (!launchTimeSet) {
       return this.scheduledTime;
     }
-    return launchTime;
+    return taskLaunchTime;
   }
 
   //this is always called in read/write lock
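For reference, the rewritten getLaunchTime() above ignores attempts whose launch time is still zero and only falls back to the task's scheduled time when no attempt has launched at all. Below is a self-contained sketch of that selection using plain longs instead of TaskAttempt objects; the class and method names are hypothetical, it is illustrative only and not the TaskImpl code (assumes Java 9+ for List.of):

import java.util.Collection;
import java.util.List;

public class LaunchTimes {
  // Smallest non-zero launch time, or scheduledTime when nothing has launched yet.
  static long earliestLaunchTime(Collection<Long> attemptLaunchTimes, long scheduledTime) {
    long earliest = 0;
    boolean set = false;
    for (long t : attemptLaunchTimes) {
      if (t != 0 && (!set || t < earliest)) {
        set = true;
        earliest = t;
      }
    }
    return set ? earliest : scheduledTime;
  }

  public static void main(String[] args) {
    // 0 means "attempt not launched yet"; the earliest real launch time wins.
    System.out.println(earliestLaunchTime(List.of(0L, 1500L, 1200L), 1000L)); // 1200
    // No attempt has launched: fall back to the task's scheduled time.
    System.out.println(earliestLaunchTime(List.of(0L, 0L), 1000L));           // 1000
  }
}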
@@ -525,7 +568,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
       attempts.put(attempt.getID(), attempt);
       break;
     }
-    ++nextAttemptNumber;
+
+    // Update nextATtemptNumber
+    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
+      ++nextAttemptNumber;
+    } else {
+      // There are still some TaskAttempts from previous generation, use them
+      nextAttemptNumber =
+          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
+    }
+
     ++numberUncompletedAttempts;
     //schedule the nextAttemptNumber
     if (failedAttempts > 0) {
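The attempt-numbering change above is the core of the recovery behaviour: a recovered task first replays the attempt IDs recorded in the previous generation's job history, sorted by their original start times, and only once those are exhausted does it mint fresh IDs starting at (startCount - 1) * 1000 so new attempts cannot collide with earlier generations. The following is a minimal standalone sketch of that selection logic, with a plain record standing in for JobHistoryParser.TaskAttemptInfo; the class and method names are hypothetical and this is not the actual TaskImpl code (assumes Java 16+ for records):

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;

public class AttemptNumbering {
  // Simplified stand-in for JobHistoryParser.TaskAttemptInfo.
  record RecoveredAttempt(int attemptId, long startTime) {}

  private final Deque<RecoveredAttempt> recovered;
  private int nextAttemptNumber;

  AttemptNumbering(List<RecoveredAttempt> fromPreviousGeneration, int startCount) {
    // Replay previous-generation attempts in the order they originally started.
    this.recovered = fromPreviousGeneration.stream()
        .sorted(Comparator.comparingLong(RecoveredAttempt::startTime))
        .collect(Collectors.toCollection(ArrayDeque::new));
    // With nothing to replay, fresh IDs start at a per-generation base so they
    // never collide with IDs handed out by earlier AM generations.
    this.nextAttemptNumber = recovered.isEmpty()
        ? (startCount - 1) * 1000
        : recovered.removeFirst().attemptId();
  }

  // Returns the ID for the next attempt and advances, preferring recovered IDs.
  int allocate() {
    int id = nextAttemptNumber;
    nextAttemptNumber = recovered.isEmpty()
        ? nextAttemptNumber + 1
        : recovered.removeFirst().attemptId();
    return id;
  }

  public static void main(String[] args) {
    // Second generation (startCount = 2), nothing recovered: IDs 1000, 1001, ...
    AttemptNumbering fresh = new AttemptNumbering(List.of(), 2);
    System.out.println(fresh.allocate()); // 1000
    System.out.println(fresh.allocate()); // 1001

    // Two recovered attempts are replayed first, then sequential IDs continue.
    AttemptNumbering replayed = new AttemptNumbering(
        List.of(new RecoveredAttempt(1, 200L), new RecoveredAttempt(0, 100L)), 2);
    System.out.println(replayed.allocate()); // 0 (earliest start time first)
    System.out.println(replayed.allocate()); // 1
    System.out.println(replayed.allocate()); // 2
  }
}

With startCount = 2 and nothing to replay, the first new attempt gets ID 1000; when recovered attempts exist, their original IDs are reused first, so replayed history events line up with the IDs the previous AM already wrote.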

Recovery.java

@@ -19,8 +19,9 @@
 package org.apache.hadoop.mapreduce.v2.app.recover;
 
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.yarn.Clock;
@@ -32,7 +33,7 @@ public interface Recovery {
 
   Clock getClock();
 
-  Set<TaskId> getCompletedTasks();
+  Map<TaskId, TaskInfo> getCompletedTasks();
 
   List<AMInfo> getAMInfos();
 }
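The Set<TaskId> to Map<TaskId, TaskInfo> change in this interface is what lets TaskImpl see each recovered task's full attempt history rather than just its ID. A hedged sketch of how a consumer might walk such a map, using simplified stand-in records (the class and type names here are hypothetical, not the real Recovery/JobHistoryParser API):

import java.util.List;
import java.util.Map;

public class RecoveredTasksWalkthrough {
  // Simplified stand-ins for the TaskId/TaskInfo/TaskAttemptInfo types.
  record AttemptInfo(int attemptId, long startTime, String state) {}
  record TaskInfo(List<AttemptInfo> attempts) {}

  // A consumer of a recovered-tasks map can reach every attempt of every task,
  // which a bare Set of task IDs could not offer.
  static void replay(Map<String, TaskInfo> completedTasks) {
    completedTasks.forEach((taskId, info) -> {
      for (AttemptInfo attempt : info.attempts()) {
        System.out.printf("task %s attempt %d started at %d (%s)%n",
            taskId, attempt.attemptId(), attempt.startTime(), attempt.state());
      }
    });
  }

  public static void main(String[] args) {
    // Example data only: one recovered task with a failed and a succeeded attempt.
    replay(Map.of("task_0001_m_000000", new TaskInfo(List.of(
        new AttemptInfo(0, 100L, "FAILED"),
        new AttemptInfo(2, 300L, "SUCCEEDED")))));
  }
}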

RecoveryService.java

@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -153,8 +153,8 @@ public class RecoveryService extends CompositeService implements Recovery {
   }
 
   @Override
-  public Set<TaskId> getCompletedTasks() {
-    return completedTasks.keySet();
+  public Map<TaskId, TaskInfo> getCompletedTasks() {
+    return completedTasks;
   }
 
   @Override
@@ -190,6 +190,7 @@ public class RecoveryService extends CompositeService implements Recovery {
     //read the previous history file
     historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
         histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
@@ -242,7 +243,7 @@ public class RecoveryService extends CompositeService implements Recovery {
       if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
         TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
             .getTaskAttemptID());
-        LOG.info("Attempt start time " + attInfo.getStartTime());
+        LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
         clock.setTime(attInfo.getStartTime());
 
       } else if (event.getType() == TaskAttemptEventType.TA_DONE
@@ -250,7 +251,7 @@ public class RecoveryService extends CompositeService implements Recovery {
           || event.getType() == TaskAttemptEventType.TA_KILL) {
         TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
             .getTaskAttemptID());
-        LOG.info("Attempt finish time " + attInfo.getFinishTime());
+        LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
         clock.setTime(attInfo.getFinishTime());
       }
 
@@ -380,17 +381,17 @@ public class RecoveryService extends CompositeService implements Recovery {
       }
 
       // send the done event
-      LOG.info("Sending done event to " + aId);
+      LOG.info("Sending done event to recovered attempt " + aId);
       actualHandler.handle(new TaskAttemptEvent(aId,
           TaskAttemptEventType.TA_DONE));
       break;
     case KILLED:
-      LOG.info("Sending kill event to " + aId);
+      LOG.info("Sending kill event to recovered attempt " + aId);
       actualHandler.handle(new TaskAttemptEvent(aId,
           TaskAttemptEventType.TA_KILL));
       break;
     default:
-      LOG.info("Sending fail event to " + aId);
+      LOG.info("Sending fail event to recovered attempt " + aId);
       actualHandler.handle(new TaskAttemptEvent(aId,
           TaskAttemptEventType.TA_FAILMSG));
       break;

TestRecovery.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -74,7 +76,14 @@ public class TestRecovery {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
+  /**
+   * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
+   * completely disappears because of failed launch, one attempt gets killed and
+   * one attempt succeeds. AM crashes after the first tasks finishes and
+   * recovers completely and succeeds in the second generation.
+   *
+   * @throws Exception
+   */
   @Test
   public void testCrashed() throws Exception {
@@ -113,6 +122,7 @@ public class TestRecovery {
     Assert.assertEquals("Reduce Task state not correct",
         TaskState.RUNNING, reduceTask.getReport().getTaskState());
 
+    /////////// Play some games with the TaskAttempts of the first task //////
     //send the fail signal to the 1st map task attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
@@ -121,28 +131,30 @@ public class TestRecovery {
 
     app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
 
-    while (mapTask1.getAttempts().size() != 2) {
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
       Thread.sleep(2000);
       LOG.info("Waiting for next attempt to start");
     }
+    Assert.assertEquals(2, mapTask1.getAttempts().size());
     Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
     itr.next();
     TaskAttempt task1Attempt2 = itr.next();
 
-    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
-
-    //send the kill signal to the 1st map 2nd attempt
+    // This attempt will automatically fail because of the way ContainerLauncher
+    // is setup
+    // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
     app.getContext().getEventHandler().handle(
-        new TaskAttemptEvent(
-            task1Attempt2.getID(),
-            TaskAttemptEventType.TA_KILL));
-    app.waitForState(task1Attempt2, TaskAttemptState.KILLED);
+        new TaskAttemptEvent(task1Attempt2.getID(),
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+    app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
 
-    while (mapTask1.getAttempts().size() != 3) {
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
       Thread.sleep(2000);
       LOG.info("Waiting for next attempt to start");
     }
+    Assert.assertEquals(3, mapTask1.getAttempts().size());
     itr = mapTask1.getAttempts().values().iterator();
     itr.next();
     itr.next();
@@ -150,12 +162,36 @@ public class TestRecovery {
 
     app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
 
-    //send the done signal to the 1st map 3rd attempt
+    //send the kill signal to the 1st map 3rd attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             task1Attempt3.getID(),
+            TaskAttemptEventType.TA_KILL));
+    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(4, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt4 = itr.next();
+
+    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 1st map 4th attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt4.getID(),
             TaskAttemptEventType.TA_DONE));
 
+    /////////// End of games with the TaskAttempts of the first task //////
+
     //wait for first map task to complete
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
     long task1StartTime = mapTask1.getReport().getStartTime();
@@ -552,7 +588,7 @@ public class TestRecovery {
 
   }
 
-  class MRAppWithHistory extends MRApp {
+  static class MRAppWithHistory extends MRApp {
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart, int startCount) {
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
@@ -567,7 +603,17 @@ public class TestRecovery {
 
     @Override
     protected ContainerLauncher createContainerLauncher(AppContext context) {
-      MockContainerLauncher launcher = new MockContainerLauncher();
+      MockContainerLauncher launcher = new MockContainerLauncher() {
+        @Override
+        public void handle(ContainerLauncherEvent event) {
+          TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+          // Pass everything except the 2nd attempt of the first task.
+          if (taskAttemptID.getId() != 1
+              || taskAttemptID.getTaskId().getId() != 0) {
+            super.handle(event);
+          }
+        }
+      };
       launcher.shufflePort = 5467;
       return launcher;
     }
@@ -581,7 +627,7 @@ public class TestRecovery {
 
   }
 
-  class RecoveryServiceWithCustomDispatcher extends RecoveryService {
+  static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
 
     public RecoveryServiceWithCustomDispatcher(
         ApplicationAttemptId applicationAttemptId, Clock clock,

TestTaskImpl.java

@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -72,7 +73,7 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Collection<Token<? extends TokenIdentifier>> fsTokens;
   private Clock clock;
-  private Set<TaskId> completedTasksFromPreviousRun;
+  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -96,7 +97,7 @@ public class TestTaskImpl {
         TaskAttemptListener taskAttemptListener, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
         Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-        Set<TaskId> completedTasksFromPreviousRun, int startCount,
+        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener, committer,

TypeConverter.java

@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
-@SuppressWarnings("deprecation")
 public class TypeConverter {
 
   private static RecordFactory recordFactory;
@@ -116,8 +115,8 @@ public class TypeConverter {
   }
 
   public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) {
-    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()),
-        id.getId());
+    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()),
+        fromYarn(id.getTaskType()), id.getId());
   }
 
   public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {