MAPREDUCE-2708. Designed and implemented MR Application Master recovery to make MR AMs resume their progress after restart. Contributed by Sharad Agarwal.

svn merge -c r1188043 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1188044 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-24 08:44:54 +00:00
parent 8f5c47794f
commit 6069c2a82d
12 changed files with 392 additions and 102 deletions

View File

@ -62,6 +62,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3104. Implemented Application-acls. (vinodkv) MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
MAPREDUCE-2708. Designed and implemented MR Application Master recovery to
make MR AMs resume their progress after restart. (Sharad Agarwal via vinodkv)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

View File

@ -29,11 +29,9 @@ import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID; import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
public class MapReduceChildJVM { public class MapReduceChildJVM {
@ -131,6 +129,8 @@ public class MapReduceChildJVM {
MRJobConfig.STDERR_LOGFILE_ENV, MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR) getTaskLogFile(TaskLog.LogName.STDERR)
); );
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
} }
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) { private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

View File

@ -239,6 +239,14 @@ class YarnChild {
Token<JobTokenIdentifier> jt) throws IOException { Token<JobTokenIdentifier> jt) throws IOException {
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE); final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials); job.setCredentials(credentials);
String appAttemptIdEnv = System
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
// Set it in conf, so as to be able to be used the the OutputCommitter.
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
.parseInt(appAttemptIdEnv));
// set tcp nodelay // set tcp nodelay
job.setBoolean("ipc.client.tcpnodelay", true); job.setBoolean("ipc.client.tcpnodelay", true);
job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,

View File

