MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than (ApplicationId, startCount) consistently.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1175718 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-26 08:44:41 +00:00
parent 5a3040cad4
commit c9a7d3dbf9
5 changed files with 58 additions and 47 deletions

View File

@ -1423,6 +1423,9 @@ Release 0.23.0 - Unreleased
"mapreduce.jobtracker.address" configuration value for
JobTracker: "local" (Venu Gopala Rao via mahadev)
MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than
(ApplicationId, startCount) consistently. (acmurthy)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -115,8 +115,6 @@ public class MRAppMaster extends CompositeService {
private Clock clock;
private final long startTime = System.currentTimeMillis();
private String appName;
private final int startCount;
private final ApplicationId appID;
private final ApplicationAttemptId appAttemptID;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
@ -134,21 +132,16 @@ public class MRAppMaster extends CompositeService {
private Job job;
public MRAppMaster(ApplicationId applicationId, int startCount) {
this(applicationId, new SystemClock(), startCount);
public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
this(applicationAttemptId, new SystemClock());
}
public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.appID = applicationId;
this.appAttemptID = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(ApplicationAttemptId.class);
this.appAttemptID.setApplicationId(appID);
this.appAttemptID.setAttemptId(startCount);
this.startCount = startCount;
this.appAttemptID = applicationAttemptId;
this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationId);
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@Override
@ -160,9 +153,9 @@ public class MRAppMaster extends CompositeService {
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
&& startCount > 1) {
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life.");
Recovery recoveryServ = new RecoveryService(appID, clock, startCount);
Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
@ -265,8 +258,8 @@ public class MRAppMaster extends CompositeService {
// ////////// End of obtaining the tokens needed by the job. //////////
// create single job
Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@ -377,11 +370,11 @@ public class MRAppMaster extends CompositeService {
}
public ApplicationId getAppID() {
return appID;
return appAttemptID.getApplicationId();
}
public int getStartCount() {
return startCount;
return appAttemptID.getAttemptId();
}
public AppContext getContext() {
@ -506,7 +499,7 @@ public class MRAppMaster extends CompositeService {
@Override
public ApplicationId getApplicationID() {
return appID;
return appAttemptID.getApplicationId();
}
@Override
@ -659,8 +652,7 @@ public class MRAppMaster extends CompositeService {
}
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
MRAppMaster appMaster = new MRAppMaster(applicationAttemptId
.getApplicationId(), applicationAttemptId.getAttemptId());
MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -101,6 +100,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -129,11 +129,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
RecordFactoryProvider.getRecordFactory(null);
//final fields
private final ApplicationAttemptId applicationAttemptId;
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
private final Map<JobACL, AccessControlList> jobACLs;
private final int startCount;
private final Set<TaskId> completedTasksFromPreviousRun;
private final Lock readLock;
private final Lock writeLock;
@ -365,26 +365,26 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
public JobImpl(ApplicationId appID, Configuration conf,
public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock, int startCount,
Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
String userName) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.startCount = startCount;
this.userName = userName;
jobId.setAppId(appID);
jobId.setId(appID.getId());
ApplicationId applicationId = applicationAttemptId.getApplicationId();
jobId.setAppId(applicationId);
jobId.setId(applicationId.getId());
oldJobId = TypeConverter.fromYarn(jobId);
LOG.info("Job created" +
" appId=" + appID +
" appId=" + applicationId +
" jobId=" + jobId +
" oldJobId=" + oldJobId);
@ -1078,7 +1078,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens.getAllTokens(),
job.clock, job.completedTasksFromPreviousRun, job.startCount,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics);
job.addTask(task);
}
@ -1095,7 +1096,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf, job.numMapTasks,
job.taskAttemptListener, job.committer, job.jobToken,
job.fsTokens.getAllTokens(), job.clock,
job.completedTasksFromPreviousRun, job.startCount, job.metrics);
job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics);
job.addTask(task);
}
LOG.info("Number of reduces for job " + job.jobId + " = "

View File

@ -58,7 +58,7 @@ import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -92,10 +92,9 @@ public class RecoveryService extends CompositeService implements Recovery {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationId appID;
private final ApplicationAttemptId applicationAttemptId;
private final Dispatcher dispatcher;
private final ControlledClock clock;
private final int startCount;
private JobInfo jobInfo = null;
private final Map<TaskId, TaskInfo> completedTasks =
@ -106,10 +105,10 @@ public class RecoveryService extends CompositeService implements Recovery {
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
public RecoveryService(ApplicationAttemptId applicationAttemptId,
Clock clock) {
super("RecoveringDispatcher");
this.appID = appID;
this.startCount = startCount;
this.applicationAttemptId = applicationAttemptId;
this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@ -152,7 +151,8 @@ public class RecoveryService extends CompositeService implements Recovery {
private void parse() throws IOException {
// TODO: parse history file based on startCount
String jobName = TypeConverter.fromYarn(appID).toString();
String jobName =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
FSDataInputStream in = null;
Path historyFile = null;
@ -160,8 +160,9 @@ public class RecoveryService extends CompositeService implements Recovery {
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
getConfig());
//read the previous history file
historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
histDirPath, jobName, startCount - 1)); //read the previous history file
histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -91,7 +92,7 @@ public class MRApp extends MRAppMaster {
private File testWorkDir;
private Path testAbsPath;
private final RecordFactory recordFactory =
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//if true, tasks complete automatically as soon as they are launched
@ -100,7 +101,7 @@ public class MRApp extends MRAppMaster {
static ApplicationId applicationId;
static {
applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
applicationId = recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
}
@ -109,8 +110,18 @@ public class MRApp extends MRAppMaster {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
super(applicationId, startCount);
private static ApplicationAttemptId getApplicationAttemptId(
ApplicationId applicationId, int startCount) {
ApplicationAttemptId applicationAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(startCount);
return applicationAttemptId;
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount));
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@ -391,11 +402,12 @@ public class MRApp extends MRAppMaster {
return localStateMachine;
}
public TestJob(Configuration conf, ApplicationId appID,
public TestJob(Configuration conf, ApplicationId applicationId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
Clock clock, String user) {
super(appID, conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
super(getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, user);
// This "this leak" is okay because the retained pointer is in an