HADOOP-15107. Stabilize/tune S3A committers; review correctness & docs.

Contributed by Steve Loughran.

(cherry picked from commit 5a0babf765)
This commit is contained in:
Steve Loughran 2018-08-30 15:23:08 +01:00
parent 6f939d4294
commit a0766bf66a
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
19 changed files with 542 additions and 96 deletions

View File

@ -57,8 +57,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
protected PathOutputCommitter(Path outputPath, protected PathOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException { TaskAttemptContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context"); this.context = Preconditions.checkNotNull(context, "Null context");
LOG.debug("Creating committer with output path {} and task context" LOG.debug("Instantiating committer {} with output path {} and task context"
+ " {}", outputPath, context); + " {}", this, outputPath, context);
} }
/** /**
@ -71,8 +71,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
protected PathOutputCommitter(Path outputPath, protected PathOutputCommitter(Path outputPath,
JobContext context) throws IOException { JobContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context"); this.context = Preconditions.checkNotNull(context, "Null context");
LOG.debug("Creating committer with output path {} and job context" LOG.debug("Instantiating committer {} with output path {} and job context"
+ " {}", outputPath, context); + " {}", this, outputPath, context);
} }
/** /**
@ -103,6 +103,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
@Override @Override
public String toString() { public String toString() {
return "PathOutputCommitter{context=" + context + '}'; return "PathOutputCommitter{context=" + context
+ "; " + super.toString()
+ '}';
} }
} }

View File

@ -130,8 +130,9 @@ public class Invoker {
} }
/** /**
* Execute an operation and ignore all raised IOExceptions; log at INFO. * Execute an operation and ignore all raised IOExceptions; log at INFO;
* @param log log to log at info. * full stack only at DEBUG.
* @param log log to use.
* @param action action to include in log * @param action action to include in log
* @param path optional path to include in log * @param path optional path to include in log
* @param operation operation to execute * @param operation operation to execute
@ -145,13 +146,17 @@ public class Invoker {
try { try {
once(action, path, operation); once(action, path, operation);
} catch (IOException e) { } catch (IOException e) {
log.info("{}: {}", toDescription(action, path), e.toString(), e); String description = toDescription(action, path);
String error = e.toString();
log.info("{}: {}", description, error);
log.debug("{}", description, e);
} }
} }
/** /**
* Execute an operation and ignore all raised IOExceptions; log at INFO. * Execute an operation and ignore all raised IOExceptions; log at INFO;
* @param log log to log at info. * full stack only at DEBUG.
* @param log log to use.
* @param action action to include in log * @param action action to include in log
* @param path optional path to include in log * @param path optional path to include in log
* @param operation operation to execute * @param operation operation to execute

View File

@ -292,7 +292,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(
"AbstractS3ACommitter{"); "AbstractS3ACommitter{");
sb.append("role=").append(role); sb.append("role=").append(role);
sb.append(", name").append(getName()); sb.append(", name=").append(getName());
sb.append(", outputPath=").append(getOutputPath()); sb.append(", outputPath=").append(getOutputPath());
sb.append(", workPath=").append(workPath); sb.append(", workPath=").append(workPath);
sb.append('}'); sb.append('}');
@ -532,8 +532,14 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
new DurationInfo(LOG, "Aborting all pending commits under %s", new DurationInfo(LOG, "Aborting all pending commits under %s",
dest)) { dest)) {
CommitOperations ops = getCommitOperations(); CommitOperations ops = getCommitOperations();
List<MultipartUpload> pending = ops List<MultipartUpload> pending;
.listPendingUploadsUnderPath(dest); try {
pending = ops.listPendingUploadsUnderPath(dest);
} catch (IOException e) {
// raised if the listPendingUploads call failed.
maybeIgnore(suppressExceptions, "aborting pending uploads", e);
return;
}
Tasks.foreach(pending) Tasks.foreach(pending)
.executeWith(buildThreadPool(getJobContext())) .executeWith(buildThreadPool(getJobContext()))
.suppressExceptions(suppressExceptions) .suppressExceptions(suppressExceptions)
@ -656,7 +662,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
} }
/** /**
* Execute an operation; maybe suppress any raised IOException. * Log or rethrow a caught IOException.
* @param suppress should raised IOEs be suppressed? * @param suppress should raised IOEs be suppressed?
* @param action action (for logging when the IOE is suppressed. * @param action action (for logging when the IOE is suppressed.
* @param ex exception * @param ex exception
@ -667,7 +673,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
String action, String action,
IOException ex) throws IOException { IOException ex) throws IOException {
if (suppress) { if (suppress) {
LOG.info(action, ex); LOG.debug(action, ex);
} else { } else {
throw ex; throw ex;
} }

View File

@ -77,9 +77,20 @@ public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem, AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
outputPath, outputPath,
context.getConfiguration()); context.getConfiguration());
return factory != null ? if (factory != null) {
factory.createTaskCommitter(fileSystem, outputPath, context) PathOutputCommitter committer = factory.createTaskCommitter(
: createFileOutputCommitter(outputPath, context); fileSystem, outputPath, context);
LOG.info("Using committer {} to output data to {}",
(committer instanceof AbstractS3ACommitter
? ((AbstractS3ACommitter) committer).getName()
: committer.toString()),
outputPath);
return committer;
} else {
LOG.warn("Using standard FileOutputCommitter to commit work."
+ " This is slow and potentially unsafe.");
return createFileOutputCommitter(outputPath, context);
}
} }
/** /**
@ -104,6 +115,7 @@ public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name); name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
LOG.debug("Committer option is {}", name);
switch (name) { switch (name) {
case COMMITTER_NAME_FILE: case COMMITTER_NAME_FILE:
factory = null; factory = null;

View File

@ -285,4 +285,11 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath()); return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath());
} }
/**
 * Describe this committer for logs and diagnostics.
 * The superclass description is embedded so that the output carries the
 * committer's state rather than being a constant string.
 * @return the superclass description wrapped in {@code MagicCommitter{...}}.
 */
