MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. Contributed by Junping Du

Jian He 2015-11-16 10:44:06 -08:00
parent dceed6b0a8
commit f562c155d2
9 changed files with 546 additions and 31 deletions

hadoop-mapreduce-project/CHANGES.txt

@@ -133,6 +133,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5763. Warn message about httpshuffle in NM logs.
     (Akira AJISAKA via ozawa)
 
+    MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter
+    API. Contributed by Junping Du
+
   OPTIMIZATIONS
 
     MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via

.../org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -301,10 +301,12 @@ public class MRAppMaster extends CompositeService {
     }
 
     boolean copyHistory = false;
+    committer = createOutputCommitter(conf);
     try {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       Path stagingDir = MRApps.getStagingAreaDir(conf, user);
       FileSystem fs = getFileSystem(conf);
       boolean stagingExists = fs.exists(stagingDir);
       Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
       boolean commitStarted = fs.exists(startCommitFile);
@@ -341,17 +343,24 @@ public class MRAppMaster extends CompositeService {
               "before it crashed. Not retrying.";
           forcedState = JobStateInternal.FAILED;
         } else {
-          //The commit is still pending, commit error
-          shutDownMessage =
-              "Job commit from a prior MRAppMaster attempt is " +
-              "potentially in progress. Preventing multiple commit executions";
-          forcedState = JobStateInternal.ERROR;
+          if (isCommitJobRepeatable()) {
+            // cleanup previous half done commits if committer supports
+            // repeatable job commit.
+            errorHappenedShutDown = false;
+            cleanupInterruptedCommit(conf, fs, startCommitFile);
+          } else {
+            //The commit is still pending, commit error
+            shutDownMessage =
+                "Job commit from a prior MRAppMaster attempt is " +
+                "potentially in progress. Preventing multiple commit executions";
+            forcedState = JobStateInternal.ERROR;
+          }
         }
       }
     } catch (IOException e) {
       throw new YarnRuntimeException("Error while initializing", e);
     }
 
     if (errorHappenedShutDown) {
       NoopEventHandler eater = new NoopEventHandler();
       //We do not have a JobEventDispatcher in this path
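Condensed into one place, the recovery decision this hunk completes looks roughly like the following (a hand-written summary for orientation, not text from the diff; the succeeded/failed branches are truncated in the hunk above, and JobStateInternal.SUCCEEDED stands in for the actual succeeded-shutdown handling):

  // On AM restart: a start marker without an end marker means a previous
  // attempt died somewhere inside commitJob().
  if (commitStarted) {
    if (fs.exists(endCommitSuccessFile)) {
      forcedState = JobStateInternal.SUCCEEDED;  // commit finished; reuse it
    } else if (fs.exists(endCommitFailureFile)) {
      forcedState = JobStateInternal.FAILED;     // commit failed before crash
    } else if (isCommitJobRepeatable()) {
      // New in this patch: drop the stale start marker and let this attempt
      // re-run the commit, which must tolerate a still-running older AM.
      errorHappenedShutDown = false;
      cleanupInterruptedCommit(conf, fs, startCommitFile);
    } else {
      forcedState = JobStateInternal.ERROR;      // can't prove commit stopped
    }
  }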
@@ -397,7 +406,6 @@ public class MRAppMaster extends CompositeService {
         addIfService(cpHist);
       }
     } else {
-      committer = createOutputCommitter(conf);
 
       //service to handle requests from JobClient
       clientService = createClientService(context);
@@ -474,6 +482,38 @@ public class MRAppMaster extends CompositeService {
     return new AsyncDispatcher();
   }
 
+  private boolean isCommitJobRepeatable() throws IOException {
+    boolean isRepeatable = false;
+    Configuration conf = getConfig();
+    if (committer != null) {
+      final JobContext jobContext = getJobContextFromConf(conf);
+      isRepeatable = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isCommitJobRepeatable(jobContext);
+            }
+          });
+    }
+    return isRepeatable;
+  }
+
+  private JobContext getJobContextFromConf(Configuration conf) {
+    if (newApiCommitter) {
+      return new JobContextImpl(conf, TypeConverter.fromYarn(getJobId()));
+    } else {
+      return new org.apache.hadoop.mapred.JobContextImpl(
+          new JobConf(conf), TypeConverter.fromYarn(getJobId()));
+    }
+  }
+
+  private void cleanupInterruptedCommit(Configuration conf,
+      FileSystem fs, Path startCommitFile) throws IOException {
+    LOG.info("Delete startJobCommitFile in case commit is not finished as " +
+        "successful or failed.");
+    fs.delete(startCommitFile, false);
+  }
+
   private OutputCommitter createOutputCommitter(Configuration conf) {
     return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
       public OutputCommitter call(Configuration conf) {
@@ -1193,14 +1233,7 @@ public class MRAppMaster extends CompositeService {
     boolean isSupported = false;
     Configuration conf = getConfig();
     if (committer != null) {
-      final JobContext _jobContext;
-      if (newApiCommitter) {
-        _jobContext = new JobContextImpl(
-            conf, TypeConverter.fromYarn(getJobId()));
-      } else {
-        _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-            new JobConf(conf), TypeConverter.fromYarn(getJobId()));
-      }
+      final JobContext _jobContext = getJobContextFromConf(conf);
       isSupported = callWithJobClassLoader(conf,
           new ExceptionAction<Boolean>() {
             public Boolean call(Configuration conf) throws IOException {

.../org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java

@@ -261,27 +261,38 @@ public class CommitterEventHandler extends AbstractService
     }
   }
 
-  private void touchz(Path p) throws IOException {
-    fs.create(p, false).close();
+  // If job commit is repeatable, then we should allow
+  // startCommitFile/endCommitSuccessFile/endCommitFailureFile to be written
+  // by other AM before.
+  private void touchz(Path p, boolean overwrite) throws IOException {
+    fs.create(p, overwrite).close();
   }
 
   @SuppressWarnings("unchecked")
   protected void handleJobCommit(CommitterJobCommitEvent event) {
+    boolean commitJobIsRepeatable = false;
     try {
-      touchz(startCommitFile);
+      commitJobIsRepeatable = committer.isCommitJobRepeatable(
+          event.getJobContext());
+    } catch (IOException e) {
+      LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
+    }
+
+    try {
+      touchz(startCommitFile, commitJobIsRepeatable);
       jobCommitStarted();
       waitForValidCommitWindow();
       committer.commitJob(event.getJobContext());
-      touchz(endCommitSuccessFile);
+      touchz(endCommitSuccessFile, commitJobIsRepeatable);
       context.getEventHandler().handle(
           new JobCommitCompletedEvent(event.getJobID()));
     } catch (Exception e) {
+      LOG.error("Could not commit job", e);
       try {
-        touchz(endCommitFailureFile);
+        touchz(endCommitFailureFile, commitJobIsRepeatable);
       } catch (Exception e2) {
         LOG.error("could not create failure file.", e2);
       }
-      LOG.error("Could not commit job", e);
       context.getEventHandler().handle(
           new JobCommitFailedEvent(event.getJobID(),
               StringUtils.stringifyException(e)));
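The new overwrite flag matters because a restarted AM re-creates marker files the previous attempt already wrote, and create(path, false) fails on an existing file. A standalone toy illustrating the semantics (local filesystem and the marker path are assumptions for the demo):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public final class TouchzDemo {
    // Same shape as touchz() above: create a zero-byte marker file.
    static void touchz(FileSystem fs, Path p, boolean overwrite)
        throws java.io.IOException {
      fs.create(p, overwrite).close();
    }

    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.getLocal(new Configuration());
      Path marker = new Path("/tmp/COMMIT_STARTED");  // hypothetical path
      touchz(fs, marker, true);  // first AM attempt creates the marker
      touchz(fs, marker, true);  // re-run AM overwrites instead of failing;
                                 // with overwrite=false this call would throw
    }
  }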

.../org/apache/hadoop/mapred/FileOutputCommitter.java

@@ -182,13 +182,18 @@ public class FileOutputCommitter extends OutputCommitter {
       throws IOException {
     return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
   }
 
   @Override
   @Deprecated
   public boolean isRecoverySupported() {
     return true;
   }
 
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return getWrapped(context).isCommitJobRepeatable(context);
+  }
+
   @Override
   public boolean isRecoverySupported(JobContext context) throws IOException {
     return getWrapped(context).isRecoverySupported(context);

.../org/apache/hadoop/mapred/OutputCommitter.java

@@ -192,7 +192,7 @@ public abstract class OutputCommitter
    *
    * If task output recovery is supported, job restart can be done more
    * efficiently.
-   * 
+   *
    * @param jobContext
    *          Context of the job whose output is being written.
    * @return <code>true</code> if task output recovery is supported,
@@ -204,6 +204,38 @@ public abstract class OutputCommitter
     return isRecoverySupported();
   }
 
+  /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws
+      IOException {
+    return false;
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext
+      jobContext) throws IOException {
+    return isCommitJobRepeatable((JobContext) jobContext);
+  }
+
   /**
    * Recover the task output.
    *

.../org/apache/hadoop/mapreduce/OutputCommitter.java

@@ -189,6 +189,32 @@ public abstract class OutputCommitter {
     return false;
   }
 
+  /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext)
+      throws IOException {
+    return false;
+  }
+
   /**
    * Is task output recovery supported for restarting jobs?
    *

.../org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An {@link OutputCommitter} that commits files specified
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
@@ -76,6 +78,13 @@ public class FileOutputCommitter extends OutputCommitter {
   public static final boolean
     FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false;
 
+  // Number of attempts when failure happens in commit job
+  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
+      "mapreduce.fileoutputcommitter.failures.attempts";
+
+  // default value to be 1 to keep consistent with previous behavior
+  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
+
   private Path outputPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
@@ -340,12 +349,40 @@ public class FileOutputCommitter extends OutputCommitter {
   }
 
   /**
-   * The job has completed so move all committed tasks to the final output dir.
+   * The job has completed, so do works in commitJobInternal().
+   * Could retry on failure if using algorithm 2.
+   * @param context the job's context
+   */
+  public void commitJob(JobContext context) throws IOException {
+    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
+        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
+    int attempt = 0;
+    boolean jobCommitNotFinished = true;
+    while (jobCommitNotFinished) {
+      try {
+        commitJobInternal(context);
+        jobCommitNotFinished = false;
+      } catch (Exception e) {
+        if (++attempt >= maxAttemptsOnFailure) {
+          throw e;
+        } else {
+          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
+              ") time.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * The job has completed, so do following commit job, include:
+   * Move all committed tasks to the final output dir (algorithm 1 only).
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
-  public void commitJob(JobContext context) throws IOException {
+  @VisibleForTesting
+  protected void commitJobInternal(JobContext context) throws IOException {
     if (hasOutputPath()) {
       Path finalOutput = getOutputPath();
       FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
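Because the retry count is read from the job configuration, multiple commit attempts are enabled per job. A short sketch, assuming a caller that already has a Job in hand (both property constants appear in this diff; the value 3 is an arbitrary example):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

  public final class RepeatableCommitConf {
    // Sketch: opt a job into repeatable, retried job commit.
    static void enableRepeatableCommit(Job job) {
      Configuration conf = job.getConfiguration();
      // Algorithm 2 makes commitJob() repeatable (isCommitJobRepeatable()
      // below returns algorithmVersion == 2).
      conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
      // With a repeatable committer, commitJobInternal() is attempted up to
      // this many times on failure; the default of 1 keeps old behavior.
      conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, 3);
    }
  }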
@@ -377,9 +414,17 @@ public class FileOutputCommitter extends OutputCommitter {
       }
       // True if the job requires output.dir marked on successful job.
       // Note that by default it is set to true.
-      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+      if (context.getConfiguration().getBoolean(
+          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
         Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-        fs.create(markerPath).close();
+        // If job commit is repeatable and previous/another AM could write
+        // mark file already, we need to set overwritten to be true explicitly
+        // in case other FS implementations don't overwritten by default.
+        if (isCommitJobRepeatable(context)) {
+          fs.create(markerPath, true).close();
+        } else {
+          fs.create(markerPath).close();
+        }
       }
     } else {
       LOG.warn("Output Path is null in commitJob()");
@@ -458,7 +503,16 @@ public class FileOutputCommitter extends OutputCommitter {
       Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
       FileSystem fs = pendingJobAttemptsPath
           .getFileSystem(context.getConfiguration());
-      fs.delete(pendingJobAttemptsPath, true);
+      // if job allow repeatable commit and pendingJobAttemptsPath could be
+      // deleted by previous AM, we should tolerate FileNotFoundException in
+      // this case.
+      try {
+        fs.delete(pendingJobAttemptsPath, true);
+      } catch (FileNotFoundException e) {
+        if (!isCommitJobRepeatable(context)) {
+          throw e;
+        }
+      }
     } else {
       LOG.warn("Output Path is null in cleanupJob()");
     }
@@ -594,7 +648,12 @@ public class FileOutputCommitter extends OutputCommitter {
   public boolean isRecoverySupported() {
     return true;
   }
 
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return algorithmVersion == 2;
+  }
+
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
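Why only algorithm 2: under version 1, commitJob() merges each task's committed output from the job attempt directory into the final directory, so a crash mid-merge leaves the sources half-moved and a retry cannot reproduce its inputs; under version 2, tasks already wrote into the final directory at task commit, so repeating job commit only re-runs idempotent steps. A hypothetical condensation of that distinction (helper names stand in for the real private methods; bodies elided):

  import java.io.IOException;

  final class CommitAlgorithms {
    static void commitJobV1() throws IOException {
      mergePaths();          // destructive move of task output into the final
                             // dir: an interrupted merge cannot be replayed
      cleanup();             // idempotent
      writeSuccessMarker();  // idempotent (with overwrite=true)
    }

    static void commitJobV2() throws IOException {
      // tasks already committed straight into the final dir, so every
      // remaining step is idempotent: isCommitJobRepeatable() == true
      cleanup();
      writeSuccessMarker();
    }

    static void mergePaths() throws IOException { /* elided */ }
    static void cleanup() throws IOException { /* elided */ }
    static void writeSuccessMarker() throws IOException { /* elided */ }
  }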

.../org/apache/hadoop/mapred/TestFileOutputCommitter.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -202,6 +203,112 @@ public class TestFileOutputCommitter extends TestCase {
     assert(dataFileFound && indexFileFound);
   }
 
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts)
+      throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
+        FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, maxAttempts);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit again
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit successful: wrong behavior " +
+            "for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testCommitterInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -451,4 +558,48 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed() throws IOException {
+      super();
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      JobConf conf = context.getJobConf();
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped =
+          new CommitterFailedFirst(FileOutputFormat.getOutputPath(conf),
+              context);
+      wrapped.commitJob(context);
+    }
+  }
+
+  public static class CommitterFailedFirst extends
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterFailedFirst(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(org.apache.hadoop.mapreduce.JobContext
+        context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
 }

.../org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.lib.output;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -283,6 +285,174 @@ public class TestFileOutputCommitter extends TestCase {
   public void testCommitterV2() throws Exception {
     testCommitterInternal(2);
   }
 
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit job again on a successful commit job.
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit success: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        maxAttempts);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterRepeatableV1() throws Exception {
+    testCommitterRetryInternal(1);
+  }
+
+  public void testCommitterRepeatableV2() throws Exception {
+    testCommitterRetryInternal(2);
+  }
+
+  // retry committer for 2 times.
+  private void testCommitterRetryInternal(int version)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    // only attempt for 1 time.
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        1);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    try {
+      committer.commitJob(jContext);
+      Assert.fail("Commit successful: wrong behavior for the first time " +
+          "commit.");
+    } catch (IOException e) {
+      // commit again.
+      try {
+        committer.commitJob(jContext);
+        // version 1 shouldn't reach to here.
+        if (version == 1) {
+          Assert.fail("Commit successful after retry: wrong behavior for " +
+              "version 1.");
+        }
+      } catch (FileNotFoundException ex) {
+        if (version == 2) {
+          Assert.fail("Commit failed after retry: wrong behavior for" +
+              " version 2.");
+        }
+        assertTrue(ex.getMessage().contains(committer.getJobAttemptPath(
+            jContext).toString() + " does not exist"));
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testMapFileOutputCommitterInternal(int version)
       throws Exception {
@@ -292,7 +462,7 @@ public class TestFileOutputCommitter extends TestCase {
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
     conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
-
+
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -584,4 +754,29 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(JobContext context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
 }