MAPREDUCE-2708. Designed and implemented MR Application Master recovery to make MR AMs resume their progress after restart. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188043 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2011-10-24 08:41:48 +00:00
parent 6e0991704f
commit 4086566144
12 changed files with 392 additions and 102 deletions
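In short, the recovery path is gated in MRAppMaster.init(): the AM recovers only when recovery is enabled, the job's OutputCommitter reports isRecoverySupported(), and this is not the first application attempt. A condensed sketch of that gating, assembled from the MRAppMaster hunk further down (simplified, not the literal patch):

// Condensed from MRAppMaster.init(): when recovery actually kicks in.
committer = createOutputCommitter(conf);
boolean recoveryEnabled = conf.getBoolean(
    MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
if (recoveryEnabled && recoverySupportedByCommitter
    && appAttemptID.getAttemptId() > 1) {
  // Second or later attempt: replay completed tasks from the previous
  // attempt's job history instead of re-running them.
  recoveryServ = new RecoveryService(appAttemptID, clock, committer);
  addIfService(recoveryServ);
  dispatcher = recoveryServ.getDispatcher();
  clock = recoveryServ.getClock();
  inRecovery = true;
} else {
  dispatcher = new AsyncDispatcher();
  addIfService(dispatcher);
}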

View File

@ -115,6 +115,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
make MR AMs resume their progress after restart. (Sharad Agarwal via vinodkv)
IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

View File

@ -29,11 +29,9 @@
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
public class MapReduceChildJVM {
@ -131,6 +129,8 @@ public static void setVMEnv(Map<String, String> environment,
MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR)
);
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
}
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

View File

@ -239,6 +239,14 @@ private static JobConf configureTask(Task task, Credentials credentials,
Token<JobTokenIdentifier> jt) throws IOException {
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials);
String appAttemptIdEnv = System
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
// Set it in conf, so that it can be used by the OutputCommitter.
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
.parseInt(appAttemptIdEnv));
// set tcp nodelay
job.setBoolean("ipc.client.tcpnodelay", true);
job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
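Together, the MapReduceChildJVM and YarnChild hunks above carry the application attempt id from the AM into every task: the AM publishes it into the child's environment, and the task reads it back into its JobConf so the OutputCommitter can tell which attempt it is running under. The round trip, condensed (names as in the hunks):

// AM side (MapReduceChildJVM.setVMEnv): publish the attempt id to the child JVM.
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
    conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());

// Task side (YarnChild.configureTask): read it back into the task's JobConf.
String appAttemptIdEnv = System.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer.parseInt(appAttemptIdEnv));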

View File

@ -36,18 +36,25 @@
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -76,12 +83,14 @@
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
@ -93,6 +102,8 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@ -121,6 +132,9 @@ public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private Clock clock;
private final long startTime;
private final long appSubmitTime;
@ -143,6 +157,9 @@ public class MRAppMaster extends CompositeService {
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
@ -182,15 +199,39 @@ public void init(final Configuration conf) {
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life.");
recoveryServ = new RecoveryService(appAttemptID, clock);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
newApiCommitter = false;
jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
appAttemptID.getApplicationId().getId());
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if ((numReduceTasks > 0 &&
conf.getBoolean("mapred.reducer.new-api", false)) ||
(numReduceTasks == 0 &&
conf.getBoolean("mapred.mapper.new-api", false))) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
committer = createOutputCommitter(conf);
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
if (recoveryEnabled && recoverySupportedByCommitter
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
recoveryServ = new RecoveryService(appAttemptID, clock,
committer);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
inRecovery = true;
} else {
LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
}
@ -253,7 +294,36 @@ public void init(final Configuration conf) {
super.init(conf);
} // end of init()
private OutputCommitter createOutputCommitter(Configuration conf) {
OutputCommitter committer = null;
LOG.info("OutputCommitter set in config "
+ conf.get("mapred.output.committer.class"));
if (newApiCommitter) {
org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
.newTaskId(jobId, 0, TaskType.MAP);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
.newTaskAttemptId(taskID, 0);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptID));
OutputFormat outputFormat;
try {
outputFormat = ReflectionUtils.newInstance(taskContext
.getOutputFormatClass(), conf);
committer = outputFormat.getOutputCommitter(taskContext);
} catch (Exception e) {
throw new YarnException(e);
}
} else {
committer = ReflectionUtils.newInstance(conf.getClass(
"mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), conf);
}
LOG.info("OutputCommitter is " + committer.getClass().getName());
return committer;
}
protected boolean keepJobFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf
.getKeepFailedTaskFiles());
@ -348,10 +418,10 @@ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
protected Job createJob(Configuration conf) {
// create single job
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName(),
appSubmitTime, amInfos);
Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
.getEventHandler(), taskAttemptListener, jobTokenSecretManager,
fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@ -471,6 +541,22 @@ public ApplicationId getAppID() {
return appAttemptID.getApplicationId();
}
public ApplicationAttemptId getAttemptID() {
return appAttemptID;
}
public JobId getJobId() {
return jobId;
}
public OutputCommitter getCommitter() {
return committer;
}
public boolean isNewApiCommitter() {
return newApiCommitter;
}
public int getStartCount() {
return appAttemptID.getAttemptId();
}
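For reference, the tests later in this commit opt jobs into recovery with the MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE key. A minimal, illustrative client-side snippet (not part of this patch):

// Illustrative only: enable AM recovery for a job, as the tests below do.
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
// Recovery still requires a committer whose isRecoverySupported() returns true
// and a restarted AM (attempt id > 1), per MRAppMaster.init() above.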

View File

@ -39,15 +39,12 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -64,7 +61,6 @@
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@ -98,14 +94,11 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -126,15 +119,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//final fields
private final ApplicationAttemptId applicationAttemptId;
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs;
private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
@ -142,6 +133,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final Object tasksSyncHandle = new Object();
@ -167,7 +159,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private Path remoteJobSubmitDir;
public Path remoteJobConfFile;
private JobContext jobContext;
private OutputCommitter committer;
private int allowedMapFailuresPercent = 0;
private int allowedReduceFailuresPercent = 0;
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@ -367,14 +358,16 @@ JobEventType.JOB_KILL, new KillTasksTransition())
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
String userName, long appSubmitTime, List<AMInfo> amInfos) {
OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.metrics = metrics;
@ -383,15 +376,9 @@ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
this.amInfos = amInfos;
this.userName = userName;
this.appSubmitTime = appSubmitTime;
ApplicationId applicationId = applicationAttemptId.getApplicationId();
jobId.setAppId(applicationId);
jobId.setId(applicationId.getId());
oldJobId = TypeConverter.fromYarn(jobId);
LOG.info("Job created" +
" appId=" + applicationId +
" jobId=" + jobId +
" oldJobId=" + oldJobId);
this.oldJobId = TypeConverter.fromYarn(jobId);
this.newApiCommitter = newApiCommitter;
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@ -400,6 +387,7 @@ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
this.committer = committer;
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
@ -854,47 +842,13 @@ public JobState transition(JobImpl job, JobEvent event) {
checkTaskLimits();
boolean newApiCommitter = false;
if ((job.numReduceTasks > 0 &&
job.conf.getBoolean("mapred.reducer.new-api", false)) ||
(job.numReduceTasks == 0 &&
job.conf.getBoolean("mapred.mapper.new-api", false))) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
LOG.info("OutputCommitter set in config " + job.conf.get(
"mapred.output.committer.class"));
if (newApiCommitter) {
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(TaskId.class));
attemptID.getTaskId().setJobId(job.jobId);
attemptID.getTaskId().setTaskType(TaskType.MAP);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
TypeConverter.fromYarn(attemptID));
try {
OutputFormat outputFormat = ReflectionUtils.newInstance(
taskContext.getOutputFormatClass(), job.conf);
job.committer = outputFormat.getOutputCommitter(taskContext);
} catch(Exception e) {
throw new IOException("Failed to assign outputcommitter", e);
}
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(job.conf), job.oldJobId);
job.committer = ReflectionUtils.newInstance(
job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
}
LOG.info("OutputCommitter is " + job.committer.getClass().getName());
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
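Net effect of the JobImpl changes above: the OutputCommitter is no longer constructed inside the job's SETUP transition; the AM builds it once and injects it, along with the precomputed jobId and the newApiCommitter flag, through the widened constructor. Condensed call site from MRAppMaster.createJob(), as in the earlier hunk:

Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
    taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
    completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
    currentUser.getUserName(), appSubmitTime, amInfos);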

View File

@ -32,17 +32,23 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@ -53,6 +59,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
@ -84,9 +91,6 @@
//TODO:
//task cleanup for all non completed tasks
//change job output committer to have
// - atomic job output promotion
// - recover output of completed tasks
public class RecoveryService extends CompositeService implements Recovery {
@ -95,6 +99,7 @@ public class RecoveryService extends CompositeService implements Recovery {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
private final Dispatcher dispatcher;
private final ControlledClock clock;
@ -108,9 +113,10 @@ public class RecoveryService extends CompositeService implements Recovery {
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
Clock clock) {
Clock clock, OutputCommitter committer) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@ -122,17 +128,17 @@ public void init(Configuration conf) {
// parse the history file
try {
parse();
if (completedTasks.size() > 0) {
recoveryMode = true;
LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " +
"TO RECOVER " + completedTasks.size());
LOG.info("Job launch time " + jobInfo.getLaunchTime());
clock.setTime(jobInfo.getLaunchTime());
}
} catch (IOException e) {
} catch (Exception e) {
LOG.warn(e);
LOG.warn("Could not parse the old history file. Aborting recovery. "
+ "Starting afresh.");
+ "Starting afresh.", e);
}
if (completedTasks.size() > 0) {
recoveryMode = true;
LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+ "TO RECOVER " + completedTasks.size());
LOG.info("Job launch time " + jobInfo.getLaunchTime());
clock.setTime(jobInfo.getLaunchTime());
}
}
@ -315,6 +321,20 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)
TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
switch (state) {
case SUCCEEDED:
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
try {
committer.recoverTask(taskContext);
} catch (IOException e) {
actualHandler.handle(new JobDiagnosticsUpdateEvent(
aId.getTaskId().getJobId(), "Error in recovering task output " +
e.getMessage()));
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
// send the done event
LOG.info("Sending done event to " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
@ -334,6 +354,16 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)
return;
}
else if (event.getType() ==
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
TaskAttemptId aId = ((ContainerLauncherEvent) event)
.getTaskAttemptID();
actualHandler.handle(
new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return;
}
// delegate to the actual handler
actualHandler.handle(event);
}
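The SUCCEEDED branch above leans on the OutputCommitter recovery contract: a committer advertises support through isRecoverySupported() and restores a finished attempt's output in recoverTask(). A hypothetical committer wired for recovery might look roughly like this (illustrative sketch, not part of this patch):

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class SketchRecoverableCommitter extends OutputCommitter {
  @Override public void setupJob(JobContext jobContext) throws IOException { }
  @Override public void setupTask(TaskAttemptContext taskContext) throws IOException { }
  @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
  @Override public void commitTask(TaskAttemptContext taskContext) throws IOException { }
  @Override public void abortTask(TaskAttemptContext taskContext) throws IOException { }

  @Override
  public boolean isRecoverySupported() {
    return true; // lets MRAppMaster start the RecoveryService on restart
  }

  @Override
  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
    // Re-promote or re-link the output the previous attempt already committed,
    // so the recovered task attempt can be marked done without re-running.
  }
}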

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -264,9 +265,11 @@ protected Job createJob(Configuration conf) {
} catch (IOException e) {
throw new YarnException(e);
}
Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
currentUser.getUserName());
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@ -413,13 +416,15 @@ protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
return localStateMachine;
}
public TestJob(Configuration conf, ApplicationId applicationId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
Clock clock, String user) {
super(getApplicationAttemptId(applicationId, getStartCount()), conf,
eventHandler, taskAttemptListener, new JobTokenSecretManager(),
new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics,
user, System.currentTimeMillis(), getAllAMInfos());
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
OutputCommitter committer, boolean newApiCommitter, String user) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
// This "this leak" is okay because the retained pointer is in an
// instance variable.

View File

@ -342,10 +342,10 @@ private static class FakeJob extends JobImpl {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
super(appAttemptID, conf, null, null, null, null, null, null, null, null,
System.currentTimeMillis(), null);
this.jobId = MRBuilderUtils
.newJobId(appAttemptID.getApplicationId(), 0);
super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
appAttemptID, conf, null, null, null, null, null, null, null, null,
true, null, System.currentTimeMillis(), null);
this.jobId = getID();
this.numMaps = numMaps;
this.numReduces = numReduces;
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@ -25,10 +28,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@ -37,20 +51,34 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
private static Path outputDir = new Path(new File("target",
TestRecovery.class.getName()).getAbsolutePath() +
Path.SEPARATOR + "out");
private static String partFile = "part-r-00000";
private Text key1 = new Text("key1");
private Text key2 = new Text("key2");
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
@Test
public void testCrashed() throws Exception {
int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long jobStartTime = job.getReport().getStartTime();
@ -135,6 +163,9 @@ public void testCrashed() throws Exception {
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
@ -201,7 +232,165 @@ public void testCrashed() throws Exception {
// TODO Add verification of additional data from jobHistory - whatever was
// available in the failed attempt should be available here
}
@Test
public void testOutputRecovery() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
//stop the app before the job completes.
app.stop();
//rerun
//in rerun the map will be recovered from previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
reduceTask1 = it.next();
Task reduceTask2 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask2, TaskState.RUNNING);
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
.iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd reduce task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(tContext);
}
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}
private void validateOutput() throws IOException {
File expectedFile = new File(new Path(outputDir, partFile).toString());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
expectedOutput.append(val2).append("\n");
expectedOutput.append(key2).append("\n");
expectedOutput.append(key1).append("\n");
expectedOutput.append(key2).append('\t').append(val2).append("\n");
String output = slurp(expectedFile);
Assert.assertEquals(output, expectedOutput.toString());
}
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];
FileInputStream in = new FileInputStream(f);
String contents = null;
try {
in.read(buf, 0, len);
contents = new String(buf, "UTF-8");
} finally {
in.close();
}
return contents;
}
class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {

View File

@ -449,6 +449,8 @@ public interface MRJobConfig {
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
// This should be the directory where splits file gets localized on the node
// running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir";

View File

@ -25,7 +25,6 @@
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
@ -72,6 +71,7 @@ synchronized void write(HistoryEvent event) throws IOException {
void flush() throws IOException {
encoder.flush();
out.flush();
out.hflush();
}
void close() throws IOException {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobContext;
@ -56,6 +58,17 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
}
public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { }
@Override
public boolean isRecoverySupported() {
return true;
}
@Override
public void recoverTask(TaskAttemptContext taskContext)
throws IOException {
// Nothing to do for recovering the task.
}
};
}
}