@Override
public String toString() {
  return "MagicCommitter{" + super.toString() + '}';
}
} }

View File

@ -27,13 +27,11 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
/** /**
* This commits to a directory. * This commits to a directory.
@ -70,10 +68,8 @@ public class DirectoryStagingCommitter extends StagingCommitter {
if (getConflictResolutionMode(context, fs.getConf()) if (getConflictResolutionMode(context, fs.getConf())
== ConflictResolution.FAIL == ConflictResolution.FAIL
&& fs.exists(outputPath)) { && fs.exists(outputPath)) {
LOG.debug("Failing commit by task attempt {} to write" throw failDestinationExists(outputPath,
+ " to existing output path {}", "Setting job as " + getRole());
context.getJobID(), getOutputPath());
throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
} }
} }

View File

@ -31,14 +31,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.PathCommitException; import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
/** /**
* Partitioned committer. * Partitioned committer.
@ -100,11 +98,8 @@ public class PartitionedStagingCommitter extends StagingCommitter {
Path partitionPath = getFinalPath(partition + "/file", Path partitionPath = getFinalPath(partition + "/file",
context).getParent(); context).getParent();
if (fs.exists(partitionPath)) { if (fs.exists(partitionPath)) {
LOG.debug("Failing commit by task attempt {} to write" throw failDestinationExists(partitionPath,
+ " to existing path {} under {}", "Committing task " + context.getTaskAttemptID());
context.getTaskAttemptID(), partitionPath, getOutputPath());
throw new PathExistsException(partitionPath.toString(),
E_DEST_EXISTS);
} }
} }
} }

View File

@ -167,13 +167,15 @@ public final class Paths {
return FileSystem.getLocal(conf).makeQualified( return FileSystem.getLocal(conf).makeQualified(
allocator.getLocalPathForWrite(uuid, conf)); allocator.getLocalPathForWrite(uuid, conf));
}); });
} catch (ExecutionException e) { } catch (ExecutionException | UncheckedExecutionException e) {
throw new RuntimeException(e.getCause()); Throwable cause = e.getCause();
} catch (UncheckedExecutionException e) { if (cause instanceof RuntimeException) {
if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) cause;
throw (RuntimeException) e.getCause();
} }
throw new RuntimeException(e); if (cause instanceof IOException) {
throw (IOException) cause;
}
throw new IOException(e);
} }
} }

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@ -500,6 +502,10 @@ public class StagingCommitter extends AbstractS3ACommitter {
listAndFilter(attemptFS, listAndFilter(attemptFS,
wrappedJobAttemptPath, false, wrappedJobAttemptPath, false,
HIDDEN_FILE_FILTER)); HIDDEN_FILE_FILTER));
} catch (FileNotFoundException e) {
// this can mean the job was aborted early on, so don't confuse people
// with long stack traces that aren't the underlying problem.
maybeIgnore(suppressExceptions, "Pending upload directory not found", e);
} catch (IOException e) { } catch (IOException e) {
// unable to work with endpoint, if suppressing errors decide our actions // unable to work with endpoint, if suppressing errors decide our actions
maybeIgnore(suppressExceptions, "Listing pending uploads", e); maybeIgnore(suppressExceptions, "Listing pending uploads", e);
@ -565,13 +571,13 @@ public class StagingCommitter extends AbstractS3ACommitter {
} }
/** /**
* Delete the working paths of a job. Does not attempt to clean up * Delete the working paths of a job.
* the work of the wrapped committer.
* <ol> * <ol>
* <li>The job attempt path</li> * <li>The job attempt path</li>
* <li>$dest/__temporary</li> * <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li> * <li>the local working directory for staged files</li>
* </ol> * </ol>
* Does not attempt to clean up the work of the wrapped committer.
* @param context job context * @param context job context
* @throws IOException IO failure * @throws IOException IO failure
*/ */
@ -835,6 +841,44 @@ public class StagingCommitter extends AbstractS3ACommitter {
return conflictResolution; return conflictResolution;
} }
/**
 * Generate a {@link PathExistsException} because the destination exists.
 * Lists some of the child entries first, to help diagnose the problem.
 * A failure of that listing is logged and discarded, so the returned
 * exception always describes the original conflict.
 * @param path path which exists
 * @param description description (usually task/job ID)
 * @return an exception to throw
 */
