MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output files. (Siqi Li via gera)
(cherry picked from commit aa92b764a7
)
This commit is contained in:
parent
2690c72522
commit
6802e8fefc
|
@ -103,6 +103,9 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
MAPREDUCE-6059. Speed up history server startup time (Siqi Li via aw)
|
||||
|
||||
MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output
|
||||
files. (Siqi Li via gera)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-6210. Use getApplicationAttemptId() instead of getApplicationId()
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.lib.output;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -25,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -59,8 +61,12 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
|
||||
public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
||||
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
|
||||
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
|
||||
"mapreduce.fileoutputcommitter.algorithm.version";
|
||||
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
|
||||
private Path outputPath = null;
|
||||
private Path workPath = null;
|
||||
private final int algorithmVersion;
|
||||
|
||||
/**
|
||||
* Create a file output committer
|
||||
|
@ -87,6 +93,14 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
@Private
|
||||
public FileOutputCommitter(Path outputPath,
|
||||
JobContext context) throws IOException {
|
||||
Configuration conf = context.getConfiguration();
|
||||
algorithmVersion =
|
||||
conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
|
||||
LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
|
||||
if (algorithmVersion != 1 && algorithmVersion != 2) {
|
||||
throw new IOException("Only 1 or 2 algorithm version is supported");
|
||||
}
|
||||
if (outputPath != null) {
|
||||
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
|
||||
this.outputPath = fs.makeQualified(outputPath);
|
||||
|
@ -306,9 +320,12 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
if (hasOutputPath()) {
|
||||
Path finalOutput = getOutputPath();
|
||||
FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
|
||||
for(FileStatus stat: getAllCommittedTaskPaths(context)) {
|
||||
|
||||
if (algorithmVersion == 1) {
|
||||
for (FileStatus stat: getAllCommittedTaskPaths(context)) {
|
||||
mergePaths(fs, stat, finalOutput);
|
||||
}
|
||||
}
|
||||
|
||||
// delete the _temporary folder and create a _done file in the o/p folder
|
||||
cleanupJob(context);
|
||||
|
@ -418,26 +435,41 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
@Private
|
||||
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
|
||||
throws IOException {
|
||||
|
||||
TaskAttemptID attemptId = context.getTaskAttemptID();
|
||||
if (hasOutputPath()) {
|
||||
context.progress();
|
||||
if(taskAttemptPath == null) {
|
||||
taskAttemptPath = getTaskAttemptPath(context);
|
||||
}
|
||||
Path committedTaskPath = getCommittedTaskPath(context);
|
||||
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
||||
if (fs.exists(taskAttemptPath)) {
|
||||
if(fs.exists(committedTaskPath)) {
|
||||
if(!fs.delete(committedTaskPath, true)) {
|
||||
FileStatus taskAttemptDirStatus;
|
||||
try {
|
||||
taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
|
||||
} catch (FileNotFoundException e) {
|
||||
taskAttemptDirStatus = null;
|
||||
}
|
||||
|
||||
if (taskAttemptDirStatus != null) {
|
||||
if (algorithmVersion == 1) {
|
||||
Path committedTaskPath = getCommittedTaskPath(context);
|
||||
if (fs.exists(committedTaskPath)) {
|
||||
if (!fs.delete(committedTaskPath, true)) {
|
||||
throw new IOException("Could not delete " + committedTaskPath);
|
||||
}
|
||||
}
|
||||
if(!fs.rename(taskAttemptPath, committedTaskPath)) {
|
||||
if (!fs.rename(taskAttemptPath, committedTaskPath)) {
|
||||
throw new IOException("Could not rename " + taskAttemptPath + " to "
|
||||
+ committedTaskPath);
|
||||
}
|
||||
LOG.info("Saved output of task '" + attemptId + "' to " +
|
||||
committedTaskPath);
|
||||
} else {
|
||||
// directly merge everything from taskAttemptPath to output directory
|
||||
mergePaths(fs, taskAttemptDirStatus, outputPath);
|
||||
LOG.info("Saved output of task '" + attemptId + "' to " +
|
||||
outputPath);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("No Output found for " + attemptId);
|
||||
}
|
||||
|
@ -511,30 +543,41 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
throw new IOException ("Cannot recover task output for first attempt...");
|
||||
}
|
||||
|
||||
Path committedTaskPath = getCommittedTaskPath(context);
|
||||
Path previousCommittedTaskPath = getCommittedTaskPath(
|
||||
previousAttempt, context);
|
||||
FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
|
||||
FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
|
||||
|
||||
LOG.debug("Trying to recover task from " + previousCommittedTaskPath
|
||||
+ " into " + committedTaskPath);
|
||||
LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
|
||||
if (algorithmVersion == 1) {
|
||||
if (fs.exists(previousCommittedTaskPath)) {
|
||||
if(fs.exists(committedTaskPath)) {
|
||||
if(!fs.delete(committedTaskPath, true)) {
|
||||
Path committedTaskPath = getCommittedTaskPath(context);
|
||||
if (fs.exists(committedTaskPath)) {
|
||||
if (!fs.delete(committedTaskPath, true)) {
|
||||
throw new IOException("Could not delete "+committedTaskPath);
|
||||
}
|
||||
}
|
||||
//Rename can fail if the parent directory does not yet exist.
|
||||
Path committedParent = committedTaskPath.getParent();
|
||||
fs.mkdirs(committedParent);
|
||||
if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
|
||||
if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
|
||||
throw new IOException("Could not rename " + previousCommittedTaskPath +
|
||||
" to " + committedTaskPath);
|
||||
}
|
||||
LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
|
||||
} else {
|
||||
LOG.warn(attemptId+" had no output to recover.");
|
||||
}
|
||||
} else {
|
||||
// essentially a no-op, but for backwards compatibility
|
||||
// after upgrade to the new fileOutputCommitter,
|
||||
// check if there are any output left in committedTaskPath
|
||||
if (fs.exists(previousCommittedTaskPath)) {
|
||||
LOG.info("Recovering task for upgrading scenario, moving files from "
|
||||
+ previousCommittedTaskPath + " to " + outputPath);
|
||||
FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
|
||||
mergePaths(fs, from, outputPath);
|
||||
}
|
||||
LOG.info("Done recovering task " + attemptId);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Output Path is null in recoverTask()");
|
||||
}
|
||||
|
|
|
@ -1740,6 +1740,60 @@
|
|||
contact with the RM has been re-established.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.fileoutputcommitter.algorithm.version</name>
|
||||
<value>1</value>
|
||||
<description>The file output committer algorithm version
|
||||
valid algorithm version number: 1 or 2
|
||||
default to 1, which is the original algorithm
|
||||
|
||||
In algorithm version 1,
|
||||
|
||||
1. commitTask will rename directory
|
||||
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
|
||||
to
|
||||
$joboutput/_temporary/$appAttemptID/$taskID/
|
||||
|
||||
2. recoverTask will also do a rename
|
||||
$joboutput/_temporary/$appAttemptID/$taskID/
|
||||
to
|
||||
$joboutput/_temporary/($appAttemptID + 1)/$taskID/
|
||||
|
||||
3. commitJob will merge every task output file in
|
||||
$joboutput/_temporary/$appAttemptID/$taskID/
|
||||
to
|
||||
$joboutput/, then it will delete $joboutput/_temporary/
|
||||
and write $joboutput/_SUCCESS
|
||||
|
||||
It has a performance regression, which is discussed in MAPREDUCE-4815.
|
||||
If a job generates many files to commit then the commitJob
|
||||
method call at the end of the job can take minutes.
|
||||
the commit is single-threaded and waits until all
|
||||
tasks have completed before commencing.
|
||||
|
||||
algorithm version 2 will change the behavior of commitTask,
|
||||
recoverTask, and commitJob.
|
||||
|
||||
1. commitTask will rename all files in
|
||||
$joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
|
||||
to $joboutput/
|
||||
|
||||
2. recoverTask actually doesn't require to do anything, but for
|
||||
upgrade from version 1 to version 2 case, it will check if there
|
||||
are any files in
|
||||
$joboutput/_temporary/($appAttemptID - 1)/$taskID/
|
||||
and rename them to $joboutput/
|
||||
|
||||
3. commitJob can simply delete $joboutput/_temporary and write
|
||||
$joboutput/_SUCCESS
|
||||
|
||||
This algorithm will reduce the output commit time for
|
||||
large jobs by having the tasks commit directly to the final
|
||||
output directory as they were completing and commitJob had
|
||||
very little to do.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
|
||||
<value>1000</value>
|
||||
|
|
|
@ -50,7 +50,6 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
private Text val1 = new Text("val1");
|
||||
private Text val2 = new Text("val2");
|
||||
|
||||
|
||||
private void writeOutput(RecordWriter theRecordWriter,
|
||||
TaskAttemptContext context) throws IOException, InterruptedException {
|
||||
NullWritable nullWritable = NullWritable.get();
|
||||
|
@ -84,11 +83,15 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRecovery() throws Exception {
|
||||
private void testRecoveryInternal(int commitVersion, int recoveryVersion)
|
||||
throws Exception {
|
||||
JobConf conf = new JobConf();
|
||||
FileOutputFormat.setOutputPath(conf, outDir);
|
||||
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
commitVersion);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter();
|
||||
|
@ -107,31 +110,59 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
if(committer.needsTaskCommit(tContext)) {
|
||||
committer.commitTask(tContext);
|
||||
}
|
||||
|
||||
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
|
||||
File jtd1 = new File(jobTempDir1.toUri().getPath());
|
||||
assertTrue(jtd1.exists());
|
||||
if (commitVersion == 1) {
|
||||
assertTrue("Version 1 commits to temporary dir " + jtd1, jtd1.exists());
|
||||
validateContent(jobTempDir1);
|
||||
} else {
|
||||
assertFalse("Version 2 commits to output dir " + jtd1, jtd1.exists());
|
||||
}
|
||||
|
||||
//now while running the second app attempt,
|
||||
//recover the task output from first attempt
|
||||
JobConf conf2 = new JobConf(conf);
|
||||
conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
|
||||
conf2.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
recoveryVersion);
|
||||
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
||||
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
||||
FileOutputCommitter committer2 = new FileOutputCommitter();
|
||||
committer2.setupJob(jContext2);
|
||||
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
|
||||
|
||||
committer2.recoverTask(tContext2);
|
||||
|
||||
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
|
||||
File jtd2 = new File(jobTempDir2.toUri().getPath());
|
||||
assertTrue(jtd2.exists());
|
||||
if (recoveryVersion == 1) {
|
||||
assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
|
||||
validateContent(jobTempDir2);
|
||||
} else {
|
||||
assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
|
||||
if (commitVersion == 1) {
|
||||
assertTrue("Version 2 recovery moves to output dir from "
|
||||
+ jtd1 , jtd1.list().length == 0);
|
||||
}
|
||||
}
|
||||
|
||||
committer2.commitJob(jContext2);
|
||||
validateContent(outDir);
|
||||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
public void testRecoveryV1() throws Exception {
|
||||
testRecoveryInternal(1, 1);
|
||||
}
|
||||
|
||||
public void testRecoveryV2() throws Exception {
|
||||
testRecoveryInternal(2, 2);
|
||||
}
|
||||
|
||||
public void testRecoveryUpgradeV1V2() throws Exception {
|
||||
testRecoveryInternal(1, 2);
|
||||
}
|
||||
|
||||
private void validateContent(Path dir) throws IOException {
|
||||
File fdir = new File(dir.toUri().getPath());
|
||||
|
@ -171,10 +202,12 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
assert(dataFileFound && indexFileFound);
|
||||
}
|
||||
|
||||
public void testCommitter() throws Exception {
|
||||
private void testCommitterInternal(int version) throws Exception {
|
||||
JobConf conf = new JobConf();
|
||||
FileOutputFormat.setOutputPath(conf, outDir);
|
||||
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter();
|
||||
|
@ -200,10 +233,21 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testMapFileOutputCommitter() throws Exception {
|
||||
public void testCommitterV1() throws Exception {
|
||||
testCommitterInternal(1);
|
||||
}
|
||||
|
||||
public void testCommitterV2() throws Exception {
|
||||
testCommitterInternal(2);
|
||||
}
|
||||
|
||||
private void testMapFileOutputCommitterInternal(int version)
|
||||
throws Exception {
|
||||
JobConf conf = new JobConf();
|
||||
FileOutputFormat.setOutputPath(conf, outDir);
|
||||
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter();
|
||||
|
@ -214,7 +258,8 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
|
||||
// write output
|
||||
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
|
||||
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null);
|
||||
RecordWriter theRecordWriter =
|
||||
theOutputFormat.getRecordWriter(null, conf, partFile, null);
|
||||
writeMapFileOutput(theRecordWriter, tContext);
|
||||
|
||||
// do commit
|
||||
|
@ -228,10 +273,28 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testMapOnlyNoOutput() throws Exception {
|
||||
public void testMapFileOutputCommitterV1() throws Exception {
|
||||
testMapFileOutputCommitterInternal(1);
|
||||
}
|
||||
|
||||
public void testMapFileOutputCommitterV2() throws Exception {
|
||||
testMapFileOutputCommitterInternal(2);
|
||||
}
|
||||
|
||||
public void testMapOnlyNoOutputV1() throws Exception {
|
||||
testMapOnlyNoOutputInternal(1);
|
||||
}
|
||||
|
||||
public void testMapOnlyNoOutputV2() throws Exception {
|
||||
testMapOnlyNoOutputInternal(2);
|
||||
}
|
||||
|
||||
private void testMapOnlyNoOutputInternal(int version) throws Exception {
|
||||
JobConf conf = new JobConf();
|
||||
//This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
|
||||
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter();
|
||||
|
@ -250,10 +313,13 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testAbort() throws IOException, InterruptedException {
|
||||
private void testAbortInternal(int version)
|
||||
throws IOException, InterruptedException {
|
||||
JobConf conf = new JobConf();
|
||||
FileOutputFormat.setOutputPath(conf, outDir);
|
||||
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter();
|
||||
|
@ -283,6 +349,14 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(out);
|
||||
}
|
||||
|
||||
public void testAbortV1() throws Exception {
|
||||
testAbortInternal(1);
|
||||
}
|
||||
|
||||
public void testAbortV2() throws Exception {
|
||||
testAbortInternal(2);
|
||||
}
|
||||
|
||||
public static class FakeFileSystem extends RawLocalFileSystem {
|
||||
public FakeFileSystem() {
|
||||
super();
|
||||
|
@ -299,11 +373,14 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
}
|
||||
|
||||
|
||||
public void testFailAbort() throws IOException, InterruptedException {
|
||||
private void testFailAbortInternal(int version)
|
||||
throws IOException, InterruptedException {
|
||||
JobConf conf = new JobConf();
|
||||
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
|
||||
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
|
||||
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.output.
|
||||
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
|
||||
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
|
||||
FileOutputFormat.setOutputPath(conf, outDir);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
|
@ -353,6 +430,13 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testFailAbortV1() throws Exception {
|
||||
testFailAbortInternal(1);
|
||||
}
|
||||
|
||||
public void testFailAbortV2() throws Exception {
|
||||
testFailAbortInternal(2);
|
||||
}
|
||||
public static String slurp(File f) throws IOException {
|
||||
int len = (int) f.length();
|
||||
byte[] buf = new byte[len];
|
||||
|
|
|
@ -109,12 +109,15 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRecovery() throws Exception {
|
||||
private void testRecoveryInternal(int commitVersion, int recoveryVersion)
|
||||
throws Exception {
|
||||
Job job = Job.getInstance();
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
|
||||
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
commitVersion);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
|
||||
|
@ -130,16 +133,23 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
|
||||
// do commit
|
||||
committer.commitTask(tContext);
|
||||
|
||||
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
|
||||
File jtd = new File(jobTempDir1.toUri().getPath());
|
||||
assertTrue(jtd.exists());
|
||||
if (commitVersion == 1) {
|
||||
assertTrue("Version 1 commits to temporary dir " + jtd, jtd.exists());
|
||||
validateContent(jtd);
|
||||
} else {
|
||||
assertFalse("Version 2 commits to output dir " + jtd, jtd.exists());
|
||||
}
|
||||
|
||||
//now while running the second app attempt,
|
||||
//recover the task output from first attempt
|
||||
Configuration conf2 = job.getConfiguration();
|
||||
conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
|
||||
conf2.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
recoveryVersion);
|
||||
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
|
||||
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
|
||||
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
|
||||
|
@ -148,14 +158,34 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
File jtd2 = new File(jobTempDir2.toUri().getPath());
|
||||
|
||||
committer2.recoverTask(tContext2);
|
||||
assertTrue(jtd2.exists());
|
||||
if (recoveryVersion == 1) {
|
||||
assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
|
||||
validateContent(jtd2);
|
||||
} else {
|
||||
assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
|
||||
if (commitVersion == 1) {
|
||||
assertTrue("Version 2 recovery moves to output dir from "
|
||||
+ jtd , jtd.list().length == 0);
|
||||
}
|
||||
}
|
||||
|
||||
committer2.commitJob(jContext2);
|
||||
validateContent(outDir);
|
||||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testRecoveryV1() throws Exception {
|
||||
testRecoveryInternal(1, 1);
|
||||
}
|
||||
|
||||
public void testRecoveryV2() throws Exception {
|
||||
testRecoveryInternal(2, 2);
|
||||
}
|
||||
|
||||
public void testRecoveryUpgradeV1V2() throws Exception {
|
||||
testRecoveryInternal(1, 2);
|
||||
}
|
||||
|
||||
private void validateContent(Path dir) throws IOException {
|
||||
validateContent(new File(dir.toUri().getPath()));
|
||||
}
|
||||
|
@ -198,11 +228,13 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
assert(dataFileFound && indexFileFound);
|
||||
}
|
||||
|
||||
public void testCommitter() throws Exception {
|
||||
private void testCommitterInternal(int version) throws Exception {
|
||||
Job job = Job.getInstance();
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
|
||||
|
@ -225,11 +257,22 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testMapFileOutputCommitter() throws Exception {
|
||||
public void testCommitterV1() throws Exception {
|
||||
testCommitterInternal(1);
|
||||
}
|
||||
|
||||
public void testCommitterV2() throws Exception {
|
||||
testCommitterInternal(2);
|
||||
}
|
||||
|
||||
private void testMapFileOutputCommitterInternal(int version)
|
||||
throws Exception {
|
||||
Job job = Job.getInstance();
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
|
||||
|
@ -252,11 +295,37 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testAbort() throws IOException, InterruptedException {
|
||||
public void testMapFileOutputCommitterV1() throws Exception {
|
||||
testMapFileOutputCommitterInternal(1);
|
||||
}
|
||||
|
||||
public void testMapFileOutputCommitterV2() throws Exception {
|
||||
testMapFileOutputCommitterInternal(2);
|
||||
}
|
||||
|
||||
public void testInvalidVersionNumber() throws IOException {
|
||||
Job job = Job.getInstance();
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
try {
|
||||
new FileOutputCommitter(outDir, tContext);
|
||||
fail("should've thrown an exception!");
|
||||
} catch (IOException e) {
|
||||
//test passed
|
||||
}
|
||||
}
|
||||
|
||||
private void testAbortInternal(int version)
|
||||
throws IOException, InterruptedException {
|
||||
Job job = Job.getInstance();
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
version);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
|
||||
|
@ -285,6 +354,14 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testAbortV1() throws IOException, InterruptedException {
|
||||
testAbortInternal(1);
|
||||
}
|
||||
|
||||
public void testAbortV2() throws IOException, InterruptedException {
|
||||
testAbortInternal(2);
|
||||
}
|
||||
|
||||
public static class FakeFileSystem extends RawLocalFileSystem {
|
||||
public FakeFileSystem() {
|
||||
super();
|
||||
|
@ -301,13 +378,16 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
}
|
||||
|
||||
|
||||
public void testFailAbort() throws IOException, InterruptedException {
|
||||
private void testFailAbortInternal(int version)
|
||||
throws IOException, InterruptedException {
|
||||
Job job = Job.getInstance();
|
||||
Configuration conf = job.getConfiguration();
|
||||
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
|
||||
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
|
||||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
|
||||
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
|
||||
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
|
||||
version);
|
||||
FileOutputFormat.setOutputPath(job, outDir);
|
||||
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
|
||||
|
@ -353,6 +433,14 @@ public class TestFileOutputCommitter extends TestCase {
|
|||
FileUtil.fullyDelete(new File(outDir.toString()));
|
||||
}
|
||||
|
||||
public void testFailAbortV1() throws Exception {
|
||||
testFailAbortInternal(1);
|
||||
}
|
||||
|
||||
public void testFailAbortV2() throws Exception {
|
||||
testFailAbortInternal(2);
|
||||
}
|
||||
|
||||
public static String slurp(File f) throws IOException {
|
||||
int len = (int) f.length();
|
||||
byte[] buf = new byte[len];
|
||||
|
|
Loading…
Reference in New Issue