MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output files. (Siqi Li via gera)

(cherry picked from commit aa92b764a7)
This commit is contained in:
Gera Shegalov 2015-03-10 11:12:48 -07:00
parent 8c5642296d
commit fca89922c2
5 changed files with 349 additions and 77 deletions

View File

@ -89,6 +89,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6059. Speed up history server startup time (Siqi Li via aw) MAPREDUCE-6059. Speed up history server startup time (Siqi Li via aw)
MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output
files. (Siqi Li via gera)
BUG FIXES BUG FIXES
MAPREDUCE-6210. Use getApplicationAttemptId() instead of getApplicationId() MAPREDUCE-6210. Use getApplicationAttemptId() instead of getApplicationId()

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.lib.output; package org.apache.hadoop.mapreduce.lib.output;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -25,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -59,8 +61,12 @@ public class FileOutputCommitter extends OutputCommitter {
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs"; "mapreduce.fileoutputcommitter.marksuccessfuljobs";
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
"mapreduce.fileoutputcommitter.algorithm.version";
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
private Path outputPath = null; private Path outputPath = null;
private Path workPath = null; private Path workPath = null;
private final int algorithmVersion;
/** /**
* Create a file output committer * Create a file output committer
@ -87,6 +93,14 @@ public class FileOutputCommitter extends OutputCommitter {
@Private @Private
public FileOutputCommitter(Path outputPath, public FileOutputCommitter(Path outputPath,
JobContext context) throws IOException { JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
algorithmVersion =
conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
if (algorithmVersion != 1 && algorithmVersion != 2) {
throw new IOException("Only 1 or 2 algorithm version is supported");
}
if (outputPath != null) { if (outputPath != null) {
FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
this.outputPath = fs.makeQualified(outputPath); this.outputPath = fs.makeQualified(outputPath);
@ -306,9 +320,12 @@ public class FileOutputCommitter extends OutputCommitter {
if (hasOutputPath()) { if (hasOutputPath()) {
Path finalOutput = getOutputPath(); Path finalOutput = getOutputPath();
FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
if (algorithmVersion == 1) {
for (FileStatus stat: getAllCommittedTaskPaths(context)) { for (FileStatus stat: getAllCommittedTaskPaths(context)) {
mergePaths(fs, stat, finalOutput); mergePaths(fs, stat, finalOutput);
} }
}
// delete the _temporary folder and create a _done file in the o/p folder // delete the _temporary folder and create a _done file in the o/p folder
cleanupJob(context); cleanupJob(context);
@ -418,15 +435,24 @@ public class FileOutputCommitter extends OutputCommitter {
@Private @Private
public void commitTask(TaskAttemptContext context, Path taskAttemptPath) public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
throws IOException { throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID(); TaskAttemptID attemptId = context.getTaskAttemptID();
if (hasOutputPath()) { if (hasOutputPath()) {
context.progress(); context.progress();
if(taskAttemptPath == null) { if(taskAttemptPath == null) {
taskAttemptPath = getTaskAttemptPath(context); taskAttemptPath = getTaskAttemptPath(context);
} }
Path committedTaskPath = getCommittedTaskPath(context);
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
if (fs.exists(taskAttemptPath)) { FileStatus taskAttemptDirStatus;
try {
taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
} catch (FileNotFoundException e) {
taskAttemptDirStatus = null;
}
if (taskAttemptDirStatus != null) {
if (algorithmVersion == 1) {
Path committedTaskPath = getCommittedTaskPath(context);
if (fs.exists(committedTaskPath)) { if (fs.exists(committedTaskPath)) {
if (!fs.delete(committedTaskPath, true)) { if (!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete " + committedTaskPath); throw new IOException("Could not delete " + committedTaskPath);
@ -438,6 +464,12 @@ public class FileOutputCommitter extends OutputCommitter {
} }
LOG.info("Saved output of task '" + attemptId + "' to " + LOG.info("Saved output of task '" + attemptId + "' to " +
committedTaskPath); committedTaskPath);
} else {
// directly merge everything from taskAttemptPath to output directory
mergePaths(fs, taskAttemptDirStatus, outputPath);
LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath);
}
} else { } else {
LOG.warn("No Output found for " + attemptId); LOG.warn("No Output found for " + attemptId);
} }
@ -511,14 +543,14 @@ public class FileOutputCommitter extends OutputCommitter {
throw new IOException ("Cannot recover task output for first attempt..."); throw new IOException ("Cannot recover task output for first attempt...");
} }
Path committedTaskPath = getCommittedTaskPath(context);
Path previousCommittedTaskPath = getCommittedTaskPath( Path previousCommittedTaskPath = getCommittedTaskPath(
previousAttempt, context); previousAttempt, context);
FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration()); FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
LOG.debug("Trying to recover task from " + previousCommittedTaskPath LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
+ " into " + committedTaskPath); if (algorithmVersion == 1) {
if (fs.exists(previousCommittedTaskPath)) { if (fs.exists(previousCommittedTaskPath)) {
Path committedTaskPath = getCommittedTaskPath(context);
if (fs.exists(committedTaskPath)) { if (fs.exists(committedTaskPath)) {
if (!fs.delete(committedTaskPath, true)) { if (!fs.delete(committedTaskPath, true)) {
throw new IOException("Could not delete "+committedTaskPath); throw new IOException("Could not delete "+committedTaskPath);
@ -531,10 +563,21 @@ public class FileOutputCommitter extends OutputCommitter {
throw new IOException("Could not rename " + previousCommittedTaskPath + throw new IOException("Could not rename " + previousCommittedTaskPath +
" to " + committedTaskPath); " to " + committedTaskPath);
} }
LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
} else { } else {
LOG.warn(attemptId+" had no output to recover."); LOG.warn(attemptId+" had no output to recover.");
} }
} else {
// essentially a no-op, but for backwards compatibility
// after upgrade to the new fileOutputCommitter,
// check if there are any output left in committedTaskPath
if (fs.exists(previousCommittedTaskPath)) {
LOG.info("Recovering task for upgrading scenario, moving files from "
+ previousCommittedTaskPath + " to " + outputPath);
FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
mergePaths(fs, from, outputPath);
}
LOG.info("Done recovering task " + attemptId);
}
} else { } else {
LOG.warn("Output Path is null in recoverTask()"); LOG.warn("Output Path is null in recoverTask()");
} }

View File

@ -1740,6 +1740,60 @@
contact with the RM has been re-established.</description> contact with the RM has been re-established.</description>
</property> </property>
<property>
<name>mapreduce.fileoutputcommitter.algorithm.version</name>
<value>1</value>
<description>The file output committer algorithm version
Valid algorithm version numbers are 1 and 2.
The default is 1, which is the original algorithm.
In algorithm version 1,
1. commitTask will rename directory
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
to
$joboutput/_temporary/$appAttemptID/$taskID/
2. recoverTask will also do a rename
$joboutput/_temporary/$appAttemptID/$taskID/
to
$joboutput/_temporary/($appAttemptID + 1)/$taskID/
3. commitJob will merge every task output file in
$joboutput/_temporary/$appAttemptID/$taskID/
to
$joboutput/, then it will delete $joboutput/_temporary/
and write $joboutput/_SUCCESS
Version 1 has a performance regression, which is discussed in MAPREDUCE-4815.
If a job generates many files to commit, then the commitJob
method call at the end of the job can take minutes.
The commit is single-threaded and waits until all
tasks have completed before commencing.
Algorithm version 2 changes the behavior of commitTask,
recoverTask, and commitJob.
1. commitTask will rename all files in
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
to $joboutput/
2. recoverTask does not actually need to do anything, but to support
an upgrade from version 1 to version 2, it will check whether there
are any files in
$joboutput/_temporary/($appAttemptID - 1)/$taskID/
and rename them to $joboutput/
3. commitJob can simply delete $joboutput/_temporary and write
$joboutput/_SUCCESS
This algorithm reduces the output commit time for
large jobs by having the tasks commit directly to the final
output directory as they complete, leaving commitJob with
very little to do.
</description>
</property>
<property> <property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value> <value>1000</value>

View File

@ -50,7 +50,6 @@ public class TestFileOutputCommitter extends TestCase {
private Text val1 = new Text("val1"); private Text val1 = new Text("val1");
private Text val2 = new Text("val2"); private Text val2 = new Text("val2");
private void writeOutput(RecordWriter theRecordWriter, private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException { TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get(); NullWritable nullWritable = NullWritable.get();
@ -84,11 +83,15 @@ public class TestFileOutputCommitter extends TestCase {
} }
} }
public void testRecovery() throws Exception { private void testRecoveryInternal(int commitVersion, int recoveryVersion)
throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir); FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt); conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1); conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(); FileOutputCommitter committer = new FileOutputCommitter();
@ -107,31 +110,59 @@ public class TestFileOutputCommitter extends TestCase {
if(committer.needsTaskCommit(tContext)) { if(committer.needsTaskCommit(tContext)) {
committer.commitTask(tContext); committer.commitTask(tContext);
} }
Path jobTempDir1 = committer.getCommittedTaskPath(tContext); Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd1 = new File(jobTempDir1.toUri().getPath()); File jtd1 = new File(jobTempDir1.toUri().getPath());
assertTrue(jtd1.exists()); if (commitVersion == 1) {
assertTrue("Version 1 commits to temporary dir " + jtd1, jtd1.exists());
validateContent(jobTempDir1); validateContent(jobTempDir1);
} else {
assertFalse("Version 2 commits to output dir " + jtd1, jtd1.exists());
}
//now while running the second app attempt, //now while running the second app attempt,
//recover the task output from first attempt //recover the task output from first attempt
JobConf conf2 = new JobConf(conf); JobConf conf2 = new JobConf(conf);
conf2.set(JobContext.TASK_ATTEMPT_ID, attempt); conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2); conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
conf2.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
recoveryVersion);
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID()); JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID); TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter(); FileOutputCommitter committer2 = new FileOutputCommitter();
committer2.setupJob(jContext2); committer2.setupJob(jContext2);
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
committer2.recoverTask(tContext2); committer2.recoverTask(tContext2);
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
File jtd2 = new File(jobTempDir2.toUri().getPath()); File jtd2 = new File(jobTempDir2.toUri().getPath());
assertTrue(jtd2.exists()); if (recoveryVersion == 1) {
assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
validateContent(jobTempDir2); validateContent(jobTempDir2);
} else {
assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
if (commitVersion == 1) {
assertTrue("Version 2 recovery moves to output dir from "
+ jtd1 , jtd1.list().length == 0);
}
}
committer2.commitJob(jContext2); committer2.commitJob(jContext2);
validateContent(outDir); validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
/**
 * Runs the recovery scenario with algorithm version 1 used for both the
 * initial task commit and the recovery in the second app attempt.
 */
public void testRecoveryV1() throws Exception {
testRecoveryInternal(1, 1);
}
/**
 * Runs the recovery scenario with algorithm version 2 used for both the
 * initial task commit and the recovery in the second app attempt.
 */
public void testRecoveryV2() throws Exception {
testRecoveryInternal(2, 2);
}
/**
 * Runs the recovery scenario for an upgrade: the task is committed with
 * algorithm version 1 and then recovered with algorithm version 2.
 */
public void testRecoveryUpgradeV1V2() throws Exception {
testRecoveryInternal(1, 2);
}
private void validateContent(Path dir) throws IOException { private void validateContent(Path dir) throws IOException {
File fdir = new File(dir.toUri().getPath()); File fdir = new File(dir.toUri().getPath());
@ -171,10 +202,12 @@ public class TestFileOutputCommitter extends TestCase {
assert(dataFileFound && indexFileFound); assert(dataFileFound && indexFileFound);
} }
public void testCommitter() throws Exception { private void testCommitterInternal(int version) throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir); FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt); conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(); FileOutputCommitter committer = new FileOutputCommitter();
@ -200,10 +233,21 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testMapFileOutputCommitter() throws Exception { public void testCommitterV1() throws Exception {
testCommitterInternal(1);
}
/** Runs the basic committer test with algorithm version 2. */
public void testCommitterV2() throws Exception {
testCommitterInternal(2);
}
private void testMapFileOutputCommitterInternal(int version)
throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir); FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt); conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(); FileOutputCommitter committer = new FileOutputCommitter();
@ -214,7 +258,8 @@ public class TestFileOutputCommitter extends TestCase {
// write output // write output
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat(); MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null); RecordWriter theRecordWriter =
theOutputFormat.getRecordWriter(null, conf, partFile, null);
writeMapFileOutput(theRecordWriter, tContext); writeMapFileOutput(theRecordWriter, tContext);
// do commit // do commit
@ -228,10 +273,28 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testMapOnlyNoOutput() throws Exception { public void testMapFileOutputCommitterV1() throws Exception {
testMapFileOutputCommitterInternal(1);
}
/** Runs the MapFile output committer test with algorithm version 2. */
public void testMapFileOutputCommitterV2() throws Exception {
testMapFileOutputCommitterInternal(2);
}
/**
 * Runs the map-only test in which no output path is configured,
 * using algorithm version 1.
 */
public void testMapOnlyNoOutputV1() throws Exception {
testMapOnlyNoOutputInternal(1);
}
/**
 * Runs the map-only test in which no output path is configured,
 * using algorithm version 2.
 */
public void testMapOnlyNoOutputV2() throws Exception {
testMapOnlyNoOutputInternal(2);
}
private void testMapOnlyNoOutputInternal(int version) throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
//This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir); //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt); conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(); FileOutputCommitter committer = new FileOutputCommitter();
@ -250,10 +313,13 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testAbort() throws IOException, InterruptedException { private void testAbortInternal(int version)
throws IOException, InterruptedException {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir); FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt); conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(); FileOutputCommitter committer = new FileOutputCommitter();
@ -283,6 +349,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(out); FileUtil.fullyDelete(out);
} }
/** Runs the abort test with algorithm version 1. */
public void testAbortV1() throws Exception {
testAbortInternal(1);
}
/** Runs the abort test with algorithm version 2. */
public void testAbortV2() throws Exception {
testAbortInternal(2);
}
public static class FakeFileSystem extends RawLocalFileSystem { public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() { public FakeFileSystem() {
super(); super();
@ -299,11 +373,14 @@ public class TestFileOutputCommitter extends TestCase {
} }
public void testFailAbort() throws IOException, InterruptedException { private void testFailAbortInternal(int version)
throws IOException, InterruptedException {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///"); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class); conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt); conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1); conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
FileOutputFormat.setOutputPath(conf, outDir); FileOutputFormat.setOutputPath(conf, outDir);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
@ -353,6 +430,13 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
/** Runs the failing-abort test (FakeFileSystem-backed) with algorithm version 1. */
public void testFailAbortV1() throws Exception {
testFailAbortInternal(1);
}
/** Runs the failing-abort test (FakeFileSystem-backed) with algorithm version 2. */
public void testFailAbortV2() throws Exception {
testFailAbortInternal(2);
}
public static String slurp(File f) throws IOException { public static String slurp(File f) throws IOException {
int len = (int) f.length(); int len = (int) f.length();
byte[] buf = new byte[len]; byte[] buf = new byte[len];

View File

@ -109,12 +109,15 @@ public class TestFileOutputCommitter extends TestCase {
} }
} }
public void testRecovery() throws Exception { private void testRecoveryInternal(int commitVersion, int recoveryVersion)
throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@ -130,16 +133,23 @@ public class TestFileOutputCommitter extends TestCase {
// do commit // do commit
committer.commitTask(tContext); committer.commitTask(tContext);
Path jobTempDir1 = committer.getCommittedTaskPath(tContext); Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd = new File(jobTempDir1.toUri().getPath()); File jtd = new File(jobTempDir1.toUri().getPath());
assertTrue(jtd.exists()); if (commitVersion == 1) {
assertTrue("Version 1 commits to temporary dir " + jtd, jtd.exists());
validateContent(jtd); validateContent(jtd);
} else {
assertFalse("Version 2 commits to output dir " + jtd, jtd.exists());
}
//now while running the second app attempt, //now while running the second app attempt,
//recover the task output from first attempt //recover the task output from first attempt
Configuration conf2 = job.getConfiguration(); Configuration conf2 = job.getConfiguration();
conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2); conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
conf2.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
recoveryVersion);
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID()); JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID); TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2); FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
@ -148,14 +158,34 @@ public class TestFileOutputCommitter extends TestCase {
File jtd2 = new File(jobTempDir2.toUri().getPath()); File jtd2 = new File(jobTempDir2.toUri().getPath());
committer2.recoverTask(tContext2); committer2.recoverTask(tContext2);
assertTrue(jtd2.exists()); if (recoveryVersion == 1) {
assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
validateContent(jtd2); validateContent(jtd2);
} else {
assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
if (commitVersion == 1) {
assertTrue("Version 2 recovery moves to output dir from "
+ jtd , jtd.list().length == 0);
}
}
committer2.commitJob(jContext2); committer2.commitJob(jContext2);
validateContent(outDir); validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
/**
 * Runs the recovery scenario with algorithm version 1 used for both the
 * initial task commit and the recovery in the second app attempt.
 */
public void testRecoveryV1() throws Exception {
testRecoveryInternal(1, 1);
}
/**
 * Runs the recovery scenario with algorithm version 2 used for both the
 * initial task commit and the recovery in the second app attempt.
 */
public void testRecoveryV2() throws Exception {
testRecoveryInternal(2, 2);
}
/**
 * Runs the recovery scenario for an upgrade: the task is committed with
 * algorithm version 1 and then recovered with algorithm version 2.
 */
public void testRecoveryUpgradeV1V2() throws Exception {
testRecoveryInternal(1, 2);
}
private void validateContent(Path dir) throws IOException { private void validateContent(Path dir) throws IOException {
validateContent(new File(dir.toUri().getPath())); validateContent(new File(dir.toUri().getPath()));
} }
@ -198,11 +228,13 @@ public class TestFileOutputCommitter extends TestCase {
assert(dataFileFound && indexFileFound); assert(dataFileFound && indexFileFound);
} }
public void testCommitter() throws Exception { private void testCommitterInternal(int version) throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@ -225,11 +257,22 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testMapFileOutputCommitter() throws Exception { public void testCommitterV1() throws Exception {
testCommitterInternal(1);
}
/** Runs the basic committer test with algorithm version 2. */
public void testCommitterV2() throws Exception {
testCommitterInternal(2);
}
private void testMapFileOutputCommitterInternal(int version)
throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@ -252,11 +295,37 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
public void testAbort() throws IOException, InterruptedException { public void testMapFileOutputCommitterV1() throws Exception {
testMapFileOutputCommitterInternal(1);
}
/** Runs the MapFile output committer test with algorithm version 2. */
public void testMapFileOutputCommitterV2() throws Exception {
testMapFileOutputCommitterInternal(2);
}
public void testInvalidVersionNumber() throws IOException {
Job job = Job.getInstance(); Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
try {
new FileOutputCommitter(outDir, tContext);
fail("should've thrown an exception!");
} catch (IOException e) {
//test passed
}
}
private void testAbortInternal(int version)
throws IOException, InterruptedException {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@ -285,6 +354,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
/** Runs the abort test with algorithm version 1. */
public void testAbortV1() throws IOException, InterruptedException {
testAbortInternal(1);
}
/** Runs the abort test with algorithm version 2. */
public void testAbortV2() throws IOException, InterruptedException {
testAbortInternal(2);
}
public static class FakeFileSystem extends RawLocalFileSystem { public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() { public FakeFileSystem() {
super(); super();
@ -301,13 +378,16 @@ public class TestFileOutputCommitter extends TestCase {
} }
public void testFailAbort() throws IOException, InterruptedException { private void testFailAbortInternal(int version)
throws IOException, InterruptedException {
Job job = Job.getInstance(); Job job = Job.getInstance();
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///"); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class); conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
@ -353,6 +433,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString())); FileUtil.fullyDelete(new File(outDir.toString()));
} }
/** Runs the failing-abort test (FakeFileSystem-backed) with algorithm version 1. */
public void testFailAbortV1() throws Exception {
testFailAbortInternal(1);
}
/** Runs the failing-abort test (FakeFileSystem-backed) with algorithm version 2. */
public void testFailAbortV2() throws Exception {
testFailAbortInternal(2);
}
public static String slurp(File f) throws IOException { public static String slurp(File f) throws IOException {
int len = (int) f.length(); int len = (int) f.length();
byte[] buf = new byte[len]; byte[] buf = new byte[len];