MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup failure during commitJob. (Junping Du via wangda)
This commit is contained in:
parent
4a657e9326
commit
6041fc75d8
|
@ -64,9 +64,23 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
|
||||
"mapreduce.fileoutputcommitter.algorithm.version";
|
||||
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
|
||||
// Skip cleanup _temporary folders under job's output directory
|
||||
public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
|
||||
"mapreduce.fileoutputcommitter.cleanup.skipped";
|
||||
public static final boolean
|
||||
FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false;
|
||||
|
||||
// Ignore exceptions in cleanup _temporary folder under job's output directory
|
||||
public static final String FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED =
|
||||
"mapreduce.fileoutputcommitter.cleanup-failures.ignored";
|
||||
public static final boolean
|
||||
FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false;
|
||||
|
||||
private Path outputPath = null;
|
||||
private Path workPath = null;
|
||||
private final int algorithmVersion;
|
||||
private final boolean skipCleanup;
|
||||
private final boolean ignoreCleanupFailures;
|
||||
|
||||
/**
|
||||
* Create a file output committer
|
||||
|
@ -101,6 +115,21 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
if (algorithmVersion != 1 && algorithmVersion != 2) {
|
||||
throw new IOException("Only 1 or 2 algorithm version is supported");
|
||||
}
|
||||
|
||||
// if skip cleanup
|
||||
skipCleanup = conf.getBoolean(
|
||||
FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED,
|
||||
FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT);
|
||||
|
||||
// if ignore failures in cleanup
|
||||
ignoreCleanupFailures = conf.getBoolean(
|
||||
FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED,
|
||||
FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT);
|
||||
|
||||
LOG.info("FileOutputCommitter skip cleanup _temporary folders under " +
|
||||
"output directory:" + skipCleanup + ", ignore cleanup failures: " +
|
||||
ignoreCleanupFailures);
|
||||
|
||||
if (outputPath != null) {
|
||||
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
|
||||
this.outputPath = fs.makeQualified(outputPath);
|
||||
|
@ -327,8 +356,25 @@ public class FileOutputCommitter extends OutputCommitter {
|
|||
}
|
||||
}
|
||||
|
||||
// delete the _temporary folder and create a _done file in the o/p folder
|
||||
if (skipCleanup) {
|
||||
LOG.info("Skip cleanup the _temporary folders under job's output " +
|
||||
"directory in commitJob.");
|
||||
} else {
|
||||
// delete the _temporary folder and create a _done file in the o/p
|
||||
// folder
|
||||
try {
|
||||
cleanupJob(context);
|
||||
} catch (IOException e) {
|
||||
if (ignoreCleanupFailures) {
|
||||
// swallow exceptions in cleanup as user configure to make sure
|
||||
// commitJob could be success even when cleanup get failure.
|
||||
LOG.error("Error in cleanup job, manually cleanup is needed.", e);
|
||||
} else {
|
||||
// throw back exception to fail commitJob.
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
// True if the job requires output.dir marked on successful job.
|
||||
// Note that by default it is set to true.
|
||||
if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
|
||||
|
|
|
@ -399,6 +399,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-2597. MiniYARNCluster should propagate reason for AHS not starting.
|
||||
(stevel)
|
||||
|
||||
MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup
|
||||
failure during commitJob. (Junping Du via wangda)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
Loading…
Reference in New Issue