protected PathExistsException failDestinationExists(final Path path,
    final String description) {
  LOG.error("{}: Failing commit by job {} to write"
          + " to existing output path {}.",
      description,
      getJobContext().getJobID(), path);
  // List the first 10 descendants, to give some details
  // on what is wrong but not overload things if there are many files.
  try {
    int limit = 10;
    RemoteIterator<LocatedFileStatus> lf
        = getDestFS().listFiles(path, true);
    LOG.info("Partial Directory listing");
    while (limit > 0 && lf.hasNext()) {
      limit--;
      LocatedFileStatus status = lf.next();
      LOG.info("{}: {}",
          status.getPath(),
          status.isDirectory()
              ? " dir"
              : ("file size " + status.getLen() + " bytes"));
    }
  } catch (IOException e) {
    // The exception text is passed as a parameter instead of being
    // concatenated into the format string, so any "{}" inside it is not
    // mistaken for a placeholder. toString() keeps it a plain argument:
    // a trailing Throwable would have its stack trace logged at INFO.
    LOG.info("Discarding exception raised when listing {}: {}",
        path, e.toString());
    LOG.debug("stack trace ", e);
  }
  return new PathExistsException(path.toString(),
      description + ": " + InternalCommitterConstants.E_DEST_EXISTS);
}
/** /**
* Get the conflict mode option string. * Get the conflict mode option string.
* @param context context with the config * @param context context with the config

View File

@ -230,7 +230,6 @@ None: directories are created on demand.
Rename task attempt path to task committed path. Rename task attempt path to task committed path.
```python ```python
def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest): def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
return fs.exists(taskAttemptPath) return fs.exists(taskAttemptPath)
@ -276,12 +275,12 @@ def commitJob(fs, jobAttemptDir, dest):
(See below for details on `mergePaths()`) (See below for details on `mergePaths()`)
A failure during job abort cannot be recovered from except by re-executing A failure during job commit cannot be recovered from except by re-executing
the entire query: the entire query:
```python ```python
def isCommitJobRepeatable() : def isCommitJobRepeatable() :
return True return False
``` ```
Accordingly, it is a failure point in the protocol. With a low number of files Accordingly, it is a failure point in the protocol. With a low number of files
@ -307,12 +306,28 @@ def cleanupJob(fs, dest):
``` ```
### Job Recovery ### Job Recovery Before `commitJob()`
1. Data under task committed paths is retained For all committers, the recovery process takes place in the application
1. All directories under `$dest/_temporary/$appAttemptId/_temporary/` are deleted. master.
1. The job history file of the previous attempt is loaded and scanned
to determine which tasks were recorded as having succeeded.
1. For each successful task, the job committer has its `recoverTask()` method
invoked with a `TaskAttemptContext` built from the previous attempt's details.
1. If the method does not raise an exception, the task is considered to have been
recovered, and is not re-executed.
1. All other tasks are queued for execution.
Uncommitted/unexecuted tasks are (re)executed. For the v1 committer, task recovery is straightforward.
The directory of the committed task from the previous attempt is
moved under the directory of the current application attempt.
```python
def recoverTask(tac):
oldAttemptId = appAttemptId - 1
fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}',
'$dest/_temporary/appAttemptId/${tac.taskId}')
```
This significantly improves time to recover from Job driver (here MR AM) failure. This significantly improves time to recover from Job driver (here MR AM) failure.
The only lost work is that of all tasks in progress -those which had generated The only lost work is that of all tasks in progress -those which had generated
@ -330,6 +345,11 @@ failure simply by rerunning the entire job. This is implicitly the strategy
in Spark, which does not attempt to recover any in-progress jobs. The faster in Spark, which does not attempt to recover any in-progress jobs. The faster
your queries, the simpler your recovery strategy needs to be. your queries, the simpler your recovery strategy needs to be.
### Job Recovery During `commitJob()`
This is not possible; a failure during job commit requires the entire job
to be re-executed after cleaning up the destination directory.
### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm ### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm
`mergePaths()` is the core algorithm to merge data; it is somewhat confusing `mergePaths()` is the core algorithm to merge data; it is somewhat confusing
@ -352,24 +372,23 @@ def mergePathsV1(fs, src, dest) :
fs.delete(dest, recursive = True) fs.delete(dest, recursive = True)
fs.rename(src.getPath, dest) fs.rename(src.getPath, dest)
else : else :
# destination is directory, choose action on source type # src is directory, choose action on dest type
if src.isDirectory : if not toStat is None :
if not toStat is None : if not toStat.isDirectory :
if not toStat.isDirectory : # Destination exists and is not a directory
# Destination exists and is not a directory fs.delete(dest)
fs.delete(dest)
fs.rename(src.getPath(), dest)
else :
# Destination exists and is a directory
# merge all children under destination directory
for child in fs.listStatus(src.getPath) :
mergePathsV1(fs, child, dest + child.getName)
else :
# destination does not exist
fs.rename(src.getPath(), dest) fs.rename(src.getPath(), dest)
else :
# Destination exists and is a directory
# merge all children under destination directory
for child in fs.listStatus(src.getPath) :
mergePathsV1(fs, child, dest + child.getName)
else :
# destination does not exist
fs.rename(src.getPath(), dest)
``` ```
## v2 commit algorithm ## The v2 Commit Algorithm
The v2 algorithm directly commits task output into the destination directory. The v2 algorithm directly commits task output into the destination directory.
@ -506,12 +525,31 @@ Cost: `O(1)` for normal filesystems, `O(files)` for object stores.
As no data is written to the destination directory, a task can be cleaned up As no data is written to the destination directory, a task can be cleaned up
by deleting the task attempt directory. by deleting the task attempt directory.
### v2 Job Recovery ### v2 Job Recovery Before `commitJob()`
Because the data has been renamed into the destination directory, it is nominally
recoverable. However, this assumes that the number and name of generated
files are constant on retried tasks.
Because the data has been renamed into the destination directory, all tasks
recorded as having been committed need no recovery at all:
```python
def recoverTask(tac):
```
All active and queued tasks are scheduled for execution.
There is a weakness here, the same one as for a failure during `commitTask()`:
it is only safe to repeat a task which failed during that commit operation
if the names of all generated files are constant across all task attempts.
If the Job AM fails while a task attempt has been instructed to commit,
and that commit is not recorded as having completed, the state of that
in-progress task is unknown; it is not really safe to recover the
job at this point.
### v2 Job Recovery During `commitJob()`
This is straightforward: `commitJob()` is re-invoked.
## How MapReduce uses the committer in a task ## How MapReduce uses the committer in a task
@ -896,7 +934,7 @@ and metadata.
POST bucket.s3.aws.com/path?uploads POST bucket.s3.aws.com/path?uploads
An UploadId is returned An `UploadId` is returned
1. Caller uploads one or more parts. 1. Caller uploads one or more parts.
@ -994,7 +1032,7 @@ Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPa
The single-directory and partitioned committers handle conflict resolution by The single-directory and partitioned committers handle conflict resolution by
checking whether target paths exist in S3 before uploading any data. checking whether target paths exist in S3 before uploading any data.
There are 3 conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`: There are three conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
* `fail`: Fail a task if an output directory or partition already exists. (Default) * `fail`: Fail a task if an output directory or partition already exists. (Default)
* `append`: Upload data files without checking whether directories or partitions already exist. * `append`: Upload data files without checking whether directories or partitions already exist.

View File

@ -371,7 +371,7 @@ Put differently: start with the Directory Committer.
To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a` To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a`
must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.staging.S3ACommitterFactory`. must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.staging.S3ACommitterFactory`.
This is done in `core-default.xml` This is done in `mapred-default.xml`
```xml ```xml
<property> <property>

View File

@ -173,6 +173,25 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
} }
} }
/**
 * Create a random Job ID using the fork ID as part of the number.
 * Up to the last four characters of the fork ID system property
 * (default {@code "0001"}) are parsed as an integer and combined with a
 * random value below 1000.
 * @return fork ID string in a format parseable by Jobs
 * @throws Exception if the trailing characters of the fork ID cannot be
 * parsed as an integer.
 */
