diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index cbae575b1a7..94af3387d76 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -400,7 +401,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
 
       if (algorithmVersion == 1) {
         for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-          mergePaths(fs, stat, finalOutput);
+          mergePaths(fs, stat, finalOutput, context);
         }
       }
 
@@ -451,10 +452,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @throws IOException on any error
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to) throws IOException {
+      final Path to, JobContext context) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Merging data from " + from + " to " + to);
     }
+    reportProgress(context);
     FileStatus toStat;
     try {
       toStat = fs.getFileStatus(to);
@@ -478,22 +480,28 @@ public class FileOutputCommitter extends PathOutputCommitter {
           if (!fs.delete(to, true)) {
             throw new IOException("Failed to delete " + to);
           }
-          renameOrMerge(fs, from, to);
+          renameOrMerge(fs, from, to, context);
         } else {
           //It is a directory so merge everything in the directories
           for (FileStatus subFrom : fs.listStatus(from.getPath())) {
             Path subTo = new Path(to, subFrom.getPath().getName());
-            mergePaths(fs, subFrom, subTo);
+            mergePaths(fs, subFrom, subTo, context);
           }
         }
       } else {
-        renameOrMerge(fs, from, to);
+        renameOrMerge(fs, from, to, context);
       }
     }
   }
 
-  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
-      throws IOException {
+  private void reportProgress(JobContext context) {
+    if (context instanceof Progressable) {
+      ((Progressable) context).progress();
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
+      JobContext context) throws IOException {
     if (algorithmVersion == 1) {
       if (!fs.rename(from.getPath(), to)) {
         throw new IOException("Failed to rename " + from + " to " + to);
@@ -502,7 +510,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       fs.mkdirs(to);
       for (FileStatus subFrom : fs.listStatus(from.getPath())) {
         Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo);
+        mergePaths(fs, subFrom, subTo, context);
       }
     }
   }
@@ -594,7 +602,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
             committedTaskPath);
       } else {
         // directly merge everything from taskAttemptPath to output directory
-        mergePaths(fs, taskAttemptDirStatus, outputPath);
+        mergePaths(fs, taskAttemptDirStatus, outputPath, context);
         LOG.info("Saved output of task '" + attemptId + "' to " +
             outputPath);
 
@@ -718,7 +726,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
           FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
           LOG.info("Recovering task for upgrading scenario, moving files from "
               + previousCommittedTaskPath + " to " + outputPath);
-          mergePaths(fs, from, outputPath);
+          mergePaths(fs, from, outputPath, context);
         } catch (FileNotFoundException ignored) {
         }
         LOG.info("Done recovering task " + attemptId);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index fc43dce1830..dd717a66cd7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -58,6 +58,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 @SuppressWarnings("unchecked")
 public class TestFileOutputCommitter {
   private static final Path outDir = new Path(
@@ -434,6 +438,35 @@ public class TestFileOutputCommitter {
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  @Test
+  public void testProgressDuringMerge() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        2);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = spy(new TaskAttemptContextImpl(conf, taskID));
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeMapFileOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    //make sure progress flag was set.
+    // The first time it is set is during commit but ensure that
+    // mergePaths call makes it go again.
+    verify(tContext, atLeast(2)).progress();
+  }
+
   @Test
   public void testCommitterRepeatableV1() throws Exception {
     testCommitterRetryInternal(1);