MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps but no reduces. Contributed by Robert Joseph Evans.

svn merge --ignore-ancestry -c 1241217 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241218 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-02-06 22:17:18 +00:00
parent 287501b2bc
commit 15c308d948
4 changed files with 79 additions and 37 deletions

View File

@ -670,6 +670,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3813. Added a cache for resolved racks. (vinodkv via acmurthy)
MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps
but no reduces. (Robert Joseph Evans via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -85,18 +85,21 @@ private static Path getOutputPath(TaskAttemptContext context) {
   */
  @Private
  Path getJobAttemptPath(JobContext context) {
    Path out = getOutputPath(context);
    return out == null ? null :
      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getJobAttemptPath(context, out);
  }

  @Private
  Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
    Path out = getOutputPath(context);
    return out == null ? null : getTaskAttemptPath(context, out);
  }

  private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
    Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
    if(workPath == null && out != null) {
      return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getTaskAttemptPath(context, out);
    }
@ -110,14 +113,17 @@ private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOE
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  @Private
  Path getCommittedTaskPath(TaskAttemptContext context) {
    Path out = getOutputPath(context);
    return out == null ? null :
      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getCommittedTaskPath(context, out);
  }

  public Path getWorkPath(TaskAttemptContext context, Path outputPath)
      throws IOException {
    return outputPath == null ? null : getTaskAttemptPath(context, outputPath);
  }

  @Override
@ -156,6 +162,7 @@ public void abortJob(JobContext context, int runState)
    getWrapped(context).abortJob(context, state);
  }

  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    getWrapped(context).setupTask(context);
  }

View File

@ -495,6 +495,7 @@ public boolean isRecoverySupported() {
  @Override
  public void recoverTask(TaskAttemptContext context)
      throws IOException {
    if(hasOutputPath()) {
      context.progress();
      TaskAttemptID attemptId = context.getTaskAttemptID();
      int previousAttempt = getAppAttemptId(context) - 1;
@ -526,5 +527,8 @@ public void recoverTask(TaskAttemptContext context)
      } else {
        LOG.warn(attemptId+" had no output to recover.");
      }
    } else {
      LOG.warn("Output Path is null in recoverTask()");
    }
  }
}

View File

@ -104,7 +104,9 @@ public void testRecovery() throws Exception {
    writeOutput(theRecordWriter, tContext);

    // do commit
    if(committer.needsTaskCommit(tContext)) {
      committer.commitTask(tContext);
    }
    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
    File jtd1 = new File(jobTempDir1.toUri().getPath());
    assertTrue(jtd1.exists());
@ -188,7 +190,9 @@ public void testCommitter() throws Exception {
    writeOutput(theRecordWriter, tContext);

    // do commit
    if(committer.needsTaskCommit(tContext)) {
      committer.commitTask(tContext);
    }
    committer.commitJob(jContext);

    // validate output
@ -214,7 +218,9 @@ public void testMapFileOutputCommitter() throws Exception {
    writeMapFileOutput(theRecordWriter, tContext);

    // do commit
    if(committer.needsTaskCommit(tContext)) {
      committer.commitTask(tContext);
    }
    committer.commitJob(jContext);

    // validate output
@ -222,6 +228,28 @@ public void testMapFileOutputCommitter() throws Exception {
    FileUtil.fullyDelete(new File(outDir.toString()));
  }
// Regression test for MAPREDUCE-3808: a job whose output path is never
// configured (typical of map-only jobs with no file output) must not hit
// an NPE inside FileOutputCommitter during setup/commit.
public void testMapOnlyNoOutput() throws Exception {
JobConf conf = new JobConf();
//This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
if(committer.needsTaskCommit(tContext)) {
// do commit
committer.commitTask(tContext);
}
committer.commitJob(jContext);
// validate output
// (no output path was set, so nothing is committed; just clean up any
// stray files under the shared test directory)
FileUtil.fullyDelete(new File(outDir.toString()));
}
  public void testAbort() throws IOException, InterruptedException {
    JobConf conf = new JobConf();
    FileOutputFormat.setOutputPath(conf, outDir);