MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API. Contributed by Junping Du
parent e1716c7cf9
commit f95f416fad
@@ -14,6 +14,9 @@ Release 2.7.3 - UNRELEASED
 
     MAPREDUCE-6540. TestMRTimelineEventHandling fails (sjlee)
 
+    MAPREDUCE-5485. Allow repeating job commit by extending OutputCommitter API.
+    (Junping Du via jianhe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -281,10 +281,12 @@ public class MRAppMaster extends CompositeService {
     }
 
     boolean copyHistory = false;
+    committer = createOutputCommitter(conf);
     try {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       Path stagingDir = MRApps.getStagingAreaDir(conf, user);
       FileSystem fs = getFileSystem(conf);
+
       boolean stagingExists = fs.exists(stagingDir);
       Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
       boolean commitStarted = fs.exists(startCommitFile);
@@ -316,12 +318,19 @@ public class MRAppMaster extends CompositeService {
         } else if (commitFailure) {
           shutDownMessage = "We crashed after a commit failure.";
           forcedState = JobStateInternal.FAILED;
         } else {
+          if (isCommitJobRepeatable()) {
+            // cleanup previous half done commits if committer supports
+            // repeatable job commit.
+            errorHappenedShutDown = false;
+            cleanupInterruptedCommit(conf, fs, startCommitFile);
+          } else {
             //The commit is still pending, commit error
             shutDownMessage = "We crashed durring a commit";
             forcedState = JobStateInternal.ERROR;
+          }
         }
       }
     } catch (IOException e) {
       throw new YarnRuntimeException("Error while initializing", e);
     }
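Taken on its own, the recovery decision above is small: a commit that was started but never finished can only be redone when the committer declares the job commit repeatable; otherwise the AM has no way to know how far the previous attempt got. A minimal, self-contained sketch of that decision (the class and method below are illustrative, not part of the commit):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;

final class CommitRecovery {
  private CommitRecovery() {}

  // Returns true when a restarted AM may safely re-drive the job commit.
  static boolean canRedoInterruptedCommit(FileSystem fs, Path startCommitFile,
      OutputCommitter committer, JobContext jobContext) throws IOException {
    if (!fs.exists(startCommitFile)) {
      return true;                        // no commit was in flight
    }
    if (committer.isCommitJobRepeatable(jobContext)) {
      fs.delete(startCommitFile, false);  // drop the half-done marker and retry
      return true;
    }
    return false;                         // unclear how far the old commit got
  }
}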
@@ -374,7 +383,6 @@ public class MRAppMaster extends CompositeService {
       addIfService(cpHist);
     }
   } else {
-    committer = createOutputCommitter(conf);
 
     dispatcher = createDispatcher();
     addIfService(dispatcher);
@@ -454,6 +462,38 @@ public class MRAppMaster extends CompositeService {
     return new AsyncDispatcher();
   }
 
+  private boolean isCommitJobRepeatable() throws IOException {
+    boolean isRepeatable = false;
+    Configuration conf = getConfig();
+    if (committer != null) {
+      final JobContext jobContext = getJobContextFromConf(conf);
+
+      isRepeatable = callWithJobClassLoader(conf,
+          new ExceptionAction<Boolean>() {
+            public Boolean call(Configuration conf) throws IOException {
+              return committer.isCommitJobRepeatable(jobContext);
+            }
+          });
+    }
+    return isRepeatable;
+  }
+
+  private JobContext getJobContextFromConf(Configuration conf) {
+    if (newApiCommitter) {
+      return new JobContextImpl(conf, TypeConverter.fromYarn(getJobId()));
+    } else {
+      return new org.apache.hadoop.mapred.JobContextImpl(
+          new JobConf(conf), TypeConverter.fromYarn(getJobId()));
+    }
+  }
+
+  private void cleanupInterruptedCommit(Configuration conf,
+      FileSystem fs, Path startCommitFile) throws IOException {
+    LOG.info("Delete startJobCommitFile in case commit is not finished as " +
+        "successful or failed.");
+    fs.delete(startCommitFile, false);
+  }
+
   private OutputCommitter createOutputCommitter(Configuration conf) {
     return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
       public OutputCommitter call(Configuration conf) {
@@ -1131,14 +1171,7 @@ public class MRAppMaster extends CompositeService {
     boolean isSupported = false;
     Configuration conf = getConfig();
     if (committer != null) {
-      final JobContext _jobContext;
-      if (newApiCommitter) {
-        _jobContext = new JobContextImpl(
-            conf, TypeConverter.fromYarn(getJobId()));
-      } else {
-        _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-            new JobConf(conf), TypeConverter.fromYarn(getJobId()));
-      }
+      final JobContext _jobContext = getJobContextFromConf(conf);
       isSupported = callWithJobClassLoader(conf,
           new ExceptionAction<Boolean>() {
             public Boolean call(Configuration conf) throws IOException {
@@ -261,27 +261,38 @@ public class CommitterEventHandler extends AbstractService
       }
     }
 
-  private void touchz(Path p) throws IOException {
-    fs.create(p, false).close();
+  // If job commit is repeatable, then we should allow
+  // startCommitFile/endCommitSuccessFile/endCommitFailureFile to be written
+  // by other AM before.
+  private void touchz(Path p, boolean overwrite) throws IOException {
+    fs.create(p, overwrite).close();
   }
 
   @SuppressWarnings("unchecked")
   protected void handleJobCommit(CommitterJobCommitEvent event) {
+    boolean commitJobIsRepeatable = false;
     try {
-      touchz(startCommitFile);
+      commitJobIsRepeatable = committer.isCommitJobRepeatable(
+          event.getJobContext());
+    } catch (IOException e) {
+      LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
+    }
+
+    try {
+      touchz(startCommitFile, commitJobIsRepeatable);
       jobCommitStarted();
       waitForValidCommitWindow();
       committer.commitJob(event.getJobContext());
-      touchz(endCommitSuccessFile);
+      touchz(endCommitSuccessFile, commitJobIsRepeatable);
       context.getEventHandler().handle(
           new JobCommitCompletedEvent(event.getJobID()));
     } catch (Exception e) {
+      LOG.error("Could not commit job", e);
       try {
-        touchz(endCommitFailureFile);
+        touchz(endCommitFailureFile, commitJobIsRepeatable);
       } catch (Exception e2) {
         LOG.error("could not create failure file.", e2);
      }
-      LOG.error("Could not commit job", e);
       context.getEventHandler().handle(
           new JobCommitFailedEvent(event.getJobID(),
               StringUtils.stringifyException(e)));
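The extra boolean matters because FileSystem.create(path, false) fails when the file already exists, which is exactly what happens if a previous AM attempt already dropped one of these markers. A minimal sketch of the same marker-writing idiom in isolation (the helper class is illustrative, not part of the commit):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class CommitMarkers {
  private CommitMarkers() {}

  // Create an empty marker file; tolerate a marker left behind by an earlier
  // attempt only when the job commit is declared repeatable.
  static void createMarker(FileSystem fs, Path marker, boolean commitIsRepeatable)
      throws IOException {
    fs.create(marker, commitIsRepeatable).close();
  }
}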
@@ -189,6 +189,11 @@ public class FileOutputCommitter extends OutputCommitter {
     return true;
   }
 
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return getWrapped(context).isCommitJobRepeatable(context);
+  }
+
   @Override
   public boolean isRecoverySupported(JobContext context) throws IOException {
     return getWrapped(context).isRecoverySupported(context);
@@ -204,6 +204,38 @@ public abstract class OutputCommitter
     return isRecoverySupported();
   }
 
+  /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws
+      IOException {
+    return false;
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext
+      jobContext) throws IOException {
+    return isCommitJobRepeatable((JobContext) jobContext);
+  }
+
   /**
    * Recover the task output.
    *
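Because the old-API class overrides the inherited mapreduce-level method and forwards it to the mapred-typed overload, an old-API committer normally only needs to override the typed variant. A minimal sketch of such an override (the subclass is hypothetical; returning true is only valid if the committer's commit really is idempotent):

import java.io.IOException;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobContext;

// Hypothetical old-API committer that advertises a repeatable job commit.
public class RepeatableOldApiCommitter extends FileOutputCommitter {
  @Override
  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
    // Assumption: every step of this committer's commitJob can be re-run.
    return true;
  }
}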
@@ -189,6 +189,32 @@ public abstract class OutputCommitter {
     return false;
   }
 
+  /**
+   * Returns true if an in-progress job commit can be retried. If the MR AM is
+   * re-run then it will check this value to determine if it can retry an
+   * in-progress commit that was started by a previous version.
+   * Note that in rare scenarios, the previous AM version might still be running
+   * at that time, due to system anomalies. Hence if this method returns true
+   * then the retry commit operation should be able to run concurrently with
+   * the previous operation.
+   *
+   * If repeatable job commit is supported, job restart can tolerate previous
+   * AM failures during job commit.
+   *
+   * By default, it is not supported. Extended classes (like:
+   * FileOutputCommitter) should explicitly override it if provide support.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> repeatable job commit is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   */
+  public boolean isCommitJobRepeatable(JobContext jobContext)
+      throws IOException {
+    return false;
+  }
+
   /**
    * Is task output recovery supported for restarting jobs?
    *
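The contract is easiest to satisfy when every step of commitJob is itself idempotent. A minimal sketch of a new-API committer along those lines (the class is hypothetical and not part of the commit; tasks are assumed to write directly to their final per-task files, so job commit only drops an overwritable marker):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical committer whose whole job commit can be executed repeatedly.
public class DirectWriteCommitter extends OutputCommitter {
  private final Path outputDir;

  public DirectWriteCommitter(Path outputDir) {
    this.outputDir = outputDir;
  }

  @Override public void setupJob(JobContext context) throws IOException { }
  @Override public void setupTask(TaskAttemptContext context) throws IOException { }
  @Override public boolean needsTaskCommit(TaskAttemptContext context) { return false; }
  @Override public void commitTask(TaskAttemptContext context) throws IOException { }
  @Override public void abortTask(TaskAttemptContext context) throws IOException { }

  @Override
  public void commitJob(JobContext context) throws IOException {
    FileSystem fs = outputDir.getFileSystem(context.getConfiguration());
    fs.create(new Path(outputDir, "_DONE"), true).close(); // overwrite = true
  }

  @Override
  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
    return true; // every step above can safely be executed more than once
  }
}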
@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An {@link OutputCommitter} that commits files specified
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
@@ -64,6 +66,12 @@ public class FileOutputCommitter extends OutputCommitter {
   public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
       "mapreduce.fileoutputcommitter.algorithm.version";
   public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
+  // Number of attempts when failure happens in commit job
+  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
+      "mapreduce.fileoutputcommitter.failures.attempts";
+
+  // default value to be 1 to keep consistent with previous behavior
+  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
   private Path outputPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
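With these keys, a job can opt into the repeatable (v2) algorithm and raise the commit retry budget from the driver. A minimal sketch of that configuration (the class name and the value 3 are illustrative):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class CommitRetryConfig {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // Algorithm 2 is the variant whose job commit is repeatable.
    job.getConfiguration().setInt(
        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
    // Retry the internal commit up to 3 times on failure (illustrative value).
    job.getConfiguration().setInt(
        FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, 3);
  }
}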
@@ -311,12 +319,40 @@ public class FileOutputCommitter extends OutputCommitter {
   }
 
   /**
-   * The job has completed so move all committed tasks to the final output dir.
+   * The job has completed, so do works in commitJobInternal().
+   * Could retry on failure if using algorithm 2.
    * @param context the job's context
    */
+  public void commitJob(JobContext context) throws IOException {
+    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
+        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
+    int attempt = 0;
+    boolean jobCommitNotFinished = true;
+    while (jobCommitNotFinished) {
+      try {
+        commitJobInternal(context);
+        jobCommitNotFinished = false;
+      } catch (Exception e) {
+        if (++attempt >= maxAttemptsOnFailure) {
+          throw e;
+        } else {
+          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
+              ") time.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * The job has completed, so do following commit job, include:
+   * Move all committed tasks to the final output dir (algorithm 1 only).
+   * Delete the temporary directory, including all of the work directories.
+   * Create a _SUCCESS file to make it as successful.
+   * @param context the job's context
+   */
-  public void commitJob(JobContext context) throws IOException {
+  @VisibleForTesting
+  protected void commitJobInternal(JobContext context) throws IOException {
     if (hasOutputPath()) {
       Path finalOutput = getOutputPath();
       FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
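Stripped of the MapReduce types, the loop above is a plain bounded retry: the budget is read from configuration only when the committer reports a repeatable commit, and otherwise collapses to a single attempt. An isolated sketch of the same shape (the helper method is illustrative, not part of the commit):

// Illustrative retry wrapper mirroring commitJob() above.
static void runWithRetry(Runnable commitOnce, int maxAttempts) {
  int attempt = 0;
  while (true) {
    try {
      commitOnce.run();
      return;                     // commit finished, stop retrying
    } catch (RuntimeException e) {
      if (++attempt >= maxAttempts) {
        throw e;                  // retry budget exhausted, surface the failure
      }
      // otherwise loop and run the commit again
    }
  }
}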
@@ -331,10 +367,18 @@ public class FileOutputCommitter extends OutputCommitter {
       cleanupJob(context);
       // True if the job requires output.dir marked on successful job.
       // Note that by default it is set to true.
-      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+      if (context.getConfiguration().getBoolean(
+          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
         Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        // If job commit is repeatable and previous/another AM could write
+        // mark file already, we need to set overwritten to be true explicitly
+        // in case other FS implementations don't overwritten by default.
+        if (isCommitJobRepeatable(context)) {
+          fs.create(markerPath, true).close();
+        } else {
           fs.create(markerPath).close();
+        }
       }
     } else {
       LOG.warn("Output Path is null in commitJob()");
     }
@@ -412,7 +456,16 @@ public class FileOutputCommitter extends OutputCommitter {
       Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
       FileSystem fs = pendingJobAttemptsPath
           .getFileSystem(context.getConfiguration());
+      // if job allow repeatable commit and pendingJobAttemptsPath could be
+      // deleted by previous AM, we should tolerate FileNotFoundException in
+      // this case.
+      try {
         fs.delete(pendingJobAttemptsPath, true);
+      } catch (FileNotFoundException e) {
+        if (!isCommitJobRepeatable(context)) {
+          throw e;
+        }
+      }
     } else {
       LOG.warn("Output Path is null in cleanupJob()");
     }
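The same tolerate-the-race idiom can be written on its own: when two AM attempts may both run cleanup, a missing directory simply means the other attempt got there first. A minimal sketch (the helper class is illustrative, not part of the commit):

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class TolerantCleanup {
  private TolerantCleanup() {}

  // Delete a directory, ignoring "already gone" only when a concurrent
  // cleanup by another attempt is an expected outcome.
  static void deleteQuietlyIfRacing(FileSystem fs, Path dir, boolean raceExpected)
      throws IOException {
    try {
      fs.delete(dir, true);
    } catch (FileNotFoundException e) {
      if (!raceExpected) {
        throw e;
      }
    }
  }
}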
@@ -549,6 +602,11 @@ public class FileOutputCommitter extends OutputCommitter {
     return true;
   }
 
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return algorithmVersion == 2;
+  }
+
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
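Since repeatability of FileOutputCommitter reduces to the configured algorithm version, driver code can anticipate the committer's answer without constructing one. A minimal sketch of that check (the helper method is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

// Mirrors isCommitJobRepeatable() above for the file output committer.
static boolean fileOutputCommitIsRepeatable(Configuration conf) {
  int algorithm = conf.getInt(
      FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
      FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
  return algorithm == 2;
}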
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -202,6 +203,112 @@ public class TestFileOutputCommitter extends TestCase {
     assert(dataFileFound && indexFileFound);
   }
 
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts) throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
+        FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, maxAttempts);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit again
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit successful: wrong behavior " +
+            "for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testCommitterInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -451,4 +558,48 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed() throws IOException {
+      super();
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      JobConf conf = context.getJobConf();
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped =
+          new CommitterFailedFirst(FileOutputFormat.getOutputPath(conf),
+              context);
+      wrapped.commitJob(context);
+    }
+  }
+
+  public static class CommitterFailedFirst extends
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterFailedFirst(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(org.apache.hadoop.mapreduce.JobContext
+        context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
+
 }
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.lib.output;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -284,6 +286,174 @@ public class TestFileOutputCommitter extends TestCase {
     testCommitterInternal(2);
   }
 
+  public void testCommitterWithDuplicatedCommitV1() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(1);
+  }
+
+  public void testCommitterWithDuplicatedCommitV2() throws Exception {
+    testCommitterWithDuplicatedCommitInternal(2);
+  }
+
+  private void testCommitterWithDuplicatedCommitInternal(int version) throws
+      Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+
+    // commit job again on a successful commit job.
+    try {
+      committer.commitJob(jContext);
+      if (version == 1) {
+        Assert.fail("Duplicate commit success: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      if (version == 2) {
+        Assert.fail("Duplicate commit failed: wrong behavior for version 2.");
+      }
+    }
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterWithFailureV1() throws Exception {
+    testCommitterWithFailureInternal(1, 1);
+    testCommitterWithFailureInternal(1, 2);
+  }
+
+  public void testCommitterWithFailureV2() throws Exception {
+    testCommitterWithFailureInternal(2, 1);
+    testCommitterWithFailureInternal(2, 2);
+  }
+
+  private void testCommitterWithFailureInternal(int version, int maxAttempts)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        maxAttempts);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    try {
+      committer.commitJob(jContext);
+      // (1,1), (1,2), (2,1) shouldn't reach to here.
+      if (version == 1 || maxAttempts <= 1) {
+        Assert.fail("Commit successful: wrong behavior for version 1.");
+      }
+    } catch (IOException e) {
+      // (2,2) shouldn't reach to here.
+      if (version == 2 && maxAttempts > 2) {
+        Assert.fail("Commit failed: wrong behavior for version 2.");
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testCommitterRepeatableV1() throws Exception {
+    testCommitterRetryInternal(1);
+  }
+
+  public void testCommitterRepeatableV2() throws Exception {
+    testCommitterRetryInternal(2);
+  }
+
+  // retry committer for 2 times.
+  private void testCommitterRetryInternal(int version)
+      throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+    // only attempt for 1 time.
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
+        1);
+
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new CommitterWithFailedThenSucceed(outDir,
+        tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+
+    try {
+      committer.commitJob(jContext);
+      Assert.fail("Commit successful: wrong behavior for the first time " +
+          "commit.");
+    } catch (IOException e) {
+      // commit again.
+      try {
+        committer.commitJob(jContext);
+        // version 1 shouldn't reach to here.
+        if (version == 1) {
+          Assert.fail("Commit successful after retry: wrong behavior for " +
+              "version 1.");
+        }
+      } catch (FileNotFoundException ex) {
+        if (version == 2) {
+          Assert.fail("Commit failed after retry: wrong behavior for" +
+              " version 2.");
+        }
+        assertTrue(ex.getMessage().contains(committer.getJobAttemptPath(
+            jContext).toString() + " does not exist"));
+      }
+    }
+
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
   private void testMapFileOutputCommitterInternal(int version)
       throws Exception {
     Job job = Job.getInstance();
@@ -575,4 +745,29 @@ public class TestFileOutputCommitter extends TestCase {
     return contents;
   }
 
+  /**
+   * The class provides a overrided implementation of commitJobInternal which
+   * causes the commit failed for the first time then succeed.
+   */
+  public static class CommitterWithFailedThenSucceed extends
+      FileOutputCommitter {
+    boolean firstTimeFail = true;
+
+    public CommitterWithFailedThenSucceed(Path outputPath,
+        JobContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    protected void commitJobInternal(JobContext context) throws IOException {
+      super.commitJobInternal(context);
+      if (firstTimeFail) {
+        firstTimeFail = false;
+        throw new IOException();
+      } else {
+        // succeed then, nothing to do
+      }
+    }
+  }
+
 }