diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index eecf02214fa..0bbe85c7d5a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -348,6 +348,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6059. Speed up history server startup time (Siqi Li via aw)
+ MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output
+ files. (Siqi Li via gera)
+
BUG FIXES
MAPREDUCE-6210. Use getApplicationAttemptId() instead of getApplicationId()
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 55252f04f9f..28a85485c1a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.lib.output;
+import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -25,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -57,10 +59,14 @@ public class FileOutputCommitter extends OutputCommitter {
@Deprecated
protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
- public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
- "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+ public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+ public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
+ "mapreduce.fileoutputcommitter.algorithm.version";
+ public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
private Path outputPath = null;
private Path workPath = null;
+ private final int algorithmVersion;
/**
* Create a file output committer
@@ -87,6 +93,14 @@ public class FileOutputCommitter extends OutputCommitter {
@Private
public FileOutputCommitter(Path outputPath,
JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ algorithmVersion =
+ conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
+ LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
+ if (algorithmVersion != 1 && algorithmVersion != 2) {
+ throw new IOException("Only 1 or 2 algorithm version is supported");
+ }
if (outputPath != null) {
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
this.outputPath = fs.makeQualified(outputPath);
@@ -248,14 +262,14 @@ public class FileOutputCommitter extends OutputCommitter {
return new Path(getJobAttemptPath(appAttemptId, out),
String.valueOf(context.getTaskAttemptID().getTaskID()));
}
-
+
private static class CommittedTaskFilter implements PathFilter {
@Override
public boolean accept(Path path) {
return !PENDING_DIR_NAME.equals(path.getName());
}
}
-
+
/**
* Get a list of all paths where output from committed tasks are stored.
* @param context the context of the current job
@@ -268,7 +282,7 @@ public class FileOutputCommitter extends OutputCommitter {
FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
}
-
+
/**
* Get the directory that the task should write results into.
* @return the work directory
@@ -295,7 +309,7 @@ public class FileOutputCommitter extends OutputCommitter {
LOG.warn("Output Path is null in setupJob()");
}
}
-
+
/**
* The job has completed so move all committed tasks to the final output dir.
* Delete the temporary directory, including all of the work directories.
@@ -306,8 +320,11 @@ public class FileOutputCommitter extends OutputCommitter {
if (hasOutputPath()) {
Path finalOutput = getOutputPath();
FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
- for(FileStatus stat: getAllCommittedTaskPaths(context)) {
- mergePaths(fs, stat, finalOutput);
+
+ if (algorithmVersion == 1) {
+ for (FileStatus stat: getAllCommittedTaskPaths(context)) {
+ mergePaths(fs, stat, finalOutput);
+ }
}
// delete the _temporary folder and create a _done file in the o/p folder
@@ -417,27 +434,42 @@ public class FileOutputCommitter extends OutputCommitter {
@Private
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
- throws IOException {
+ throws IOException {
+
TaskAttemptID attemptId = context.getTaskAttemptID();
if (hasOutputPath()) {
context.progress();
if(taskAttemptPath == null) {
taskAttemptPath = getTaskAttemptPath(context);
}
- Path committedTaskPath = getCommittedTaskPath(context);
FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
- if (fs.exists(taskAttemptPath)) {
- if(fs.exists(committedTaskPath)) {
- if(!fs.delete(committedTaskPath, true)) {
- throw new IOException("Could not delete " + committedTaskPath);
+ FileStatus taskAttemptDirStatus;
+ try {
+ taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
+ } catch (FileNotFoundException e) {
+ taskAttemptDirStatus = null;
+ }
+
+ if (taskAttemptDirStatus != null) {
+ if (algorithmVersion == 1) {
+ Path committedTaskPath = getCommittedTaskPath(context);
+ if (fs.exists(committedTaskPath)) {
+ if (!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete " + committedTaskPath);
+ }
}
+ if (!fs.rename(taskAttemptPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + taskAttemptPath + " to "
+ + committedTaskPath);
+ }
+ LOG.info("Saved output of task '" + attemptId + "' to " +
+ committedTaskPath);
+ } else {
+ // directly merge everything from taskAttemptPath to output directory
+ mergePaths(fs, taskAttemptDirStatus, outputPath);
+ LOG.info("Saved output of task '" + attemptId + "' to " +
+ outputPath);
}
- if(!fs.rename(taskAttemptPath, committedTaskPath)) {
- throw new IOException("Could not rename " + taskAttemptPath + " to "
- + committedTaskPath);
- }
- LOG.info("Saved output of task '" + attemptId + "' to " +
- committedTaskPath);
} else {
LOG.warn("No Output found for " + attemptId);
}
@@ -511,32 +543,43 @@ public class FileOutputCommitter extends OutputCommitter {
throw new IOException ("Cannot recover task output for first attempt...");
}
- Path committedTaskPath = getCommittedTaskPath(context);
Path previousCommittedTaskPath = getCommittedTaskPath(
previousAttempt, context);
- FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+ FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
- LOG.debug("Trying to recover task from " + previousCommittedTaskPath
- + " into " + committedTaskPath);
- if (fs.exists(previousCommittedTaskPath)) {
- if(fs.exists(committedTaskPath)) {
- if(!fs.delete(committedTaskPath, true)) {
- throw new IOException("Could not delete "+committedTaskPath);
+ LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
+ if (algorithmVersion == 1) {
+ if (fs.exists(previousCommittedTaskPath)) {
+ Path committedTaskPath = getCommittedTaskPath(context);
+ if (fs.exists(committedTaskPath)) {
+ if (!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete "+committedTaskPath);
+ }
}
+ //Rename can fail if the parent directory does not yet exist.
+ Path committedParent = committedTaskPath.getParent();
+ fs.mkdirs(committedParent);
+ if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + previousCommittedTaskPath +
+ " to " + committedTaskPath);
+ }
+ } else {
+ LOG.warn(attemptId+" had no output to recover.");
}
- //Rename can fail if the parent directory does not yet exist.
- Path committedParent = committedTaskPath.getParent();
- fs.mkdirs(committedParent);
- if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
- throw new IOException("Could not rename " + previousCommittedTaskPath +
- " to " + committedTaskPath);
- }
- LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
} else {
- LOG.warn(attemptId+" had no output to recover.");
+ // essentially a no-op, but for backwards compatibility
+ // after upgrade to the new fileOutputCommitter,
+ // check if there are any output left in committedTaskPath
+ if (fs.exists(previousCommittedTaskPath)) {
+ LOG.info("Recovering task for upgrading scenario, moving files from "
+ + previousCommittedTaskPath + " to " + outputPath);
+ FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
+ mergePaths(fs, from, outputPath);
+ }
+ LOG.info("Done recovering task " + attemptId);
}
} else {
LOG.warn("Output Path is null in recoverTask()");
}
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 11621839896..d7bec9c1852 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1317,6 +1317,60 @@
contact with the RM has been re-established.
+
+ mapreduce.fileoutputcommitter.algorithm.version
+ 1
+ The file output committer algorithm version
+ valid algorithm version number: 1 or 2
+ default to 1, which is the original algorithm
+
+ In algorithm version 1,
+
+ 1. commitTask will rename directory
+ $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
+ to
+ $joboutput/_temporary/$appAttemptID/$taskID/
+
+ 2. recoverTask will also do a rename
+ $joboutput/_temporary/$appAttemptID/$taskID/
+ to
+ $joboutput/_temporary/($appAttemptID + 1)/$taskID/
+
+ 3. commitJob will merge every task output file in
+ $joboutput/_temporary/$appAttemptID/$taskID/
+ to
+ $joboutput/, then it will delete $joboutput/_temporary/
+ and write $joboutput/_SUCCESS
+
+ It has a performance regression, which is discussed in MAPREDUCE-4815.
+ If a job generates many files to commit then the commitJob
+ method call at the end of the job can take minutes.
+ the commit is single-threaded and waits until all
+ tasks have completed before commencing.
+
+ algorithm version 2 will change the behavior of commitTask,
+ recoverTask, and commitJob.
+
+ 1. commitTask will rename all files in
+ $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
+ to $joboutput/
+
+ 2. recoverTask actually doesn't require to do anything, but for
+ upgrade from version 1 to version 2 case, it will check if there
+ are any files in
+ $joboutput/_temporary/($appAttemptID - 1)/$taskID/
+ and rename them to $joboutput/
+
+ 3. commitJob can simply delete $joboutput/_temporary and write
+ $joboutput/_SUCCESS
+
+ This algorithm will reduce the output commit time for
+ large jobs by having the tasks commit directly to the final
+ output directory as they were completing and commitJob had
+ very little to do.
+
+
+
yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms
1000
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
index 0859571d1f2..3207a71efe5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
@@ -50,7 +50,6 @@ public class TestFileOutputCommitter extends TestCase {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
-
private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
@@ -83,12 +82,16 @@ public class TestFileOutputCommitter extends TestCase {
theRecordWriter.close(null);
}
}
-
- public void testRecovery() throws Exception {
- JobConf conf = new JobConf();
+
+ private void testRecoveryInternal(int commitVersion, int recoveryVersion)
+ throws Exception {
+ JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -99,7 +102,7 @@ public class TestFileOutputCommitter extends TestCase {
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
- RecordWriter theRecordWriter =
+ RecordWriter theRecordWriter =
theOutputFormat.getRecordWriter(null, conf, partFile, null);
writeOutput(theRecordWriter, tContext);
@@ -107,31 +110,59 @@ public class TestFileOutputCommitter extends TestCase {
if(committer.needsTaskCommit(tContext)) {
committer.commitTask(tContext);
}
+
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd1 = new File(jobTempDir1.toUri().getPath());
- assertTrue(jtd1.exists());
- validateContent(jobTempDir1);
-
- //now while running the second app attempt,
+ if (commitVersion == 1) {
+ assertTrue("Version 1 commits to temporary dir " + jtd1, jtd1.exists());
+ validateContent(jobTempDir1);
+ } else {
+ assertFalse("Version 2 commits to output dir " + jtd1, jtd1.exists());
+ }
+
+ //now while running the second app attempt,
//recover the task output from first attempt
JobConf conf2 = new JobConf(conf);
conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
+ conf2.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ recoveryVersion);
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter();
committer2.setupJob(jContext2);
- Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
-
+
committer2.recoverTask(tContext2);
+
+ Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
File jtd2 = new File(jobTempDir2.toUri().getPath());
- assertTrue(jtd2.exists());
- validateContent(jobTempDir2);
-
+ if (recoveryVersion == 1) {
+ assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
+ validateContent(jobTempDir2);
+ } else {
+ assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
+ if (commitVersion == 1) {
+ assertTrue("Version 2 recovery moves to output dir from "
+ + jtd1 , jtd1.list().length == 0);
+ }
+ }
+
committer2.commitJob(jContext2);
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testRecoveryV1() throws Exception {
+ testRecoveryInternal(1, 1);
+ }
+
+ public void testRecoveryV2() throws Exception {
+ testRecoveryInternal(2, 2);
+ }
+
+ public void testRecoveryUpgradeV1V2() throws Exception {
+ testRecoveryInternal(1, 2);
+ }
private void validateContent(Path dir) throws IOException {
File fdir = new File(dir.toUri().getPath());
@@ -170,11 +201,13 @@ public class TestFileOutputCommitter extends TestCase {
assert(fileCount > 0);
assert(dataFileFound && indexFileFound);
}
-
- public void testCommitter() throws Exception {
+
+ private void testCommitterInternal(int version) throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -200,21 +233,33 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
- public void testMapFileOutputCommitter() throws Exception {
+ public void testCommitterV1() throws Exception {
+ testCommitterInternal(1);
+ }
+
+ public void testCommitterV2() throws Exception {
+ testCommitterInternal(2);
+ }
+
+ private void testMapFileOutputCommitterInternal(int version)
+ throws Exception {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
- FileOutputCommitter committer = new FileOutputCommitter();
-
+ FileOutputCommitter committer = new FileOutputCommitter();
+
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
- RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null);
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(null, conf, partFile, null);
writeMapFileOutput(theRecordWriter, tContext);
// do commit
@@ -227,11 +272,29 @@ public class TestFileOutputCommitter extends TestCase {
validateMapFileOutputContent(FileSystem.get(conf), outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
-
- public void testMapOnlyNoOutput() throws Exception {
+
+ public void testMapFileOutputCommitterV1() throws Exception {
+ testMapFileOutputCommitterInternal(1);
+ }
+
+ public void testMapFileOutputCommitterV2() throws Exception {
+ testMapFileOutputCommitterInternal(2);
+ }
+
+ public void testMapOnlyNoOutputV1() throws Exception {
+ testMapOnlyNoOutputInternal(1);
+ }
+
+ public void testMapOnlyNoOutputV2() throws Exception {
+ testMapOnlyNoOutputInternal(2);
+ }
+
+ private void testMapOnlyNoOutputInternal(int version) throws Exception {
JobConf conf = new JobConf();
//This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -249,11 +312,14 @@ public class TestFileOutputCommitter extends TestCase {
// validate output
FileUtil.fullyDelete(new File(outDir.toString()));
}
-
- public void testAbort() throws IOException, InterruptedException {
+
+ private void testAbortInternal(int version)
+ throws IOException, InterruptedException {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
@@ -283,6 +349,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(out);
}
+ public void testAbortV1() throws Exception {
+ testAbortInternal(1);
+ }
+
+ public void testAbortV2() throws Exception {
+ testAbortInternal(2);
+ }
+
public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() {
super();
@@ -299,11 +373,14 @@ public class TestFileOutputCommitter extends TestCase {
}
- public void testFailAbort() throws IOException, InterruptedException {
+ private void testFailAbortInternal(int version)
+ throws IOException, InterruptedException {
JobConf conf = new JobConf();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
FileOutputFormat.setOutputPath(conf, outDir);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
@@ -353,6 +430,13 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testFailAbortV1() throws Exception {
+ testFailAbortInternal(1);
+ }
+
+ public void testFailAbortV2() throws Exception {
+ testFailAbortInternal(2);
+ }
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index d20480ebba9..8f6030065fd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -109,12 +109,15 @@ public class TestFileOutputCommitter extends TestCase {
}
}
- public void testRecovery() throws Exception {
+ private void testRecoveryInternal(int commitVersion, int recoveryVersion)
+ throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ commitVersion);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -130,32 +133,59 @@ public class TestFileOutputCommitter extends TestCase {
// do commit
committer.commitTask(tContext);
+
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd = new File(jobTempDir1.toUri().getPath());
- assertTrue(jtd.exists());
- validateContent(jtd);
-
+ if (commitVersion == 1) {
+ assertTrue("Version 1 commits to temporary dir " + jtd, jtd.exists());
+ validateContent(jtd);
+ } else {
+ assertFalse("Version 2 commits to output dir " + jtd, jtd.exists());
+ }
+
//now while running the second app attempt,
//recover the task output from first attempt
Configuration conf2 = job.getConfiguration();
conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+ conf2.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ recoveryVersion);
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
committer2.setupJob(tContext2);
Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
File jtd2 = new File(jobTempDir2.toUri().getPath());
-
+
committer2.recoverTask(tContext2);
- assertTrue(jtd2.exists());
- validateContent(jtd2);
-
+ if (recoveryVersion == 1) {
+ assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
+ validateContent(jtd2);
+ } else {
+ assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
+ if (commitVersion == 1) {
+ assertTrue("Version 2 recovery moves to output dir from "
+ + jtd , jtd.list().length == 0);
+ }
+ }
+
committer2.commitJob(jContext2);
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testRecoveryV1() throws Exception {
+ testRecoveryInternal(1, 1);
+ }
+
+ public void testRecoveryV2() throws Exception {
+ testRecoveryInternal(2, 2);
+ }
+
+ public void testRecoveryUpgradeV1V2() throws Exception {
+ testRecoveryInternal(1, 2);
+ }
+
private void validateContent(Path dir) throws IOException {
validateContent(new File(dir.toUri().getPath()));
}
@@ -197,12 +227,14 @@ public class TestFileOutputCommitter extends TestCase {
assert(fileCount > 0);
assert(dataFileFound && indexFileFound);
}
-
- public void testCommitter() throws Exception {
- Job job = Job.getInstance();
+
+ private void testCommitterInternal(int version) throws Exception {
+ Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -225,11 +257,22 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
- public void testMapFileOutputCommitter() throws Exception {
+ public void testCommitterV1() throws Exception {
+ testCommitterInternal(1);
+ }
+
+ public void testCommitterV2() throws Exception {
+ testCommitterInternal(2);
+ }
+
+ private void testMapFileOutputCommitterInternal(int version)
+ throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -251,12 +294,38 @@ public class TestFileOutputCommitter extends TestCase {
validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
+
+ public void testMapFileOutputCommitterV1() throws Exception {
+ testMapFileOutputCommitterInternal(1);
+ }
- public void testAbort() throws IOException, InterruptedException {
+ public void testMapFileOutputCommitterV2() throws Exception {
+ testMapFileOutputCommitterInternal(2);
+ }
+
+ public void testInvalidVersionNumber() throws IOException {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ try {
+ new FileOutputCommitter(outDir, tContext);
+ fail("should've thrown an exception!");
+ } catch (IOException e) {
+ //test passed
+ }
+ }
+
+ private void testAbortInternal(int version)
+ throws IOException, InterruptedException {
+ Job job = Job.getInstance();
+ FileOutputFormat.setOutputPath(job, outDir);
+ Configuration conf = job.getConfiguration();
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -285,6 +354,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testAbortV1() throws IOException, InterruptedException {
+ testAbortInternal(1);
+ }
+
+ public void testAbortV2() throws IOException, InterruptedException {
+ testAbortInternal(2);
+ }
+
public static class FakeFileSystem extends RawLocalFileSystem {
public FakeFileSystem() {
super();
@@ -301,13 +378,16 @@ public class TestFileOutputCommitter extends TestCase {
}
- public void testFailAbort() throws IOException, InterruptedException {
+ private void testFailAbortInternal(int version)
+ throws IOException, InterruptedException {
Job job = Job.getInstance();
Configuration conf = job.getConfiguration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+ version);
FileOutputFormat.setOutputPath(job, outDir);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
@@ -353,6 +433,14 @@ public class TestFileOutputCommitter extends TestCase {
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testFailAbortV1() throws Exception {
+ testFailAbortInternal(1);
+ }
+
+ public void testFailAbortV2() throws Exception {
+ testFailAbortInternal(2);
+ }
+
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];