From f95f416fad4a4acad2f39065ac20a1b2c0b07bbb Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 16 Nov 2015 17:13:49 -0800 Subject: [PATCH] MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. Contributed by Junping Du --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 57 +++-- .../v2/app/commit/CommitterEventHandler.java | 25 ++- .../hadoop/mapred/FileOutputCommitter.java | 7 +- .../apache/hadoop/mapred/OutputCommitter.java | 34 ++- .../hadoop/mapreduce/OutputCommitter.java | 26 +++ .../lib/output/FileOutputCommitter.java | 70 ++++++- .../mapred/TestFileOutputCommitter.java | 151 ++++++++++++++ .../lib/output/TestFileOutputCommitter.java | 197 +++++++++++++++++- 9 files changed, 542 insertions(+), 28 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3bd820f7ec9..fffcf5b621a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -14,6 +14,9 @@ Release 2.7.3 - UNRELEASED MAPREDUCE-6540. TestMRTimelineEventHandling fails (sjlee) + MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. + (Junping Du via jianhe) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 26caa2d6c69..4e0724d955e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -281,10 +281,12 @@ public class MRAppMaster extends CompositeService { } boolean copyHistory = false; + committer = createOutputCommitter(conf); try { String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path stagingDir = MRApps.getStagingAreaDir(conf, user); FileSystem fs = getFileSystem(conf); + boolean stagingExists = fs.exists(stagingDir); Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId); boolean commitStarted = fs.exists(startCommitFile); @@ -317,9 +319,16 @@ public class MRAppMaster extends CompositeService { shutDownMessage = "We crashed after a commit failure."; forcedState = JobStateInternal.FAILED; } else { - //The commit is still pending, commit error - shutDownMessage = "We crashed durring a commit"; - forcedState = JobStateInternal.ERROR; + if (isCommitJobRepeatable()) { + // cleanup previous half done commits if committer supports + // repeatable job commit. + errorHappenedShutDown = false; + cleanupInterruptedCommit(conf, fs, startCommitFile); + } else { + //The commit is still pending, commit error + shutDownMessage = "We crashed durring a commit"; + forcedState = JobStateInternal.ERROR; + } } } } catch (IOException e) { @@ -374,7 +383,6 @@ public class MRAppMaster extends CompositeService { addIfService(cpHist); } } else { - committer = createOutputCommitter(conf); dispatcher = createDispatcher(); addIfService(dispatcher); @@ -454,6 +462,38 @@ public class MRAppMaster extends CompositeService { return new AsyncDispatcher(); } + private boolean isCommitJobRepeatable() throws IOException { + boolean isRepeatable = false; + Configuration conf = getConfig(); + if (committer != null) { + final JobContext jobContext = getJobContextFromConf(conf); + + isRepeatable = callWithJobClassLoader(conf, + new ExceptionAction() { + public Boolean call(Configuration conf) throws IOException { + return committer.isCommitJobRepeatable(jobContext); + } + }); + } + return isRepeatable; + } + + private JobContext getJobContextFromConf(Configuration conf) { + if (newApiCommitter) { + return new JobContextImpl(conf, TypeConverter.fromYarn(getJobId())); + } else { + return new org.apache.hadoop.mapred.JobContextImpl( + new JobConf(conf), TypeConverter.fromYarn(getJobId())); + } + } + + private void cleanupInterruptedCommit(Configuration conf, + FileSystem fs, Path startCommitFile) throws IOException { + LOG.info("Delete startJobCommitFile in case commit is not finished as " + + "successful or failed."); + fs.delete(startCommitFile, false); + } + private OutputCommitter createOutputCommitter(Configuration conf) { return callWithJobClassLoader(conf, new Action() { public OutputCommitter call(Configuration conf) { @@ -1131,14 +1171,7 @@ public class MRAppMaster extends CompositeService { boolean isSupported = false; Configuration conf = getConfig(); if (committer != null) { - final JobContext _jobContext; - if (newApiCommitter) { - _jobContext = new JobContextImpl( - conf, TypeConverter.fromYarn(getJobId())); - } else { - _jobContext = new org.apache.hadoop.mapred.JobContextImpl( - new JobConf(conf), TypeConverter.fromYarn(getJobId())); - } + final JobContext _jobContext = getJobContextFromConf(conf); isSupported = callWithJobClassLoader(conf, new ExceptionAction() { public Boolean call(Configuration conf) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java index d56c1e5aeb2..b53955fd858 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java @@ -261,27 +261,38 @@ public class CommitterEventHandler extends AbstractService } } - private void touchz(Path p) throws IOException { - fs.create(p, false).close(); + // If job commit is repeatable, then we should allow + // startCommitFile/endCommitSuccessFile/endCommitFailureFile to be written + // by other AM before. + private void touchz(Path p, boolean overwrite) throws IOException { + fs.create(p, overwrite).close(); } - + @SuppressWarnings("unchecked") protected void handleJobCommit(CommitterJobCommitEvent event) { + boolean commitJobIsRepeatable = false; try { - touchz(startCommitFile); + commitJobIsRepeatable = committer.isCommitJobRepeatable( + event.getJobContext()); + } catch (IOException e) { + LOG.warn("Exception in committer.isCommitJobRepeatable():", e); + } + + try { + touchz(startCommitFile, commitJobIsRepeatable); jobCommitStarted(); waitForValidCommitWindow(); committer.commitJob(event.getJobContext()); - touchz(endCommitSuccessFile); + touchz(endCommitSuccessFile, commitJobIsRepeatable); context.getEventHandler().handle( new JobCommitCompletedEvent(event.getJobID())); } catch (Exception e) { + LOG.error("Could not commit job", e); try { - touchz(endCommitFailureFile); + touchz(endCommitFailureFile, commitJobIsRepeatable); } catch (Exception e2) { LOG.error("could not create failure file.", e2); } - LOG.error("Could not commit job", e); context.getEventHandler().handle( new JobCommitFailedEvent(event.getJobID(), StringUtils.stringifyException(e))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java index 77d06b6b972..c44bb37f5cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java @@ -182,13 +182,18 @@ public class FileOutputCommitter extends OutputCommitter { throws IOException { return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context)); } - + @Override @Deprecated public boolean isRecoverySupported() { return true; } + @Override + public boolean isCommitJobRepeatable(JobContext context) throws IOException { + return getWrapped(context).isCommitJobRepeatable(context); + } + @Override public boolean isRecoverySupported(JobContext context) throws IOException { return getWrapped(context).isRecoverySupported(context); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java index 79df7f863fa..f774456001e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java @@ -192,7 +192,7 @@ public abstract class OutputCommitter * * If task output recovery is supported, job restart can be done more * efficiently. - * + * * @param jobContext * Context of the job whose output is being written. * @return true if task output recovery is supported, @@ -204,6 +204,38 @@ public abstract class OutputCommitter return isRecoverySupported(); } + /** + * Returns true if an in-progress job commit can be retried. If the MR AM is + * re-run then it will check this value to determine if it can retry an + * in-progress commit that was started by a previous version. + * Note that in rare scenarios, the previous AM version might still be running + * at that time, due to system anomalies. Hence if this method returns true + * then the retry commit operation should be able to run concurrently with + * the previous operation. + * + * If repeatable job commit is supported, job restart can tolerate previous + * AM failures during job commit. + * + * By default, it is not supported. Extended classes (like: + * FileOutputCommitter) should explicitly override it if provide support. + * + * @param jobContext + * Context of the job whose output is being written. + * @return true repeatable job commit is supported, + * false otherwise + * @throws IOException + */ + public boolean isCommitJobRepeatable(JobContext jobContext) throws + IOException { + return false; + } + + @Override + public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext + jobContext) throws IOException { + return isCommitJobRepeatable((JobContext) jobContext); + } + /** * Recover the task output. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java index cb44f63327f..53af617cff6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java @@ -189,6 +189,32 @@ public abstract class OutputCommitter { return false; } + /** + * Returns true if an in-progress job commit can be retried. If the MR AM is + * re-run then it will check this value to determine if it can retry an + * in-progress commit that was started by a previous version. + * Note that in rare scenarios, the previous AM version might still be running + * at that time, due to system anomalies. Hence if this method returns true + * then the retry commit operation should be able to run concurrently with + * the previous operation. + * + * If repeatable job commit is supported, job restart can tolerate previous + * AM failures during job commit. + * + * By default, it is not supported. Extended classes (like: + * FileOutputCommitter) should explicitly override it if provide support. + * + * @param jobContext + * Context of the job whose output is being written. + * @return true repeatable job commit is supported, + * false otherwise + * @throws IOException + */ + public boolean isCommitJobRepeatable(JobContext jobContext) + throws IOException { + return false; + } + /** * Is task output recovery supported for restarting jobs? * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 6e5d0a1e627..a9305dd2dc9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import com.google.common.annotations.VisibleForTesting; + /** An {@link OutputCommitter} that commits files specified * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. **/ @@ -64,6 +66,12 @@ public class FileOutputCommitter extends OutputCommitter { public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1; + // Number of attempts when failure happens in commit job + public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS = + "mapreduce.fileoutputcommitter.failures.attempts"; + + // default value to be 1 to keep consistent with previous behavior + public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1; private Path outputPath = null; private Path workPath = null; private final int algorithmVersion; @@ -311,12 +319,40 @@ public class FileOutputCommitter extends OutputCommitter { } /** - * The job has completed so move all committed tasks to the final output dir. + * The job has completed, so do works in commitJobInternal(). + * Could retry on failure if using algorithm 2. + * @param context the job's context + */ + public void commitJob(JobContext context) throws IOException { + int maxAttemptsOnFailure = isCommitJobRepeatable(context) ? + context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, + FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1; + int attempt = 0; + boolean jobCommitNotFinished = true; + while (jobCommitNotFinished) { + try { + commitJobInternal(context); + jobCommitNotFinished = false; + } catch (Exception e) { + if (++attempt >= maxAttemptsOnFailure) { + throw e; + } else { + LOG.warn("Exception get thrown in job commit, retry (" + attempt + + ") time.", e); + } + } + } + } + + /** + * The job has completed, so do following commit job, include: + * Move all committed tasks to the final output dir (algorithm 1 only). * Delete the temporary directory, including all of the work directories. * Create a _SUCCESS file to make it as successful. * @param context the job's context */ - public void commitJob(JobContext context) throws IOException { + @VisibleForTesting + protected void commitJobInternal(JobContext context) throws IOException { if (hasOutputPath()) { Path finalOutput = getOutputPath(); FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); @@ -331,9 +367,17 @@ public class FileOutputCommitter extends OutputCommitter { cleanupJob(context); // True if the job requires output.dir marked on successful job. // Note that by default it is set to true. - if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { + if (context.getConfiguration().getBoolean( + SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); - fs.create(markerPath).close(); + // If job commit is repeatable and previous/another AM could write + // mark file already, we need to set overwritten to be true explicitly + // in case other FS implementations don't overwritten by default. + if (isCommitJobRepeatable(context)) { + fs.create(markerPath, true).close(); + } else { + fs.create(markerPath).close(); + } } } else { LOG.warn("Output Path is null in commitJob()"); @@ -412,7 +456,16 @@ public class FileOutputCommitter extends OutputCommitter { Path pendingJobAttemptsPath = getPendingJobAttemptsPath(); FileSystem fs = pendingJobAttemptsPath .getFileSystem(context.getConfiguration()); - fs.delete(pendingJobAttemptsPath, true); + // if job allow repeatable commit and pendingJobAttemptsPath could be + // deleted by previous AM, we should tolerate FileNotFoundException in + // this case. + try { + fs.delete(pendingJobAttemptsPath, true); + } catch (FileNotFoundException e) { + if (!isCommitJobRepeatable(context)) { + throw e; + } + } } else { LOG.warn("Output Path is null in cleanupJob()"); } @@ -548,7 +601,12 @@ public class FileOutputCommitter extends OutputCommitter { public boolean isRecoverySupported() { return true; } - + + @Override + public boolean isCommitJobRepeatable(JobContext context) throws IOException { + return algorithmVersion == 2; + } + @Override public void recoverTask(TaskAttemptContext context) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java index 3207a71efe5..e15f7ab2bc4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.URI; import junit.framework.TestCase; +import org.junit.Assert; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -202,6 +203,112 @@ public class TestFileOutputCommitter extends TestCase { assert(dataFileFound && indexFileFound); } + public void testCommitterWithFailureV1() throws Exception { + testCommitterWithFailureInternal(1, 1); + testCommitterWithFailureInternal(1, 2); + } + + public void testCommitterWithFailureV2() throws Exception { + testCommitterWithFailureInternal(2, 1); + testCommitterWithFailureInternal(2, 2); + } + + private void testCommitterWithFailureInternal(int version, int maxAttempts) throws Exception { + JobConf conf = new JobConf(); + FileOutputFormat.setOutputPath(conf, outDir); + conf.set(JobContext.TASK_ATTEMPT_ID, attempt); + conf.setInt(org.apache.hadoop.mapreduce.lib.output. + FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); + conf.setInt(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter. + FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, maxAttempts); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new CommitterWithFailedThenSucceed(); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = + theOutputFormat.getRecordWriter(null, conf, partFile, null); + writeOutput(theRecordWriter, tContext); + + // do commit + if(committer.needsTaskCommit(tContext)) { + committer.commitTask(tContext); + } + + try { + committer.commitJob(jContext); + // (1,1), (1,2), (2,1) shouldn't reach to here. + if (version == 1 || maxAttempts <= 1) { + Assert.fail("Commit successful: wrong behavior for version 1."); + } + } catch (IOException e) { + // (2,2) shouldn't reach to here. + if (version == 2 && maxAttempts > 2) { + Assert.fail("Commit failed: wrong behavior for version 2."); + } + } + + FileUtil.fullyDelete(new File(outDir.toString())); + } + + public void testCommitterWithDuplicatedCommitV1() throws Exception { + testCommitterWithDuplicatedCommitInternal(1); + } + + public void testCommitterWithDuplicatedCommitV2() throws Exception { + testCommitterWithDuplicatedCommitInternal(2); + } + + private void testCommitterWithDuplicatedCommitInternal(int version) throws + Exception { + JobConf conf = new JobConf(); + FileOutputFormat.setOutputPath(conf, outDir); + conf.set(JobContext.TASK_ATTEMPT_ID, attempt); + conf.setInt(org.apache.hadoop.mapreduce.lib.output. + FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = + theOutputFormat.getRecordWriter(null, conf, partFile, null); + writeOutput(theRecordWriter, tContext); + + // do commit + if(committer.needsTaskCommit(tContext)) { + committer.commitTask(tContext); + } + committer.commitJob(jContext); + + // validate output + validateContent(outDir); + + // commit again + try { + committer.commitJob(jContext); + if (version == 1) { + Assert.fail("Duplicate commit successful: wrong behavior " + + "for version 1."); + } + } catch (IOException e) { + if (version == 2) { + Assert.fail("Duplicate commit failed: wrong behavior for version 2."); + } + } + FileUtil.fullyDelete(new File(outDir.toString())); + } + private void testCommitterInternal(int version) throws Exception { JobConf conf = new JobConf(); FileOutputFormat.setOutputPath(conf, outDir); @@ -451,4 +558,48 @@ public class TestFileOutputCommitter extends TestCase { return contents; } + /** + * The class provides a overrided implementation of commitJobInternal which + * causes the commit failed for the first time then succeed. + */ + public static class CommitterWithFailedThenSucceed extends + FileOutputCommitter { + boolean firstTimeFail = true; + + public CommitterWithFailedThenSucceed() throws IOException { + super(); + } + + @Override + public void commitJob(JobContext context) throws IOException { + JobConf conf = context.getJobConf(); + org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = + new CommitterFailedFirst(FileOutputFormat.getOutputPath(conf), + context); + wrapped.commitJob(context); + } + } + + public static class CommitterFailedFirst extends + org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter { + boolean firstTimeFail = true; + + public CommitterFailedFirst(Path outputPath, + JobContext context) throws IOException { + super(outputPath, context); + } + + @Override + protected void commitJobInternal(org.apache.hadoop.mapreduce.JobContext + context) throws IOException { + super.commitJobInternal(context); + if (firstTimeFail) { + firstTimeFail = false; + throw new IOException(); + } else { + // succeed then, nothing to do + } + } + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index 0d4ab98693d..bf77e5593fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.lib.output; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.concurrent.Callable; @@ -28,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import junit.framework.TestCase; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -284,6 +286,174 @@ public class TestFileOutputCommitter extends TestCase { testCommitterInternal(2); } + public void testCommitterWithDuplicatedCommitV1() throws Exception { + testCommitterWithDuplicatedCommitInternal(1); + } + + public void testCommitterWithDuplicatedCommitV2() throws Exception { + testCommitterWithDuplicatedCommitInternal(2); + } + + private void testCommitterWithDuplicatedCommitInternal(int version) throws + Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + version); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeOutput(theRecordWriter, tContext); + + // do commit + committer.commitTask(tContext); + committer.commitJob(jContext); + + // validate output + validateContent(outDir); + + // commit job again on a successful commit job. + try { + committer.commitJob(jContext); + if (version == 1) { + Assert.fail("Duplicate commit success: wrong behavior for version 1."); + } + } catch (IOException e) { + if (version == 2) { + Assert.fail("Duplicate commit failed: wrong behavior for version 2."); + } + } + FileUtil.fullyDelete(new File(outDir.toString())); + } + + public void testCommitterWithFailureV1() throws Exception { + testCommitterWithFailureInternal(1, 1); + testCommitterWithFailureInternal(1, 2); + } + + public void testCommitterWithFailureV2() throws Exception { + testCommitterWithFailureInternal(2, 1); + testCommitterWithFailureInternal(2, 2); + } + + private void testCommitterWithFailureInternal(int version, int maxAttempts) + throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + version); + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, + maxAttempts); + + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir, + tContext); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeOutput(theRecordWriter, tContext); + + // do commit + committer.commitTask(tContext); + + try { + committer.commitJob(jContext); + // (1,1), (1,2), (2,1) shouldn't reach to here. + if (version == 1 || maxAttempts <= 1) { + Assert.fail("Commit successful: wrong behavior for version 1."); + } + } catch (IOException e) { + // (2,2) shouldn't reach to here. + if (version == 2 && maxAttempts > 2) { + Assert.fail("Commit failed: wrong behavior for version 2."); + } + } + + FileUtil.fullyDelete(new File(outDir.toString())); + } + + public void testCommitterRepeatableV1() throws Exception { + testCommitterRetryInternal(1); + } + + public void testCommitterRepeatableV2() throws Exception { + testCommitterRetryInternal(2); + } + + // retry committer for 2 times. + private void testCommitterRetryInternal(int version) + throws Exception { + Job job = Job.getInstance(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, + version); + // only attempt for 1 time. + conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, + 1); + + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir, + tContext); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + // write output + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext); + writeOutput(theRecordWriter, tContext); + + // do commit + committer.commitTask(tContext); + + try { + committer.commitJob(jContext); + Assert.fail("Commit successful: wrong behavior for the first time " + + "commit."); + } catch (IOException e) { + // commit again. + try { + committer.commitJob(jContext); + // version 1 shouldn't reach to here. + if (version == 1) { + Assert.fail("Commit successful after retry: wrong behavior for " + + "version 1."); + } + } catch (FileNotFoundException ex) { + if (version == 2) { + Assert.fail("Commit failed after retry: wrong behavior for" + + " version 2."); + } + assertTrue(ex.getMessage().contains(committer.getJobAttemptPath( + jContext).toString() + " does not exist")); + } + } + + FileUtil.fullyDelete(new File(outDir.toString())); + } + private void testMapFileOutputCommitterInternal(int version) throws Exception { Job job = Job.getInstance(); @@ -292,7 +462,7 @@ public class TestFileOutputCommitter extends TestCase { conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version); - JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext); @@ -575,4 +745,29 @@ public class TestFileOutputCommitter extends TestCase { return contents; } + /** + * The class provides a overrided implementation of commitJobInternal which + * causes the commit failed for the first time then succeed. + */ + public static class CommitterWithFailedThenSucceed extends + FileOutputCommitter { + boolean firstTimeFail = true; + + public CommitterWithFailedThenSucceed(Path outputPath, + JobContext context) throws IOException { + super(outputPath, context); + } + + @Override + protected void commitJobInternal(JobContext context) throws IOException { + super.commitJobInternal(context); + if (firstTimeFail) { + firstTimeFail = false; + throw new IOException(); + } else { + // succeed then, nothing to do + } + } + } + }