protected String randomJobId() throws Exception {
  String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
  int l = testUniqueForkId.length();
  // A fork ID shorter than four characters is used whole, instead of
  // failing with a StringIndexOutOfBoundsException.
  String trailingDigits = testUniqueForkId.substring(Math.max(0, l - 4));
  try {
    int digitValue = Integer.parseInt(trailingDigits);
    return String.format("20070712%04d_%04d",
        (long) (Math.random() * 1000),
        digitValue);
  } catch (NumberFormatException e) {
    throw new Exception("Failed to parse " + trailingDigits, e);
  }
}
/** /**
* Teardown waits for the consistency delay and resets failure count, so * Teardown waits for the consistency delay and resets failure count, so
* FS is stable, before the superclass teardown is called. This * FS is stable, before the superclass teardown is called. This

View File

@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -266,9 +265,9 @@ public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
/** /**
* Override point to let implementations tune the MR Job conf. * Override point to let implementations tune the MR Job conf.
* @param c configuration * @param jobConf configuration
*/ */
protected void applyCustomConfigOptions(Configuration c) { protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
} }

View File

@ -159,25 +159,6 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
cleanupDestDir(); cleanupDestDir();
} }
/**
* Create a random Job ID using the fork ID as part of the number.
* The last four characters of the fork ID system property (default
* {@code "0001"}) are parsed as an integer and combined with a random
* value below 1000.
* @return fork ID string in a format parseable by Jobs
* @throws Exception if the trailing characters cannot be parsed as an integer
*/
private String randomJobId() throws Exception {
String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
int l = testUniqueForkId.length();
// NOTE(review): a fork ID shorter than four characters makes this substring
// call throw StringIndexOutOfBoundsException, which is not caught below.
String trailingDigits = testUniqueForkId.substring(l - 4, l);
try {
int digitValue = Integer.valueOf(trailingDigits);
// fixed date-like prefix, then the random value and the fork digits,
// each zero-padded to at least four digits.
return String.format("20070712%04d_%04d",
(long)(Math.random() * 1000),
digitValue);
} catch (NumberFormatException e) {
// keep the unparseable text in the message and the parse failure as cause
throw new Exception("Failed to parse " + trailingDigits, e);
}
}
@Override @Override
public void teardown() throws Exception { public void teardown() throws Exception {
describe("teardown"); describe("teardown");
@ -765,6 +746,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
JobContext jContext = jobData.jContext; JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext; TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer; AbstractS3ACommitter committer = jobData.committer;
validateTaskAttemptWorkingDirectory(committer, tContext);
// write output // write output
describe("1. Writing output"); describe("1. Writing output");
@ -1360,12 +1342,55 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
} }
/**
 * Verify that {@link S3ACommitterFactory} creates the same class of
 * committer as this suite's own {@code createCommitter()} call does for
 * the same output directory and task attempt context.
 * @throws Throwable on any failure
 */
