From c7fb49b3c5b8aa394f578e644bbc13fc9cbfcaca Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Fri, 14 Oct 2011 01:16:30 +0000 Subject: [PATCH] MAPREDUCE-3170. Fixed job output commit for deep hierarchies. Contributed by Hitesh Shah. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1183185 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/FileOutputCommitter.java | 87 +++++++++++------- .../lib/output/FileOutputCommitter.java | 90 +++++++++++++------ .../mapred/TestFileOutputCommitter.java | 70 ++++++++++++++- .../lib/output/TestFileOutputCommitter.java | 70 ++++++++++++++- 5 files changed, 252 insertions(+), 68 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 99134d61f3f..7f35debfc9f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1608,6 +1608,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2789. Complete schedulingInfo on CLI. (Eric Payne via acmurthy) + MAPREDUCE-3170. Fixed job output commit for deep hierarchies. (Hitesh Shah + via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java index 84e7b697251..32b6e2232d0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java @@ -71,27 +71,30 @@ public class FileOutputCommitter extends OutputCommitter { //delete the task temp directory from the current jobtempdir JobConf conf = context.getJobConf(); Path outputPath = FileOutputFormat.getOutputPath(conf); - FileSystem outputFileSystem = outputPath.getFileSystem(conf); - Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + - Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); - if (fileSys.exists(tmpDir)) { - fileSys.delete(tmpDir, true); - } else { - LOG.warn("Task temp dir could not be deleted " + tmpDir); - } - - //move the job output to final place - Path jobOutputPath = - new Path(outputPath, getJobAttemptBaseDirName(context)); - moveJobOutputs(outputFileSystem, outputPath, jobOutputPath); - - // delete the _temporary folder in the output folder - cleanupJob(context); - // check if the output-dir marking is required - if (shouldMarkOutputDir(context.getJobConf())) { - // create a _success file in the output folder - markOutputDirSuccessful(context); + if (outputPath != null) { + FileSystem outputFileSystem = outputPath.getFileSystem(conf); + Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + + Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); + FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); + if (fileSys.exists(tmpDir)) { + fileSys.delete(tmpDir, true); + } else { + LOG.warn("Task temp dir could not be deleted " + tmpDir); + } + + //move the job output to final place + Path jobOutputPath = + new Path(outputPath, getJobAttemptBaseDirName(context)); + moveJobOutputs(outputFileSystem, + jobOutputPath, outputPath, jobOutputPath); + + // delete the _temporary folder in the output folder + cleanupJob(context); + // check if the output-dir marking is required + if (shouldMarkOutputDir(context.getJobConf())) { + // create a _success file in the output folder + markOutputDirSuccessful(context); + } } } @@ -109,10 +112,14 @@ public class FileOutputCommitter extends OutputCommitter { } } - private void moveJobOutputs(FileSystem fs, + private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath, Path finalOutputDir, Path jobOutput) throws IOException { + LOG.debug("Told to move job output from " + jobOutput + + " to " + finalOutputDir + + " and orig job output path is " + origJobOutputPath); if (fs.isFile(jobOutput)) { - Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); + Path finalOutputPath = + getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath); if (!fs.rename(jobOutput, finalOutputPath)) { if (!fs.delete(finalOutputPath, true)) { throw new IOException("Failed to delete earlier output of job"); @@ -121,18 +128,23 @@ public class FileOutputCommitter extends OutputCommitter { throw new IOException("Failed to save output of job"); } } - LOG.debug("Moved " + jobOutput + " to " + finalOutputPath); + LOG.debug("Moved job output file from " + jobOutput + " to " + + finalOutputPath); } else if (fs.getFileStatus(jobOutput).isDirectory()) { + LOG.debug("Job output file " + jobOutput + " is a dir"); FileStatus[] paths = fs.listStatus(jobOutput); - Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); + Path finalOutputPath = + getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath); fs.mkdirs(finalOutputPath); + LOG.debug("Creating dirs along job output path " + finalOutputPath); if (paths != null) { for (FileStatus path : paths) { - moveJobOutputs(fs, finalOutputDir, path.getPath()); + moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath()); } } } } + @Override @Deprecated public void cleanupJob(JobContext context) throws IOException { @@ -199,8 +211,10 @@ public class FileOutputCommitter extends OutputCommitter { throws IOException { TaskAttemptID attemptId = context.getTaskAttemptID(); context.getProgressible().progress(); + LOG.debug("Told to move taskoutput from " + taskOutput + + " to " + jobOutputDir); if (fs.isFile(taskOutput)) { - Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, + Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, getTempTaskOutputPath(context)); if (!fs.rename(taskOutput, finalOutputPath)) { if (!fs.delete(finalOutputPath, true)) { @@ -214,10 +228,12 @@ public class FileOutputCommitter extends OutputCommitter { } LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); } else if(fs.getFileStatus(taskOutput).isDirectory()) { + LOG.debug("Taskoutput " + taskOutput + " is a dir"); FileStatus[] paths = fs.listStatus(taskOutput); - Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, + Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, getTempTaskOutputPath(context)); fs.mkdirs(finalOutputPath); + LOG.debug("Creating dirs along path " + finalOutputPath); if (paths != null) { for (FileStatus path : paths) { moveTaskOutputs(context, fs, jobOutputDir, path.getPath()); @@ -235,14 +251,16 @@ public class FileOutputCommitter extends OutputCommitter { } } - private Path getFinalPath(Path jobOutputDir, Path taskOutput, + @SuppressWarnings("deprecation") + private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput, Path taskOutputPath) throws IOException { - URI taskOutputUri = taskOutput.toUri(); - URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri); + URI taskOutputUri = taskOutput.makeQualified(fs).toUri(); + URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri(); + URI relativePath = taskOutputPathUri.relativize(taskOutputUri); if (taskOutputUri == relativePath) { //taskOutputPath is not a parent of taskOutput throw new IOException("Can not get the relative path: base = " + - taskOutputPath + " child = " + taskOutput); + taskOutputPathUri + " child = " + taskOutputUri); } if (relativePath.getPath().length() > 0) { return new Path(jobOutputDir, relativePath.getPath()); @@ -325,7 +343,10 @@ public class FileOutputCommitter extends OutputCommitter { new Path(outputPath, getJobAttemptBaseDirName(previousAttempt)); if (outputFileSystem.exists(pathToRecover)) { // Move the task outputs to their final place - moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover); + LOG.debug("Trying to recover task from " + pathToRecover + + " into " + jobOutputPath); + moveJobOutputs(outputFileSystem, + pathToRecover, jobOutputPath, pathToRecover); LOG.info("Saved output of job to " + jobOutputPath); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 26390c7df2a..497ca317fd3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -111,32 +111,48 @@ public class FileOutputCommitter extends OutputCommitter { * @param context the job's context */ public void commitJob(JobContext context) throws IOException { - //delete the task temp directory from the current jobtempdir - Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + - Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); - if (fileSys.exists(tmpDir)) { - fileSys.delete(tmpDir, true); - } else { - LOG.warn("Task temp dir could not be deleted " + tmpDir); - } - - //move the job output to final place - Path jobOutputPath = - new Path(outputPath, getJobAttemptBaseDirName(context)); - moveJobOutputs(outputFileSystem, outputPath, jobOutputPath); - - // delete the _temporary folder and create a _done file in the o/p folder - cleanupJob(context); - if (shouldMarkOutputDir(context.getConfiguration())) { - markOutputDirSuccessful(context); + if (outputPath != null) { + //delete the task temp directory from the current jobtempdir + Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + + Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); + FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); + if (fileSys.exists(tmpDir)) { + fileSys.delete(tmpDir, true); + } else { + LOG.warn("Task temp dir could not be deleted " + tmpDir); + } + + //move the job output to final place + Path jobOutputPath = + new Path(outputPath, getJobAttemptBaseDirName(context)); + moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath); + + // delete the _temporary folder and create a _done file in the o/p folder + cleanupJob(context); + if (shouldMarkOutputDir(context.getConfiguration())) { + markOutputDirSuccessful(context); + } } } - private void moveJobOutputs(FileSystem fs, + /** + * Move job output to final location + * @param fs Filesystem handle + * @param origJobOutputPath The original location of the job output + * Required to generate the relative path for correct moving of data. + * @param finalOutputDir The final output directory to which the job output + * needs to be moved + * @param jobOutput The current job output directory being moved + * @throws IOException + */ + private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath, Path finalOutputDir, Path jobOutput) throws IOException { + LOG.debug("Told to move job output from " + jobOutput + + " to " + finalOutputDir + + " and orig job output path is " + origJobOutputPath); if (fs.isFile(jobOutput)) { - Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); + Path finalOutputPath = + getFinalPath(finalOutputDir, jobOutput, origJobOutputPath); if (!fs.rename(jobOutput, finalOutputPath)) { if (!fs.delete(finalOutputPath, true)) { throw new IOException("Failed to delete earlier output of job"); @@ -145,14 +161,18 @@ public class FileOutputCommitter extends OutputCommitter { throw new IOException("Failed to save output of job"); } } - LOG.debug("Moved " + jobOutput + " to " + finalOutputPath); + LOG.debug("Moved job output file from " + jobOutput + " to " + + finalOutputPath); } else if (fs.getFileStatus(jobOutput).isDirectory()) { + LOG.debug("Job output file " + jobOutput + " is a dir"); FileStatus[] paths = fs.listStatus(jobOutput); - Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput); + Path finalOutputPath = + getFinalPath(finalOutputDir, jobOutput, origJobOutputPath); fs.mkdirs(finalOutputPath); + LOG.debug("Creating dirs along job output path " + finalOutputPath); if (paths != null) { for (FileStatus path : paths) { - moveJobOutputs(fs, finalOutputDir, path.getPath()); + moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath()); } } } @@ -233,6 +253,8 @@ public class FileOutputCommitter extends OutputCommitter { throws IOException { TaskAttemptID attemptId = context.getTaskAttemptID(); context.progress(); + LOG.debug("Told to move taskoutput from " + taskOutput + + " to " + jobOutputDir); if (fs.isFile(taskOutput)) { Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath); @@ -248,9 +270,11 @@ public class FileOutputCommitter extends OutputCommitter { } LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); } else if(fs.getFileStatus(taskOutput).isDirectory()) { + LOG.debug("Taskoutput " + taskOutput + " is a dir"); FileStatus[] paths = fs.listStatus(taskOutput); Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath); fs.mkdirs(finalOutputPath); + LOG.debug("Creating dirs along path " + finalOutputPath); if (paths != null) { for (FileStatus path : paths) { moveTaskOutputs(context, fs, jobOutputDir, path.getPath()); @@ -281,12 +305,17 @@ public class FileOutputCommitter extends OutputCommitter { * @throws IOException */ private Path getFinalPath(Path jobOutputDir, Path taskOutput, - Path taskOutputPath) throws IOException { - URI taskOutputUri = taskOutput.toUri(); - URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri); + Path taskOutputPath) throws IOException { + URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(), + outputFileSystem.getWorkingDirectory()).toUri(); + URI taskOutputPathUri = + taskOutputPath.makeQualified( + outputFileSystem.getUri(), + outputFileSystem.getWorkingDirectory()).toUri(); + URI relativePath = taskOutputPathUri.relativize(taskOutputUri); if (taskOutputUri == relativePath) { throw new IOException("Can not get the relative path: base = " + - taskOutputPath + " child = " + taskOutput); + taskOutputPathUri + " child = " + taskOutputUri); } if (relativePath.getPath().length() > 0) { return new Path(jobOutputDir, relativePath.getPath()); @@ -334,9 +363,12 @@ public class FileOutputCommitter extends OutputCommitter { Path pathToRecover = new Path(outputPath, getJobAttemptBaseDirName(previousAttempt)); + LOG.debug("Trying to recover task from " + pathToRecover + + " into " + jobOutputPath); if (outputFileSystem.exists(pathToRecover)) { // Move the task outputs to their final place - moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover); + moveJobOutputs(outputFileSystem, + pathToRecover, jobOutputPath, pathToRecover); LOG.info("Saved output of job to " + jobOutputPath); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java index c62ad644e91..2a2238587f5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java @@ -25,13 +25,17 @@ import java.net.URI; import junit.framework.TestCase; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; + @SuppressWarnings("unchecked") public class TestFileOutputCommitter extends TestCase { private static Path outDir = new Path(System.getProperty("test.build.data", @@ -65,6 +69,20 @@ public class TestFileOutputCommitter extends TestCase { } } + private void writeMapFileOutput(RecordWriter theRecordWriter, + TaskAttemptContext context) throws IOException, InterruptedException { + try { + int key = 0; + for (int i = 0 ; i < 10; ++i) { + key = i; + Text val = (i%2 == 1) ? val1 : val2; + theRecordWriter.write(new LongWritable(key), + val); + } + } finally { + theRecordWriter.close(null); + } + } public void testRecovery() throws Exception { JobConf conf = new JobConf(); @@ -91,9 +109,7 @@ public class TestFileOutputCommitter extends TestCase { FileOutputCommitter.getJobAttemptBaseDirName( conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0))); assertTrue((new File(jobTempDir1.toString()).exists())); - validateContent(jobTempDir1); - - + validateContent(jobTempDir1); //now while running the second app attempt, //recover the task output from first attempt @@ -131,6 +147,29 @@ public class TestFileOutputCommitter extends TestCase { assertEquals(output, expectedOutput.toString()); } + private void validateMapFileOutputContent( + FileSystem fs, Path dir) throws IOException { + // map output is a directory with index and data files + Path expectedMapDir = new Path(dir, partFile); + assert(fs.getFileStatus(expectedMapDir).isDirectory()); + FileStatus[] files = fs.listStatus(expectedMapDir); + int fileCount = 0; + boolean dataFileFound = false; + boolean indexFileFound = false; + for (FileStatus f : files) { + if (f.isFile()) { + ++fileCount; + if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) { + indexFileFound = true; + } + else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) { + dataFileFound = true; + } + } + } + assert(fileCount > 0); + assert(dataFileFound && indexFileFound); + } public void testCommitter() throws Exception { JobConf conf = new JobConf(); @@ -159,6 +198,31 @@ public class TestFileOutputCommitter extends TestCase { FileUtil.fullyDelete(new File(outDir.toString())); } + public void testMapFileOutputCommitter() throws Exception { + JobConf conf = new JobConf(); + FileOutputFormat.setOutputPath(conf, outDir); + conf.set(JobContext.TASK_ATTEMPT_ID, attempt); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + MapFileOutputFormat theOutputFormat = new MapFileOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null); + writeMapFileOutput(theRecordWriter, tContext); + + // do commit + committer.commitTask(tContext); + committer.commitJob(jContext); + + // validate output + validateMapFileOutputContent(FileSystem.get(conf), outDir); + FileUtil.fullyDelete(new File(outDir.toString())); + } public void testAbort() throws IOException, InterruptedException { JobConf conf = new JobConf(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index 6e8941bd7ec..6708d0443c7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -26,10 +26,13 @@ import java.net.URI; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -75,6 +78,20 @@ public class TestFileOutputCommitter extends TestCase { } } + private void writeMapFileOutput(RecordWriter theRecordWriter, + TaskAttemptContext context) throws IOException, InterruptedException { + try { + int key = 0; + for (int i = 0 ; i < 10; ++i) { + key = i; + Text val = (i%2 == 1) ? val1 : val2; + theRecordWriter.write(new LongWritable(key), + val); + } + } finally { + theRecordWriter.close(context); + } + } public void testRecovery() throws Exception { Job job = Job.getInstance(); @@ -101,9 +118,7 @@ public class TestFileOutputCommitter extends TestCase { FileOutputCommitter.getJobAttemptBaseDirName( conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0))); assertTrue((new File(jobTempDir1.toString()).exists())); - validateContent(jobTempDir1); - - + validateContent(jobTempDir1); //now while running the second app attempt, //recover the task output from first attempt @@ -141,6 +156,29 @@ public class TestFileOutputCommitter extends TestCase { assertEquals(output, expectedOutput.toString()); } + private void validateMapFileOutputContent( + FileSystem fs, Path dir) throws IOException { + // map output is a directory with index and data files + Path expectedMapDir = new Path(dir, partFile); + assert(fs.getFileStatus(expectedMapDir).isDirectory()); + FileStatus[] files = fs.listStatus(expectedMapDir); + int fileCount = 0; + boolean dataFileFound = false; + boolean indexFileFound = false; + for (FileStatus f : files) { + if (f.isFile()) { + ++fileCount; + if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) { + indexFileFound = true; + } + else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) { + dataFileFound = true; + } + } + } + assert(fileCount > 0); + assert(dataFileFound && indexFileFound); + } public void testCommitter() throws Exception { Job job = Job.getInstance(); @@ -169,6 +207,32 @@ public class TestFileOutputCommitter extends TestCase { FileUtil.fullyDelete(new File(outDir.toString())); } + public void testMapFileOutputCommitter() throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + MapFileOutputFormat theOutputFormat = new MapFileOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeMapFileOutput(theRecordWriter, tContext); + + // do commit + committer.commitTask(tContext); + committer.commitJob(jContext); + + // validate output + validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir); + FileUtil.fullyDelete(new File(outDir.toString())); + } public void testAbort() throws IOException, InterruptedException { Job job = Job.getInstance();