MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan

This commit is contained in:
Jason Lowe 2018-01-17 10:32:06 -06:00
parent fd09f72151
commit c228a7c707
3 changed files with 68 additions and 5 deletions

View File

@ -85,6 +85,17 @@ public class FileOutputCommitter extends OutputCommitter {
// default value to be 1 to keep consistent with previous behavior // default value to be 1 to keep consistent with previous behavior
public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1; public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
// Whether tasks should delete their task temporary directories. This is
// purely an optimization for filesystems without O(1) recursive delete, as
// commitJob will recursively delete the entire job temporary directory.
// HDFS has O(1) recursive delete, so this parameter is left false by default.
// Users of object stores, for example, may want to set this to true. Note:
// this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
"mapreduce.fileoutputcommitter.task.cleanup.enabled";
public static final boolean
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
private Path outputPath = null; private Path outputPath = null;
private Path workPath = null; private Path workPath = null;
private final int algorithmVersion; private final int algorithmVersion;
@ -586,6 +597,17 @@ public class FileOutputCommitter extends OutputCommitter {
mergePaths(fs, taskAttemptDirStatus, outputPath); mergePaths(fs, taskAttemptDirStatus, outputPath);
LOG.info("Saved output of task '" + attemptId + "' to " + LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath); outputPath);
if (context.getConfiguration().getBoolean(
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
LOG.debug(String.format(
"Deleting the temporary directory of '%s': '%s'",
attemptId, taskAttemptPath));
if(!fs.delete(taskAttemptPath, true)) {
LOG.warn("Could not delete " + taskAttemptPath);
}
}
} }
} else { } else {
LOG.warn("No Output found for " + attemptId); LOG.warn("No Output found for " + attemptId);

View File

@ -1514,6 +1514,17 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.fileoutputcommitter.task.cleanup.enabled</name>
<value>false</value>
<description>Whether tasks should delete their task temporary directories. This is purely an
optimization for filesystems without O(1) recursive delete, as commitJob will recursively delete
the entire job temporary directory. HDFS has O(1) recursive delete, so this parameter is left
false by default. Users of object stores, for example, may want to set this to true.
Note: this is only used if mapreduce.fileoutputcommitter.algorithm.version=2</description>
</property>
<property> <property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name> <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value> <value>1000</value>

View File

@ -249,13 +249,18 @@ public class TestFileOutputCommitter extends TestCase {
assert(dataFileFound && indexFileFound); assert(dataFileFound && indexFileFound);
} }
private void testCommitterInternal(int version) throws Exception { private void testCommitterInternal(int version, boolean taskCleanup)
throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, conf.setInt(
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version); version);
conf.setBoolean(
FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
taskCleanup);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@ -269,9 +274,30 @@ public class TestFileOutputCommitter extends TestCase {
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext); writeOutput(theRecordWriter, tContext);
// check task and job temp directories exist
File jobOutputDir = new File(
new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString());
File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority(
committer.getWorkPath()).toString());
assertTrue("job temp dir does not exist", jobOutputDir.exists());
assertTrue("task temp dir does not exist", taskOutputDir.exists());
// do commit // do commit
committer.commitTask(tContext); committer.commitTask(tContext);
assertTrue("job temp dir does not exist", jobOutputDir.exists());
if (version == 1 || taskCleanup) {
// Task temp dir gets renamed in v1 and deleted if taskCleanup is
// enabled in v2
assertFalse("task temp dir still exists", taskOutputDir.exists());
} else {
// By default, in v2 the task temp dir is only deleted during commitJob
assertTrue("task temp dir does not exist", taskOutputDir.exists());
}
// Entire job temp directory gets deleted, including task temp dir
committer.commitJob(jContext); committer.commitJob(jContext);
assertFalse("job temp dir still exists", jobOutputDir.exists());
assertFalse("task temp dir still exists", taskOutputDir.exists());
// validate output // validate output
validateContent(outDir); validateContent(outDir);
@ -279,13 +305,17 @@ public class TestFileOutputCommitter extends TestCase {
} }
public void testCommitterV1() throws Exception { public void testCommitterV1() throws Exception {
testCommitterInternal(1); testCommitterInternal(1, false);
} }
public void testCommitterV2() throws Exception { public void testCommitterV2() throws Exception {
testCommitterInternal(2); testCommitterInternal(2, false);
} }
public void testCommitterV2TaskCleanupEnabled() throws Exception {
testCommitterInternal(2, true);
}
public void testCommitterWithDuplicatedCommitV1() throws Exception { public void testCommitterWithDuplicatedCommitV1() throws Exception {
testCommitterWithDuplicatedCommitInternal(1); testCommitterWithDuplicatedCommitInternal(1);
} }