@Test
public void testS3ACommitterFactoryBinding() throws Throwable {
  describe("Verify that the committer factory returns this "
      + "committer when configured to do so");
  Job job = newJob();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
  conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
      taskAttempt0);
  S3ACommitterFactory factory = new S3ACommitterFactory();
  // only the classes are compared: the two committers are separate instances
  assertEquals("Wrong committer from factory",
      createCommitter(outDir, tContext).getClass(),
      factory.createOutputCommitter(outDir, tContext).getClass());
}
/**
* Validate the path of a file being written to during the write
* itself.
* @param p path
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
} }
/**
* Validate the path of a file being written to after the write
* operation has completed.
* @param p path
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
} }
/**
* Perform any actions needed to validate the working directory of
* a committer.
* For example: filesystem, path attributes.
* This base implementation does nothing; it is an override point for
* subclasses which have something to check.
* @param committer committer instance
* @param context task attempt context
* @throws IOException IO failure
*/
protected void validateTaskAttemptWorkingDirectory(
AbstractS3ACommitter committer,
TaskAttemptContext context) throws IOException {
}
} }

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.IOException;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
/**
* Tests for some aspects of the committer factory.
* All tests are grouped into one single test so that only one
* S3A FS client is set up and used for the entire run.
* Saves time and money.
*/
public class ITestS3ACommitterFactory extends AbstractCommitITest {
protected static final String INVALID_NAME = "invalid-name";
/**
* Job ID for this test case, generated in {@code setup()} by
* {@code randomJobId()} so that parallel test runs do not share a job ID.
*/
private String jobId;
// A random task attempt id for testing.
private String attempt0;
private TaskAttemptID taskAttempt0;
private Path outDir;
private S3ACommitterFactory factory;
private TaskAttemptContext tContext;
/**
* Parameterized list of bindings of committer name in config file to
* expected class instantiated.
*/
private static final Object[][] bindings = {
{COMMITTER_NAME_FILE, FileOutputCommitter.class},
{COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
{COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
{InternalCommitterConstants.COMMITTER_NAME_STAGING,
StagingCommitter.class},
{COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
};
/**
* This is a ref to the FS conf, so changes here are visible
* to callers querying the FS config.
*/
private Configuration filesystemConfRef;
private Configuration taskConfRef;
/**
 * Prepare the per-test state: a fresh job ID and task attempt, the output
 * directory, the factory under test, and references to the filesystem and
 * task configurations which the individual checks then modify.
 * @throws Exception on any setup failure
 */
@Override
public void setup() throws Exception {
  super.setup();
  jobId = randomJobId();
  final String attemptName = "attempt_" + jobId + "_m_000000_0";
  attempt0 = attemptName;
  taskAttempt0 = TaskAttemptID.forName(attemptName);

  outDir = path(getMethodName());
  factory = new S3ACommitterFactory();

  // configuration handed to the task attempt context
  final Configuration taskConf = new Configuration();
  final String destination = outDir.toUri().toString();
  taskConf.set(FileOutputFormat.OUTDIR, destination);
  taskConf.set(MRJobConfig.TASK_ATTEMPT_ID, attemptName);
  taskConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);

  filesystemConfRef = getFileSystem().getConf();
  tContext = new TaskAttemptContextImpl(taskConf, taskAttempt0);
  // use the configuration as returned by the context, not the local one
  taskConfRef = tContext.getConfiguration();
}
/**
* Run all the binding checks from a single JUnit test case.
* The checks are invoked one after another in a fixed order against the
* same filesystem and task configuration references.
* @throws Throwable on any failure
*/
@Test
public void testEverything() throws Throwable {
testImplicitFileBinding();
testBindingsInTask();
testBindingsInFSConfig();
testInvalidFileBinding();
testInvalidTaskBinding();
}
/**
* Verify that if all config options are unset, the FileOutputCommitter
*
* is returned.
*/
public void testImplicitFileBinding() throws Throwable {
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
}
/**
* Verify that task bindings are picked up.
*/
public void testBindingsInTask() throws Throwable {
// set this to an invalid value to be confident it is not
// being checked.
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
for (Object[] binding : bindings) {
taskConfRef.set(FS_S3A_COMMITTER_NAME,
(String) binding[0]);
assertFactoryCreatesExpectedCommitter((Class) binding[1]);
}
}
/**
* Verify that FS bindings are picked up.
*/
public void testBindingsInFSConfig() throws Throwable {
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
for (Object[] binding : bindings) {
taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
assertFactoryCreatesExpectedCommitter((Class) binding[1]);
}
}
/**
* Create an invalid committer via the FS binding,
*/
public void testInvalidFileBinding() throws Throwable {
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
() -> createCommitter());
}
/**
* Create an invalid committer via the task attempt.
*/
public void testInvalidTaskBinding() throws Throwable {
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
() -> createCommitter());
}
/**
* Assert that the factory creates the expected committer.
* @param expected expected committer class.
* @throws IOException IO failure.
*/
protected void assertFactoryCreatesExpectedCommitter(
final Class expected)
throws IOException {
assertEquals("Wrong Committer from factory",
expected,
createCommitter().getClass());
}
/**
* Create a committer.
* @return the committer
* @throws IOException IO failure.
*/
private PathOutputCommitter createCommitter() throws IOException {
return factory.createOutputCommitter(outDir, tContext);
}
}

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.fs.s3a.commit.magic; package org.apache.hadoop.fs.s3a.commit.magic;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData; import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapred.JobConf;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@ -30,7 +30,7 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
* *
* There's no need to disable the committer setting for the filesystem here, * There's no need to disable the committer setting for the filesystem here,
* because the committers are being instantiated in their own processes; * because the committers are being instantiated in their own processes;
* the settings in {@link #applyCustomConfigOptions(Configuration)} are * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
* passed down to these processes. * passed down to these processes.
*/ */
public class ITMagicCommitMRJob extends AbstractITCommitMRJob { public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
@ -54,7 +54,7 @@ public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
* @param conf configuration * @param conf configuration
*/ */
@Override @Override
protected void applyCustomConfigOptions(Configuration conf) { protected void applyCustomConfigOptions(JobConf conf) {
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.fs.s3a.commit.magic; package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -32,9 +35,8 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.hamcrest.CoreMatchers.containsString;
/** /**
* Test the magic committer's commit protocol. * Test the magic committer's commit protocol.
@ -115,6 +117,25 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
assertPathExists("pending file", pendingFile); assertPathExists("pending file", pendingFile);
} }
/**
 * Check the work path of a magic committer: its URI scheme must be
 * {@code s3a} and its path must contain {@code CommitConstants.MAGIC}
 * as a complete path element.
 * @param committer committer instance
 * @param context task attempt context
 * @throws IOException IO failure
 */
@Override
protected void validateTaskAttemptWorkingDirectory(
    final AbstractS3ACommitter committer,
    final TaskAttemptContext context) throws IOException {
  final URI wd = committer.getWorkPath().toUri();
  // shared suffix so that both assertions identify the path and committer
  final String details = "working dir " + wd
      + " with committer " + committer;
  assertEquals("Wrong scheme for " + details,
      "s3a", wd.getScheme());
  assertThat("Wrong path for " + details,
      wd.getPath(),
      containsString('/' + CommitConstants.MAGIC + '/'));
}
/** /**
* The class provides a overridden implementation of commitJobInternal which * The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed. * causes the commit failed for the first time then succeed.

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.test.LambdaTestUtils;
/**
 * Runs the MR job of the parent class with the staging committer against
 * a destination directory which has been created in advance, and expects
 * the job to be rejected.
 * The intent is that the committer fails in job setup because the
 * destination already exists.
 */
public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {

  /**
   * Select the staging committer.
   * @return the name of the staging committer.
   */
  @Override
  protected String committerName() {
    return StagingCommitter.NAME;
  }

  /**
   * Pre-create the job's output directory in the test filesystem, so that
   * the job finds its destination already present.
   * @param conf configuration
   * @throws IOException failure to create the directory.
   */
  @Override
  protected void applyCustomConfigOptions(JobConf conf) throws IOException {
    // the job's destination, as declared in its configuration
    final String destination = conf.get(FileOutputFormat.OUTDIR);
    final S3AFileSystem s3a = getFileSystem();
    s3a.mkdirs(new Path(destination));
  }

  /**
   * Run the parent's job and require it to fail with a
   * {@link FileAlreadyExistsException} whose text mentions the
   * output directory.
   * @throws Exception if the expected exception was not raised.
   */
  @Override
  public void testMRJob() throws Exception {
    LambdaTestUtils.intercept(
        FileAlreadyExistsException.class,
        "Output directory",
        super::testMRJob);
  }
}

View File

@ -117,6 +117,19 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
return FileSystem.getLocal(getConfiguration()); return FileSystem.getLocal(getConfiguration());
} }
/**
 * Check that the working directory reported by the task attempt context
 * has the {@code file} scheme: the staging committers are expected to
 * have the local FS for their work.
 * @param committer committer instance; only used in the failure message
 * @param context task attempt context
 * @throws IOException IO failure
 */
@Override
protected void validateTaskAttemptWorkingDirectory(
    final AbstractS3ACommitter committer,
    final TaskAttemptContext context) throws IOException {
  final Path wd = context.getWorkingDirectory();
  assertEquals("Wrong scheme for working dir " + wd
      + " with committer " + committer,
      "file", wd.toUri().getScheme());
}
/** /**
* The class provides a overridden implementation of commitJobInternal which * The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed. * causes the commit failed for the first time then succeed.