MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output files. (Siqi Li via gera)

Gera Shegalov 2015-03-10 11:12:48 -07:00
parent a380643d20
commit aa92b764a7
5 changed files with 349 additions and 77 deletions

File 1/5: CHANGES.txt

@@ -348,6 +348,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6059. Speed up history server startup time (Siqi Li via aw)
MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output
files. (Siqi Li via gera)
BUG FIXES
MAPREDUCE-6210. Use getApplicationAttemptId() instead of getApplicationId()

File 2/5: org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.lib.output;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -25,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -57,10 +59,14 @@ public class FileOutputCommitter extends OutputCommitter {
@Deprecated
protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
"mapreduce.fileoutputcommitter.algorithm.version";
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
private Path outputPath = null;
private Path workPath = null;
private final int algorithmVersion;
/**
* Create a file output committer
@@ -87,6 +93,14 @@ public class FileOutputCommitter extends OutputCommitter {
@Private
public FileOutputCommitter(Path outputPath,
JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
algorithmVersion =
conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
if (algorithmVersion != 1 && algorithmVersion != 2) {
throw new IOException("Only 1 or 2 algorithm version is supported");
}
if (outputPath != null) {
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
this.outputPath = fs.makeQualified(outputPath);
@@ -248,14 +262,14 @@ public class FileOutputCommitter extends OutputCommitter {
return new Path(getJobAttemptPath(appAttemptId, out),
String.valueOf(context.getTaskAttemptID().getTaskID()));
}
private static class CommittedTaskFilter implements PathFilter {
@Override
public boolean accept(Path path) {
return !PENDING_DIR_NAME.equals(path.getName());
}
}
/**
* Get a list of all paths where output from committed tasks are stored.
* @param context the context of the current job
@@ -268,7 +282,7 @@ public class FileOutputCommitter extends OutputCommitter {
FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
}
/**
* Get the directory that the task should write results into.
* @return the work directory
@@ -295,7 +309,7 @@ public class FileOutputCommitter extends OutputCommitter {
LOG.warn("Output Path is null in setupJob()");
}
}
/**
* The job has completed so move all committed tasks to the final output dir.
* Delete the temporary directory, including all of the work directories.
@@ -306,8 +320,11 @@
if (hasOutputPath()) {
Path finalOutput = getOutputPath();
FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
if (algorithmVersion == 1) {
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput);
}
}
// delete the _temporary folder and create a _done file in the o/p folder
@@ -417,27 +434,42 @@
@Private
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
if (hasOutputPath()) {
context.progress();
if(taskAttemptPath == null) {
taskAttemptPath = getTaskAttemptPath(context);
}
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
FileStatus taskAttemptDirStatus;
try {
taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
} catch (FileNotFoundException e) {
taskAttemptDirStatus = null;
}
if (taskAttemptDirStatus != null) {
if (algorithmVersion == 1) {
Path committedTaskPath = getCommittedTaskPath(context);
if (fs.exists(committedTaskPath)) {
if (!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete " + committedTaskPath);
}
}
if (!fs.rename(taskAttemptPath, committedTaskPath)) {
throw new IOException("Could not rename " + taskAttemptPath + " to "
+ committedTaskPath);
}
LOG.info("Saved output of task '" + attemptId + "' to " +
committedTaskPath);
} else {
// directly merge everything from taskAttemptPath to output directory
mergePaths(fs, taskAttemptDirStatus, outputPath);
LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath);
}
} else {
LOG.warn("No Output found for " + attemptId);
}
@@ -511,32 +543,43 @@
throw new IOException ("Cannot recover task output for first attempt...");
}
Path previousCommittedTaskPath = getCommittedTaskPath(
previousAttempt, context);
FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+ " into " + committedTaskPath);
if (fs.exists(previousCommittedTaskPath)) {
if(fs.exists(committedTaskPath)) {
if(!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete "+committedTaskPath);
LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
if (algorithmVersion == 1) {
if (fs.exists(previousCommittedTaskPath)) {
Path committedTaskPath = getCommittedTaskPath(context);
if (fs.exists(committedTaskPath)) {
if (!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete "+committedTaskPath);
}
}
//Rename can fail if the parent directory does not yet exist.
Path committedParent = committedTaskPath.getParent();
fs.mkdirs(committedParent);
if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
throw new IOException("Could not rename " + previousCommittedTaskPath +
" to " + committedTaskPath);
}
} else {
LOG.warn(attemptId+" had no output to recover.");
}
} else {
// essentially a no-op, but for backwards compatibility
// after upgrade to the new fileOutputCommitter,
// check if there are any output left in committedTaskPath
if (fs.exists(previousCommittedTaskPath)) {
LOG.info("Recovering task for upgrading scenario, moving files from "
+ previousCommittedTaskPath + " to " + outputPath);
FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
mergePaths(fs, from, outputPath);
}
LOG.info("Done recovering task " + attemptId);
}
} else {
LOG.warn("Output Path is null in recoverTask()");
}
}
}
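Both algorithm versions funnel task output into the destination tree through the pre-existing private helper FileOutputCommitter#mergePaths, which this commit calls from new places (version 2 commitTask and recoverTask) but does not itself modify, so it never appears in the diff. For context, a rough sketch of its recursive move semantics, simplified from the surrounding class (error handling and some edge cases abridged):

// Sketch of the pre-existing mergePaths helper (not part of this diff);
// approximate, for illustration only.
private static void mergePaths(FileSystem fs, final FileStatus from,
    final Path to) throws IOException {
  if (from.isFile()) {
    // A source file replaces whatever already sits at the destination.
    if (fs.exists(to) && !fs.delete(to, true)) {
      throw new IOException("Failed to delete " + to);
    }
    if (!fs.rename(from.getPath(), to)) {
      throw new IOException("Failed to rename " + from + " to " + to);
    }
  } else if (from.isDirectory()) {
    if (fs.exists(to)) {
      // Destination directory already exists: recurse and merge children.
      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
        Path subTo = new Path(to, subFrom.getPath().getName());
        mergePaths(fs, subFrom, subTo);
      }
    } else {
      // Destination absent: a single rename moves the whole subtree.
      if (!fs.rename(from.getPath(), to)) {
        throw new IOException("Failed to rename " + from + " to " + to);
      }
    }
  }
}

This is what lets version 2 commit each task straight into $joboutput; the trade-off is that task output becomes visible in the final output directory before the whole job commits.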

File 3/5: mapred-default.xml

@@ -1317,6 +1317,60 @@
contact with the RM has been re-established.</description>
</property>
<property>
<name>mapreduce.fileoutputcommitter.algorithm.version</name>
<value>1</value>
<description>The file output committer algorithm version.
Valid algorithm version numbers: 1 or 2.
Defaults to 1, which is the original algorithm.
In algorithm version 1,
1. commitTask will rename directory
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
to
$joboutput/_temporary/$appAttemptID/$taskID/
2. recoverTask will also do a rename
$joboutput/_temporary/$appAttemptID/$taskID/
to
$joboutput/_temporary/($appAttemptID + 1)/$taskID/
3. commitJob will merge every task output file in
$joboutput/_temporary/$appAttemptID/$taskID/
to
$joboutput/, then it will delete $joboutput/_temporary/
and write $joboutput/_SUCCESS
Version 1 has a performance regression, which is discussed in MAPREDUCE-4815.
If a job generates many files to commit, then the commitJob
method call at the end of the job can take minutes:
the commit is single-threaded and waits until all
tasks have completed before commencing.
Algorithm version 2 changes the behavior of commitTask,
recoverTask, and commitJob.
1. commitTask will rename all files in
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
to $joboutput/
2. recoverTask actually doesn't need to do anything, but in the
upgrade from version 1 to version 2 case, it will check whether there
are any files in
$joboutput/_temporary/($appAttemptID - 1)/$taskID/
and rename them to $joboutput/
3. commitJob can simply delete $joboutput/_temporary and write
$joboutput/_SUCCESS
This algorithm reduces the output commit time for
large jobs: tasks commit directly to the final
output directory as they complete, leaving commitJob
very little to do.
</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
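
To opt a job into the version 2 algorithm described above, set this property on the job configuration before submission. A minimal driver sketch (assumes the usual org.apache.hadoop.conf/fs/mapreduce imports; the job name and output path are illustrative):

// Enable the version 2 commit algorithm for a single job.
Configuration conf = new Configuration();
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
// Equivalent string form of the same setting:
// conf.setInt("mapreduce.fileoutputcommitter.algorithm.version", 2);
Job job = Job.getInstance(conf, "many-output-files"); // illustrative name
FileOutputFormat.setOutputPath(job, new Path("/tmp/illustrative-output"));

Jobs that do not set the property keep the default version 1 behavior: the constructor shown earlier falls back to FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT (1) and rejects any value other than 1 or 2 with an IOException.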

File 4/5: org/apache/hadoop/mapred/TestFileOutputCommitter.java

@@ -50,7 +50,6 @@ public class TestFileOutputCommitter extends TestCase {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
@@ -83,12 +82,16 @@ public class TestFileOutputCommitter extends TestCase {
theRecordWriter.close(null);
}
}
private void testRecoveryInternal(int commitVersion, int recoveryVersion)
throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -99,7 +102,7 @@ public class TestFileOutputCommitter extends TestCase {
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter =
theOutputFormat.getRecordWriter(null, conf, partFile, null);
writeOutput(theRecordWriter, tContext);
@@ -107,31 +110,59 @@ public class TestFileOutputCommitter extends TestCase {
if(committer.needsTaskCommit(tContext)) {
committer.commitTask(tContext);
}
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd1 = new File(jobTempDir1.toUri().getPath());
if (commitVersion == 1) {
assertTrue("Version 1 commits to temporary dir " + jtd1, jtd1.exists());
validateContent(jobTempDir1);
} else {
assertFalse("Version 2 commits to output dir " + jtd1, jtd1.exists());
}
//now while running the second app attempt,
//recover the task output from first attempt
JobConf conf2 = new JobConf(conf);
conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
conf2.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
recoveryVersion);
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter();
committer2.setupJob(jContext2);
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
committer2.recoverTask(tContext2);
File jtd2 = new File(jobTempDir2.toUri().getPath());
if (recoveryVersion == 1) {
assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
validateContent(jobTempDir2);
} else {
assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
if (commitVersion == 1) {
assertTrue("Version 2 recovery moves to output dir from "
+ jtd1 , jtd1.list().length == 0);
}
}
committer2.commitJob(jContext2);
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testRecoveryV1() throws Exception {
testRecoveryInternal(1, 1);
}
public void testRecoveryV2() throws Exception {
testRecoveryInternal(2, 2);
}
public void testRecoveryUpgradeV1V2() throws Exception {
testRecoveryInternal(1, 2);
}
private void validateContent(Path dir) throws IOException {
File fdir = new File(dir.toUri().getPath());
@@ -170,11 +201,13 @@ public class TestFileOutputCommitter extends TestCase {
assert(fileCount > 0);
assert(dataFileFound && indexFileFound);
}
private void testCommitterInternal(int version) throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -200,21 +233,33 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testCommitterV1() throws Exception {
testCommitterInternal(1);
}
public void testCommitterV2() throws Exception {
testCommitterInternal(2);
}
private void testMapFileOutputCommitterInternal(int version)
throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
RecordWriter theRecordWriter =
theOutputFormat.getRecordWriter(null, conf, partFile, null);
writeMapFileOutput(theRecordWriter, tContext);
// do commit
@@ -227,11 +272,29 @@ public class TestFileOutputCommitter extends TestCase {
validateMapFileOutputContent(FileSystem.get(conf), outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testMapFileOutputCommitterV1() throws Exception {
testMapFileOutputCommitterInternal(1);
}
public void testMapFileOutputCommitterV2() throws Exception {
testMapFileOutputCommitterInternal(2);
}
public void testMapOnlyNoOutputV1() throws Exception {
testMapOnlyNoOutputInternal(1);
}
public void testMapOnlyNoOutputV2() throws Exception {
testMapOnlyNoOutputInternal(2);
}
private void testMapOnlyNoOutputInternal(int version) throws Exception {
JobConf conf = new JobConf();
//This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -249,11 +312,14 @@ public class TestFileOutputCommitter extends TestCase {
// validate output
FileUtil.fullyDelete(new File(outDir.toString()));
}
private void testAbortInternal(int version)
throws IOException, InterruptedException {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -283,6 +349,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(out);
}
public void testAbortV1() throws Exception {
testAbortInternal(1);
}
public void testAbortV2() throws Exception {
testAbortInternal(2);
}
public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() {
super();
@@ -299,11 +373,14 @@ public class TestFileOutputCommitter extends TestCase {
}
private void testFailAbortInternal(int version)
throws IOException, InterruptedException {
JobConf conf = new JobConf();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
FileOutputFormat.setOutputPath(conf, outDir);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
@@ -353,6 +430,13 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testFailAbortV1() throws Exception {
testFailAbortInternal(1);
}
public void testFailAbortV2() throws Exception {
testFailAbortInternal(2);
}
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];

File 5/5: org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

@@ -109,12 +109,15 @@ public class TestFileOutputCommitter extends TestCase {
}
}
private void testRecoveryInternal(int commitVersion, int recoveryVersion)
throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -130,32 +133,59 @@ public class TestFileOutputCommitter extends TestCase {
// do commit
committer.commitTask(tContext);
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd = new File(jobTempDir1.toUri().getPath());
if (commitVersion == 1) {
assertTrue("Version 1 commits to temporary dir " + jtd, jtd.exists());
validateContent(jtd);
} else {
assertFalse("Version 2 commits to output dir " + jtd, jtd.exists());
}
//now while running the second app attempt,
//recover the task output from first attempt
Configuration conf2 = job.getConfiguration();
conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
conf2.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
recoveryVersion);
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
committer2.setupJob(tContext2);
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
File jtd2 = new File(jobTempDir2.toUri().getPath());
committer2.recoverTask(tContext2);
if (recoveryVersion == 1) {
assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
validateContent(jtd2);
} else {
assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
if (commitVersion == 1) {
assertTrue("Version 2 recovery moves to output dir from "
+ jtd , jtd.list().length == 0);
}
}
committer2.commitJob(jContext2);
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testRecoveryV1() throws Exception {
testRecoveryInternal(1, 1);
}
public void testRecoveryV2() throws Exception {
testRecoveryInternal(2, 2);
}
public void testRecoveryUpgradeV1V2() throws Exception {
testRecoveryInternal(1, 2);
}
private void validateContent(Path dir) throws IOException {
validateContent(new File(dir.toUri().getPath()));
}
@@ -197,12 +227,14 @@ public class TestFileOutputCommitter extends TestCase {
assert(fileCount > 0);
assert(dataFileFound && indexFileFound);
}
private void testCommitterInternal(int version) throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -225,11 +257,22 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testCommitterV1() throws Exception {
testCommitterInternal(1);
}
public void testCommitterV2() throws Exception {
testCommitterInternal(2);
}
private void testMapFileOutputCommitterInternal(int version)
throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -251,12 +294,38 @@ public class TestFileOutputCommitter extends TestCase {
validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testMapFileOutputCommitterV1() throws Exception {
testMapFileOutputCommitterInternal(1);
}
public void testMapFileOutputCommitterV2() throws Exception {
testMapFileOutputCommitterInternal(2);
}
public void testInvalidVersionNumber() throws IOException {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
try {
new FileOutputCommitter(outDir, tContext);
fail("should've thrown an exception!");
} catch (IOException e) {
//test passed
}
}
private void testAbortInternal(int version)
throws IOException, InterruptedException {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -285,6 +354,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testAbortV1() throws IOException, InterruptedException {
testAbortInternal(1);
}
public void testAbortV2() throws IOException, InterruptedException {
testAbortInternal(2);
}
public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() {
super();
@@ -301,13 +378,16 @@ public class TestFileOutputCommitter extends TestCase {
}
private void testFailAbortInternal(int version)
throws IOException, InterruptedException {
Job job = Job.getInstance();
Configuration conf = job.getConfiguration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
FileOutputFormat.setOutputPath(job, outDir);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
@@ -353,6 +433,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testFailAbortV1() throws Exception {
testFailAbortInternal(1);
}
public void testFailAbortV2() throws Exception {
testFailAbortInternal(2);
}
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];