MAPREDUCE-5367. Local jobs all use same local working directory (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510610 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-08-05 17:07:58 +00:00
parent 34ae96fd8c
commit 6ea797d0d0
2 changed files with 13 additions and 12 deletions

View File

@ -211,6 +211,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5440. TestCopyCommitter Fails on JDK7 (Robert Parker via jlowe)
MAPREDUCE-5367. Local jobs all use same local working directory
(Sandy Ryza)
Release 2.1.0-beta - 2013-08-06
INCOMPATIBLE CHANGES

View File

@ -146,7 +146,9 @@ public class LocalJobRunner implements ClientProtocol {
this.id = jobid;
JobConf conf = new JobConf(systemJobFile);
this.localFs = FileSystem.getLocal(conf);
this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
this.localJobDir = localFs.makeQualified(new Path(
new Path(conf.getLocalPath(jobDir), user), jobid.toString()));
this.localJobFile = new Path(this.localJobDir, id + ".xml");
// Manage the distributed cache. If there are files to be copied,
@ -217,7 +219,7 @@ public class LocalJobRunner implements ClientProtocol {
info.getSplitIndex(), 1);
map.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
setupChildMapredLocalDirs(map, localConf);
setupChildMapredLocalDirs(localJobDir, map, localConf);
MapOutputFile mapOutput = new MROutputFiles();
mapOutput.setConf(localConf);
@ -412,7 +414,7 @@ public class LocalJobRunner implements ClientProtocol {
getShortUserName());
JobConf localConf = new JobConf(job);
localConf.set("mapreduce.jobtracker.address", "local");
setupChildMapredLocalDirs(reduce, localConf);
setupChildMapredLocalDirs(localJobDir, reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
@ -839,31 +841,27 @@ public class LocalJobRunner implements ClientProtocol {
throw new UnsupportedOperationException("Not supported");
}
static void setupChildMapredLocalDirs(Task t, JobConf conf) {
static void setupChildMapredLocalDirs(Path localJobDir, Task t, JobConf conf) {
String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
String jobId = t.getJobID().toString();
String taskId = t.getTaskID().toString();
boolean isCleanup = t.isTaskCleanupTask();
String user = t.getUser();
StringBuffer childMapredLocalDir =
new StringBuffer(localDirs[0] + Path.SEPARATOR
+ getLocalTaskDir(user, jobId, taskId, isCleanup));
+ getLocalTaskDir(localJobDir, taskId, isCleanup));
for (int i = 1; i < localDirs.length; i++) {
childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+ getLocalTaskDir(user, jobId, taskId, isCleanup));
+ getLocalTaskDir(localJobDir, taskId, isCleanup));
}
LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
}
static final String TASK_CLEANUP_SUFFIX = ".cleanup";
static final String SUBDIR = jobDir;
static final String JOBCACHE = "jobcache";
static String getLocalTaskDir(String user, String jobid, String taskid,
static String getLocalTaskDir(Path localJobDir, String taskid,
boolean isCleanupAttempt) {
String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+ Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
String taskDir = localJobDir.toString() + Path.SEPARATOR + taskid;
if (isCleanupAttempt) {
taskDir = taskDir + TASK_CLEANUP_SUFFIX;
}