@ -36,18 +36,25 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl; import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService; import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
@ -76,12 +83,14 @@ import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -93,6 +102,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
@ -121,6 +132,9 @@ public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class); private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private Clock clock; private Clock clock;
private final long startTime; private final long startTime;
private final long appSubmitTime; private final long appSubmitTime;
@ -143,6 +157,9 @@ public class MRAppMaster extends CompositeService {
private TaskAttemptListener taskAttemptListener; private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager = private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager(); new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher; private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false; private boolean inRecovery = false;
@ -182,15 +199,39 @@ public class MRAppMaster extends CompositeService {
// for an app later // for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>"); appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
newApiCommitter = false;
jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
appAttemptID.getApplicationId().getId());
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if ((numReduceTasks > 0 &&
conf.getBoolean("mapred.reducer.new-api", false)) ||
(numReduceTasks == 0 &&
conf.getBoolean("mapred.mapper.new-api", false))) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
committer = createOutputCommitter(conf);
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
if (recoveryEnabled && recoverySupportedByCommitter
&& appAttemptID.getAttemptId() > 1) { && appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life."); LOG.info("Recovery is enabled. "
recoveryServ = new RecoveryService(appAttemptID, clock); + "Will try to recover from previous life on best effort basis.");
recoveryServ = new RecoveryService(appAttemptID, clock,
committer);
addIfService(recoveryServ); addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher(); dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock(); clock = recoveryServ.getClock();
inRecovery = true; inRecovery = true;
} else { } else {
LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
dispatcher = new AsyncDispatcher(); dispatcher = new AsyncDispatcher();
addIfService(dispatcher); addIfService(dispatcher);
} }
@ -253,6 +294,35 @@ public class MRAppMaster extends CompositeService {
super.init(conf); super.init(conf);
} // end of init() } // end of init()
private OutputCommitter createOutputCommitter(Configuration conf) {
OutputCommitter committer = null;
LOG.info("OutputCommitter set in config "
+ conf.get("mapred.output.committer.class"));
if (newApiCommitter) {
org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
.newTaskId(jobId, 0, TaskType.MAP);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
.newTaskAttemptId(taskID, 0);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptID));
OutputFormat outputFormat;
try {
outputFormat = ReflectionUtils.newInstance(taskContext
.getOutputFormatClass(), conf);
committer = outputFormat.getOutputCommitter(taskContext);
} catch (Exception e) {
throw new YarnException(e);
}
} else {
committer = ReflectionUtils.newInstance(conf.getClass(
"mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), conf);
}
LOG.info("OutputCommitter is " + committer.getClass().getName());
return committer;
}
protected boolean keepJobFiles(JobConf conf) { protected boolean keepJobFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf return (conf.getKeepTaskFilesPattern() != null || conf
@ -348,10 +418,10 @@ public class MRAppMaster extends CompositeService {
protected Job createJob(Configuration conf) { protected Job createJob(Configuration conf) {
// create single job // create single job
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(), Job newJob = new JobImpl(jobId, appAttemptID, conf, dispatcher
taskAttemptListener, jobTokenSecretManager, fsTokens, clock, .getEventHandler(), taskAttemptListener, jobTokenSecretManager,
completedTasksFromPreviousRun, metrics, currentUser.getUserName(), fsTokens, clock, completedTasksFromPreviousRun, metrics, committer,
appSubmitTime, amInfos); newApiCommitter, currentUser.getUserName(), appSubmitTime, amInfos);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class, dispatcher.register(JobFinishEvent.Type.class,
@ -471,6 +541,22 @@ public class MRAppMaster extends CompositeService {
return appAttemptID.getApplicationId(); return appAttemptID.getApplicationId();
} }
public ApplicationAttemptId getAttemptID() {
return appAttemptID;
}
public JobId getJobId() {
return jobId;
}
public OutputCommitter getCommitter() {
return committer;
}
public boolean isNewApiCommitter() {
return newApiCommitter;
}
public int getStartCount() { public int getStartCount() {
return appAttemptID.getAttemptId(); return appAttemptID.getAttemptId();
} }

View File

@ -39,15 +39,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -64,7 +61,6 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@ -98,14 +94,11 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -127,14 +120,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Maximum no. of fetch-failure notifications after which map task is failed // Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//final fields //final fields
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
private final Clock clock; private final Clock clock;
private final JobACLsManager aclsManager; private final JobACLsManager aclsManager;
private final String username; private final String username;
private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs; private final Map<JobACL, AccessControlList> jobACLs;
private final Set<TaskId> completedTasksFromPreviousRun; private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos; private final List<AMInfo> amInfos;
@ -142,6 +133,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final Lock writeLock; private final Lock writeLock;
private final JobId jobId; private final JobId jobId;
private final String jobName; private final String jobName;
private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId; private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener; private final TaskAttemptListener taskAttemptListener;
private final Object tasksSyncHandle = new Object(); private final Object tasksSyncHandle = new Object();
@ -167,7 +159,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private Path remoteJobSubmitDir; private Path remoteJobSubmitDir;
public Path remoteJobConfFile; public Path remoteJobConfFile;
private JobContext jobContext; private JobContext jobContext;
private OutputCommitter committer;
private int allowedMapFailuresPercent = 0; private int allowedMapFailuresPercent = 0;
private int allowedReduceFailuresPercent = 0; private int allowedReduceFailuresPercent = 0;
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents; private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
@ -367,14 +358,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private Token<JobTokenIdentifier> jobToken; private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager; private JobTokenSecretManager jobTokenSecretManager;
public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf, public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager, JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock, Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics, Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
String userName, long appSubmitTime, List<AMInfo> amInfos) { OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId; this.applicationAttemptId = applicationAttemptId;
this.jobId = recordFactory.newRecordInstance(JobId.class); this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>"); this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf; this.conf = conf;
this.metrics = metrics; this.metrics = metrics;
@ -383,14 +376,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.amInfos = amInfos; this.amInfos = amInfos;
this.userName = userName; this.userName = userName;
this.appSubmitTime = appSubmitTime; this.appSubmitTime = appSubmitTime;
ApplicationId applicationId = applicationAttemptId.getApplicationId(); this.oldJobId = TypeConverter.fromYarn(jobId);
jobId.setAppId(applicationId); this.newApiCommitter = newApiCommitter;
jobId.setId(applicationId.getId());
oldJobId = TypeConverter.fromYarn(jobId);
LOG.info("Job created" +
" appId=" + applicationId +
" jobId=" + jobId +
" oldJobId=" + oldJobId);
this.taskAttemptListener = taskAttemptListener; this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
@ -400,6 +387,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.fsTokens = fsTokenCredentials; this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager; this.jobTokenSecretManager = jobTokenSecretManager;
this.committer = committer;
this.aclsManager = new JobACLsManager(conf); this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name"); this.username = System.getProperty("user.name");
@ -854,47 +842,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
checkTaskLimits(); checkTaskLimits();
if (job.newApiCommitter) {
boolean newApiCommitter = false;
if ((job.numReduceTasks > 0 &&
job.conf.getBoolean("mapred.reducer.new-api", false)) ||
(job.numReduceTasks == 0 &&
job.conf.getBoolean("mapred.mapper.new-api", false))) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
LOG.info("OutputCommitter set in config " + job.conf.get(
"mapred.output.committer.class"));
if (newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf, job.jobContext = new JobContextImpl(job.conf,
job.oldJobId); job.oldJobId);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(TaskId.class));
attemptID.getTaskId().setJobId(job.jobId);
attemptID.getTaskId().setTaskType(TaskType.MAP);
TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
TypeConverter.fromYarn(attemptID));
try {
OutputFormat outputFormat = ReflectionUtils.newInstance(
taskContext.getOutputFormatClass(), job.conf);
job.committer = outputFormat.getOutputCommitter(taskContext);
} catch(Exception e) {
throw new IOException("Failed to assign outputcommitter", e);
}
} else { } else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(job.conf), job.oldJobId); new JobConf(job.conf), job.oldJobId);
job.committer = ReflectionUtils.newInstance(
job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
} }
LOG.info("OutputCommitter is " + job.committer.getClass().getName());
long inputLength = 0; long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) { for (int i = 0; i < job.numMapTasks; ++i) {

View File

@ -32,17 +32,23 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@ -53,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
@ -84,9 +91,6 @@ import org.apache.hadoop.yarn.service.Service;
//TODO: //TODO:
//task cleanup for all non completed tasks //task cleanup for all non completed tasks
//change job output committer to have
// - atomic job output promotion
// - recover output of completed tasks
public class RecoveryService extends CompositeService implements Recovery { public class RecoveryService extends CompositeService implements Recovery {
@ -95,6 +99,7 @@ public class RecoveryService extends CompositeService implements Recovery {
private static final Log LOG = LogFactory.getLog(RecoveryService.class); private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private final ControlledClock clock; private final ControlledClock clock;
@ -108,9 +113,10 @@ public class RecoveryService extends CompositeService implements Recovery {
private volatile boolean recoveryMode = false; private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId, public RecoveryService(ApplicationAttemptId applicationAttemptId,
Clock clock) { Clock clock, OutputCommitter committer) {
super("RecoveringDispatcher"); super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId; this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
this.dispatcher = new RecoveryDispatcher(); this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock); this.clock = new ControlledClock(clock);
addService((Service) dispatcher); addService((Service) dispatcher);
@ -122,17 +128,17 @@ public class RecoveryService extends CompositeService implements Recovery {
// parse the history file // parse the history file
try { try {
parse(); parse();
if (completedTasks.size() > 0) { } catch (Exception e) {
recoveryMode = true;
LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " +
"TO RECOVER " + completedTasks.size());
LOG.info("Job launch time " + jobInfo.getLaunchTime());
clock.setTime(jobInfo.getLaunchTime());
}
} catch (IOException e) {
LOG.warn(e); LOG.warn(e);
LOG.warn("Could not parse the old history file. Aborting recovery. " LOG.warn("Could not parse the old history file. Aborting recovery. "
+ "Starting afresh."); + "Starting afresh.", e);
}
if (completedTasks.size() > 0) {
recoveryMode = true;
LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+ "TO RECOVER " + completedTasks.size());
LOG.info("Job launch time " + jobInfo.getLaunchTime());
clock.setTime(jobInfo.getLaunchTime());
} }
} }
@ -315,6 +321,20 @@ public class RecoveryService extends CompositeService implements Recovery {
TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus()); TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
switch (state) { switch (state) {
case SUCCEEDED: case SUCCEEDED:
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
try {
committer.recoverTask(taskContext);
} catch (IOException e) {
actualHandler.handle(new JobDiagnosticsUpdateEvent(
aId.getTaskId().getJobId(), "Error in recovering task output " +
e.getMessage()));
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
// send the done event // send the done event
LOG.info("Sending done event to " + aId); LOG.info("Sending done event to " + aId);
actualHandler.handle(new TaskAttemptEvent(aId, actualHandler.handle(new TaskAttemptEvent(aId,
@ -334,6 +354,16 @@ public class RecoveryService extends CompositeService implements Recovery {
return; return;
} }
else if (event.getType() ==
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
TaskAttemptId aId = ((ContainerLauncherEvent) event)
.getTaskAttemptID();
actualHandler.handle(
new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return;
}
// delegate to the actual handler // delegate to the actual handler
actualHandler.handle(event); actualHandler.handle(event);
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID; import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -264,8 +265,10 @@ public class MRApp extends MRAppMaster {
} catch (IOException e) { } catch (IOException e) {
throw new YarnException(e); throw new YarnException(e);
} }
Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(), Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(), getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName()); currentUser.getUserName());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
@ -413,13 +416,15 @@ public class MRApp extends MRAppMaster {
return localStateMachine; return localStateMachine;
} }
public TestJob(Configuration conf, ApplicationId applicationId, public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Configuration conf, EventHandler eventHandler,
Clock clock, String user) { TaskAttemptListener taskAttemptListener, Clock clock,
super(getApplicationAttemptId(applicationId, getStartCount()), conf, OutputCommitter committer, boolean newApiCommitter, String user) {
eventHandler, taskAttemptListener, new JobTokenSecretManager(), super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics, conf, eventHandler, taskAttemptListener,
user, System.currentTimeMillis(), getAllAMInfos()); new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
// This "this leak" is okay because the retained pointer is in an // This "this leak" is okay because the retained pointer is in an
// instance variable. // instance variable.

View File

@ -342,10 +342,10 @@ public class TestRMContainerAllocator {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf, public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) { int numMaps, int numReduces) {
super(appAttemptID, conf, null, null, null, null, null, null, null, null, super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
System.currentTimeMillis(), null); appAttemptID, conf, null, null, null, null, null, null, null, null,
this.jobId = MRBuilderUtils true, null, System.currentTimeMillis(), null);
.newJobId(appAttemptID.getApplicationId(), 0); this.jobId = getID();
this.numMaps = numMaps; this.numMaps = numMaps;
this.numReduces = numReduces; this.numReduces = numReduces;
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import junit.framework.Assert; import junit.framework.Assert;
@ -25,10 +28,21 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@ -37,20 +51,34 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test; import org.junit.Test;
public class TestRecovery { public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class); private static final Log LOG = LogFactory.getLog(TestRecovery.class);
private static Path outputDir = new Path(new File("target",
TestRecovery.class.getName()).getAbsolutePath() +
Path.SEPARATOR + "out");
private static String partFile = "part-r-00000";
private Text key1 = new Text("key1");
private Text key2 = new Text("key2");
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
@Test @Test
public void testCrashed() throws Exception { public void testCrashed() throws Exception {
int runCount = 0; int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis(); long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount); MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf); Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING); app.waitForState(job, JobState.RUNNING);
long jobStartTime = job.getReport().getStartTime(); long jobStartTime = job.getReport().getStartTime();
@ -135,6 +163,9 @@ public class TestRecovery {
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount); app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration(); conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf); job = app.submit(conf);
app.waitForState(job, JobState.RUNNING); app.waitForState(job, JobState.RUNNING);
@ -202,6 +233,164 @@ public class TestRecovery {
// available in the failed attempt should be available here // available in the failed attempt should be available here
} }
@Test
public void testOutputRecovery() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
//stop the app before the job completes.
app.stop();
//rerun
//in rerun the map will be recovered from previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
reduceTask1 = it.next();
Task reduceTask2 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask2, TaskState.RUNNING);
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
.iterator().next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd reduce task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(tContext);
}
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}
private void validateOutput() throws IOException {
File expectedFile = new File(new Path(outputDir, partFile).toString());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
expectedOutput.append(val2).append("\n");
expectedOutput.append(key2).append("\n");
expectedOutput.append(key1).append("\n");
expectedOutput.append(key2).append('\t').append(val2).append("\n");
String output = slurp(expectedFile);
Assert.assertEquals(output, expectedOutput.toString());
}
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];
FileInputStream in = new FileInputStream(f);
String contents = null;
try {
in.read(buf, 0, len);
contents = new String(buf, "UTF-8");
} finally {
in.close();
}
return contents;
}
class MRAppWithHistory extends MRApp { class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete, public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) { String testName, boolean cleanOnStart, int startCount) {

View File

@ -449,6 +449,8 @@ public interface MRJobConfig {
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV"; public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV"; public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
// This should be the directory where splits file gets localized on the node // This should be the directory where splits file gets localized on the node
// running ApplicationMaster. // running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir"; public static final String JOB_SUBMIT_DIR = "jobSubmitDir";

View File

@ -25,7 +25,6 @@ import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder; import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory; import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8; import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -72,6 +71,7 @@ class EventWriter {
void flush() throws IOException { void flush() throws IOException {
encoder.flush(); encoder.flush();
out.flush(); out.flush();
out.hflush();
} }
void close() throws IOException { void close() throws IOException {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.lib.output; package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
@ -56,6 +58,17 @@ public class NullOutputFormat<K, V> extends OutputFormat<K, V> {
} }
public void setupJob(JobContext jobContext) { } public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { } public void setupTask(TaskAttemptContext taskContext) { }
@Override
public boolean isRecoverySupported() {
return true;
}
@Override
public void recoverTask(TaskAttemptContext taskContext)
throws IOException {
// Nothing to do for recovering the task.
}
}; };
} }
} }