HADOOP-18304. Improve user-facing S3A committers documentation (#4478)

Contributed by: Daniel Carl Jones
Commit by Daniel Carl Jones, 2022-10-19 08:26:47 +01:00, committed via GitHub.
parent d80db6c9e5
commit 6207ac47e0
1 changed file with 117 additions and 123 deletions


@@ -51,7 +51,7 @@ obsolete.

## Introduction: The Commit Problem

Apache Hadoop MapReduce (and behind the scenes, Apache Spark) often write
the output of their work to filesystems.

Normally, Hadoop uses the `FileOutputFormatCommitter` to manage the
promotion of files created in a single task attempt to the final output of
@@ -68,34 +68,34 @@ process across the cluster may rename a file or directory to the same path.

If the rename fails for any reason, either the data is at the original location,
or it is at the destination, in which case the rename actually succeeded.

_The S3 object store and the `s3a://` filesystem client cannot meet these requirements._

Although S3 is (now) consistent, the S3A client still mimics `rename()`
by copying files and then deleting the originals.
This can fail partway through, and there is nothing to prevent any other process
in the cluster attempting a rename at the same time.

As a result:

* If a `rename()` fails, the data is left in an unknown state.
* If more than one process attempts to commit work simultaneously, the output
directory may contain the results of both processes: it is no longer an exclusive
operation.
* Commit time is still proportional to the amount of data created.
It still can't handle task failure.
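A toy model, not the S3A code, can illustrate the core hazard: because the mimicked `rename()` is a copy followed by a delete, a failure between the two steps leaves data in both places.

```python
# Toy model of a copy-then-delete "rename" over a key/value object store.
# This is an illustration only, not how the S3A client is implemented.
store = {"src/part-0000": b"task output"}

def mimic_rename(src, dst, fail_after_copy=False):
    store[dst] = store[src]            # step 1: COPY the object
    if fail_after_copy:
        raise IOError("process died")  # failure partway through
    del store[src]                     # step 2: DELETE the original

try:
    mimic_rename("src/part-0000", "dst/part-0000", fail_after_copy=True)
except IOError:
    pass

# The "rename" half-happened: the object now exists under both keys.
print(sorted(store))   # ['dst/part-0000', 'src/part-0000']
```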
**Using the "classic" `FileOutputCommitter` to commit work to Amazon S3 risks
loss or corruption of generated data**.

To address these problems there is now explicit support in the `hadoop-aws`
module for committing work to Amazon S3 via the S3A filesystem client:
*the S3A Committers*.

For safe, as well as high-performance output of work to S3,
we need to use "a committer" explicitly written to work with S3,
treating it as an object store with special features.
### Background: Hadoop's "Commit Protocol"
@@ -106,10 +106,10 @@ a "commit protocol" between the workers and the job manager.

This protocol is implemented in Hadoop MapReduce, with a similar but extended
version in Apache Spark:

1. The "Job" is the entire query. It takes a given set of input and produces some output.
1. The "Job Manager" is the process in charge of choreographing the execution
of the job. It may perform some of the actual computation too.
1. The job has "workers", which are processes which work with the actual data
and write the results.
1. Workers execute "Tasks", which are fractions of the job, a job whose
input has been *partitioned* into units of work which can be executed independently.
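The roles above can be sketched in a few lines. This is a simplified model for illustration only (none of these classes exist in Hadoop): intermediate output stays invisible until the Job Manager commits it, and at most one attempt per task is ever committed.

```python
# Simplified model of the commit protocol: tasks buffer intermediate
# output; the job manager promotes at most one attempt per task.
class TaskAttempt:
    def __init__(self, task_id):
        self.task_id = task_id
        self.output = []            # intermediate output, not yet visible

    def write(self, record):
        self.output.append(record)

class JobManager:
    def __init__(self):
        self.visible = []           # stands in for the destination directory
        self.committed = set()      # task ids whose output was committed

    def commit_task(self, attempt):
        if attempt.task_id in self.committed:
            return False            # a speculative duplicate: reject it
        self.committed.add(attempt.task_id)
        self.visible.extend(attempt.output)
        return True

jm = JobManager()
first, speculative = TaskAttempt("task-0"), TaskAttempt("task-0")
first.write("part-00000")
speculative.write("part-00000")
print(jm.commit_task(first))        # True
print(jm.commit_task(speculative))  # False: only one attempt may commit
print(jm.visible)                   # ['part-00000']
```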
@@ -126,7 +126,7 @@ this "speculation" delivers speedup as it can address the "straggler problem".
When multiple workers are working on the same data, only one worker is allowed
to write the final output.
1. The entire job may fail (often from the failure of the Job Manager (MR Master, Spark Driver, ...)).
1. The network may partition, with workers isolated from each other or
the process managing the entire commit.
1. Restarted jobs may recover from a failure by reusing the output of all
completed tasks (MapReduce with the "v1" algorithm), or just by rerunning everything
@@ -137,34 +137,34 @@ What is "the commit protocol" then? It is the requirements on workers as to
when their data is made visible, where, for a filesystem, "visible" means "can
be seen in the destination directory of the query."

* There is a destination directory of work: "the output directory".
The final output of tasks must be in this directory *or paths underneath it*.
* The intermediate output of a task must not be visible in the destination directory.
That is: they must not write directly to the destination.
* The final output of a task *may* be visible under the destination.
* Individual workers communicate with the Job Manager to manage the commit process.
* The Job Manager makes the decision on whether a task's output data is to be "committed",
be it directly to the final directory or to some intermediate store.
* When a worker commits the output of a task, it somehow promotes its intermediate work to becoming
final.
* When a worker aborts a task's output, that output must not become visible
(i.e. it is not committed).
* Jobs themselves may be committed/aborted (the nature of "when" is not covered here).
* After a Job is committed, all its work must be visible.
A file named `_SUCCESS` may be written to the output directory.
* After a Job is aborted, all its intermediate data is lost.
* Jobs may also fail. When restarted, the successor job must be able to clean up
all the intermediate and committed work of its predecessor(s).
* Task and Job processes measure the intervals between communications with their
Application Master and YARN respectively.
When the interval has grown too large, they must conclude
that the network has partitioned and that they must abort their work.

That's "essentially" it. When working with HDFS and similar filesystems,
directory `rename()` is the mechanism used to commit the work of tasks and
jobs.

* Tasks write data to task attempt directories under the directory `_temporary`
underneath the final destination directory.
* When a task is committed, these files are renamed to the destination directory
@@ -180,20 +180,19 @@ and restarting the job.
whose output is in the job attempt directory, *and only rerunning all uncommitted tasks*.
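The rename step above can be sketched with the local filesystem standing in for HDFS, where `rename()` is atomic. The paths here are illustrative, not the committer's exact layout.

```python
# Sketch of rename-based task commit on a filesystem with atomic rename.
# The local filesystem stands in for HDFS; the layout is illustrative.
import os
import tempfile

dest = tempfile.mkdtemp()                       # the job's output directory
attempt_dir = os.path.join(dest, "_temporary", "attempt_0001")
os.makedirs(attempt_dir)

# The task writes into its private attempt directory.
with open(os.path.join(attempt_dir, "part-00000"), "w") as f:
    f.write("rows")

# Task commit: a single atomic rename promotes the file into the destination.
os.rename(os.path.join(attempt_dir, "part-00000"),
          os.path.join(dest, "part-00000"))

print(sorted(os.listdir(dest)))                 # ['_temporary', 'part-00000']
```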
This algorithm does not work safely or swiftly with AWS S3 storage because
renames go from being fast, atomic operations to slow operations which can fail partway through.

This then is the problem which the S3A committers address:

*How to safely and reliably commit work to Amazon S3 or a compatible object store.*
## Meet the S3A Committers

Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes
designed to integrate with the Hadoop and Spark job commit protocols,
classes which interact with the S3A filesystem to reliably commit work to S3:
*The S3A Committers*.

The underlying architecture of this process is very complex, and
covered in [the committer architecture documentation](./committer_architecture.html).
@@ -219,8 +218,8 @@ conflict with existing files is resolved.

| feature | staging | magic |
|---------|---------|-------|
| task output destination | write to local disk | upload to S3 *without completing the write* |
| task commit process | upload data from disk to S3 *without completing the write* | list all pending uploads on S3 and write details to job attempt directory |
| task abort process | delete local disk data | list all pending uploads and abort them |
| job commit | list & complete pending uploads | list & complete pending uploads |
@@ -228,33 +227,30 @@ The other metric is "maturity". There, the fact that the Staging committers
are based on Netflix's production code counts in its favor.

### The Staging Committers

This is based on work from Netflix.
It "stages" data into the local filesystem, using URLs with `file://` schemas.

When a task is committed, its files are listed and uploaded to S3 as incomplete Multipart Uploads.
The information needed to complete the uploads is saved to HDFS where
it is committed through the standard "v1" commit algorithm.

When the Job is committed, the Job Manager reads the lists of pending writes from its
HDFS Job destination directory and completes those uploads.

Canceling a _task_ is straightforward: the local directory is deleted with its staged data.
Canceling a _job_ is achieved by reading in the lists of
pending writes from the HDFS job attempt directory, and aborting those
uploads. For extra safety, all outstanding multipart writes to the destination directory
are aborted.
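The staging flow described above can be modeled in a few lines. This is purely illustrative: the real committer records pending multipart uploads in HDFS and completes or aborts them through the S3 APIs.

```python
# Illustrative model of the staging commit flow: task commit starts
# uploads without completing them; job commit completes every pending
# upload; job abort cancels them. Not the real S3A implementation.
pending = []      # stands in for pending-upload records saved to HDFS
visible = []      # stands in for completed objects in the S3 bucket

def task_commit(staged_files):
    pending.extend(staged_files)        # uploads started, not completed

def job_commit():
    while pending:
        visible.append(pending.pop(0))  # complete each multipart upload

def job_abort():
    pending.clear()                     # abort outstanding uploads

task_commit(["part-00000", "part-00001"])
print(visible)    # [] - nothing visible before job commit
job_commit()
print(visible)    # ['part-00000', 'part-00001']
```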
There are two staging committers with slightly different conflict resolution behaviors:

* **Directory Committer**: the entire directory tree of data is written or overwritten,
as normal.
* **Partitioned Committer**: special handling of partitioned directory trees of the form
`YEAR=2017/MONTH=09/DAY=19`: conflict resolution is limited to the partitions
being updated.
@@ -265,13 +261,16 @@ directories containing new data. It is intended for use with Apache Spark
only.

#### Conflict Resolution in the Staging Committers

The Staging committers offer the ability to replace the conflict policy
of the execution engine with a policy designed to work with the tree of data.
This is based on the experience and needs of Netflix, where efficiently adding
new data to an existing partitioned directory tree is a common operation.

An XML configuration is shown below.
If unset, the conflict mode defaults to `append`.

```xml
<property>
  <name>fs.s3a.committer.staging.conflict-mode</name>
@@ -283,40 +282,37 @@ new data to an existing partitioned directory tree is a common operation.
</property>
```
The _Directory Committer_ uses the entire directory tree for conflict resolution.
For this committer, the behavior of each conflict mode is shown below:

- **replace**: When the job is committed (and not before), delete files in
directories into which new data will be written.
- **fail**: When there are existing files in the destination, fail the job.
- **append**: Add new data to the directories at the destination; overwriting
any with the same name. Reliable use requires unique names for generated files,
which the committers generate by default.

The _Partitioned Committer_ calculates the partitions into which files are added,
the final directories in the tree, and uses that in its conflict resolution process.
For the _Partitioned Committer_, the behavior of each mode is as follows:

- **replace**: Delete all data in the destination _partition_ before committing
the new files.
- **fail**: Fail if there is data in the destination _partition_, ignoring the state
of any parallel partitions.
- **append**: Add the new data to the destination _partition_,
overwriting any files with the same name.

The _Partitioned Committer_ is intended for use in Apache Spark Dataset operations, rather
than Hadoop's original MapReduce engine, and only in jobs
where adding new data to an existing dataset is the desired goal.
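As an illustration, a partitioned layout of the kind shown earlier (`YEAR=.../MONTH=...`) might be produced by Hive-style SQL like the following; the table and column names are invented for this example:

```sql
-- Hypothetical example: writing into a Hive-style partitioned layout,
-- producing directories such as logs/YEAR=2017/MONTH=09/.
CREATE TABLE logs (msg STRING)
PARTITIONED BY (YEAR INT, MONTH INT)
STORED AS PARQUET;

INSERT INTO logs PARTITION (YEAR = 2017, MONTH = 9)
VALUES ('job started');
```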
Prerequisites for success with the _Partitioned Committer_:

1. The output is written into partitions via `PARTITIONED BY` or `partitionedBy()`
instructions.
@@ -356,19 +352,20 @@ task commit.

However, it has extra requirements of the filesystem:

1. The object store must be consistent.
1. The S3A client must be configured to recognize interactions
with the magic directories and treat them as a special case.

Now that [Amazon S3 is consistent](https://aws.amazon.com/s3/consistency/),
the magic directory path rewriting is enabled by default.

The Magic Committer has not been field tested to the extent of Netflix's committer;
consider it the least mature of the committers.

### Which Committer to Use?

1. If you want to create or update existing partitioned data trees in Spark, use the
Partitioned Committer. Make sure you have enough hard disk capacity for all staged data.
Do not use it in other situations.
@@ -398,8 +395,8 @@ This is done in `mapred-default.xml`
</property>
```

You must also choose which of the S3A committers to use with the `fs.s3a.committer.name` property.
Otherwise, the classic (and unsafe) file committer is used.

| `fs.s3a.committer.name` | Committer |
|-------------------------|-----------|
@@ -408,9 +405,7 @@ What is missing is an explicit choice of committer to use in the property
| `magic` | the "magic" committer |
| `file` | the original and unsafe File committer (default) |
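For example, a `core-site.xml` entry selecting the directory committer might look like this (a sketch; the property name and values come from the table above):

```xml
<property>
  <name>fs.s3a.committer.name</name>
  <value>directory</value>
</property>
```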
## Using the Staging Committers
Generated files are initially written to a local directory underneath one of the temporary
directories listed in `fs.s3a.buffer.dir`.

@@ -422,16 +417,14 @@ The staging committer needs a path in the cluster filesystem

Temporary files are saved in HDFS (or other cluster filesystem) under the path
`${fs.s3a.committer.staging.tmp.path}/${user}` where `user` is the name of the user running the job.
The default value of `fs.s3a.committer.staging.tmp.path` is `tmp/staging`,
which will be converted at run time to a path under the current user's home directory,
resulting in the HDFS directory `~/tmp/staging/${user}`.

The application attempt ID is used to create a unique path under this directory,
resulting in a path `~/tmp/staging/${user}/${application-attempt-id}/` under which
summary data of each task's pending commits are managed using the standard
`FileOutputFormat` committer.

When a task is committed, the data is uploaded under the destination directory.
The policy of how to react if the destination exists is defined by
the `fs.s3a.committer.staging.conflict-mode` setting.
@@ -442,9 +435,9 @@ the `fs.s3a.committer.staging.conflict-mode` setting.
| `append` | Add the new files to the existing directory tree |

### The "Partitioned" Staging Committer

This committer is an extension of the "Directory" committer which has a special conflict resolution
policy designed to support operations which insert new data into a directory tree structured
using Hive's partitioning strategy: different levels of the tree represent different columns.
@@ -471,10 +464,10 @@ logs/YEAR=2017/MONTH=04/

A partitioned structure like this allows for queries using Hive or Spark to filter out
files which do not contain relevant data.

The partitioned committer allows callers to add new data to an existing partitioned layout,
where the application supports it.
More specifically, it does this by reducing the scope of conflict resolution to
only act on individual partitions, rather than across the entire output tree.
| `fs.s3a.committer.staging.conflict-mode` | Meaning |
@@ -492,18 +485,18 @@ was written. With the policy of `append`, the new file would be added to
the existing set of files.

### Notes on using Staging Committers

1. A deep partition tree can itself be a performance problem in S3 and the s3a client,
or more specifically a problem with applications which use recursive directory tree
walks to work with data.
1. The outcome if you have more than one job trying simultaneously to write data
to the same destination with any policy other than "append" is undefined.
1. In the `append` operation, there is no check for conflict with file names.
If the file `log-20170228.avro` in the example above already existed, it would be overwritten.
Set `fs.s3a.committer.staging.unique-filenames` to `true`
to ensure that a UUID is included in every filename to avoid this.
@@ -514,7 +507,11 @@ performance.

### FileSystem client setup

The S3A connector can recognize files created under paths with `__magic/` as a parent directory.
This allows it to handle those files in a special way, such as uploading to a different location
and storing the information needed to complete pending multipart uploads.

Turn the magic on by setting `fs.s3a.committer.magic.enabled` to `true`:

```xml
<property>
@@ -526,22 +523,24 @@ performance.
</property>
```
### Enabling the committer

Set the committer used by S3A's committer factory to `magic`:

```xml
<property>
  <name>fs.s3a.committer.name</name>
  <value>magic</value>
</property>
```

Conflict management is left to the execution engine itself.
## Committer Options Reference

### Common S3A Committer Options

The table below provides a summary of each option.

| Option | Meaning | Default |
|--------|---------|---------|
@@ -553,19 +552,7 @@ Conflict management is left to the execution engine itself.
| `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` |
| `fs.s3a.committer.require.uuid` | Require the Job UUID to be passed down from Spark | `false` |

The examples below show how these options can be configured in XML.
```xml
<property>
@@ -628,8 +615,8 @@ Conflict management is left to the execution engine itself.
  <name>fs.s3a.committer.require.uuid</name>
  <value>false</value>
  <description>
    Require that the committer fail to initialize if a unique ID is not set in
    "spark.sql.sources.writeJobUUID" or "fs.s3a.committer.uuid".
    This helps guarantee that unique IDs for jobs are being
    passed down in spark applications.
@@ -650,7 +637,14 @@ Conflict management is left to the execution engine itself.
</property>
```
### Staging committer (Directory and Partitioned) options

| Option | Meaning | Default |
|--------|---------|---------|
| `fs.s3a.committer.staging.conflict-mode` | Conflict resolution: `fail`, `append`, or `replace`. | `append` |
| `fs.s3a.committer.staging.tmp.path` | Path in the cluster filesystem for temporary data. | `tmp/staging` |
| `fs.s3a.committer.staging.unique-filenames` | Generate unique filenames. | `true` |
| `fs.s3a.committer.staging.abort.pending.uploads` | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. | `(false)` |
```xml
<property>
@@ -672,7 +666,7 @@ Conflict management is left to the execution engine itself.
  <value>true</value>
  <description>
    Option for final files to have a unique name through job attempt info,
    or the value of fs.s3a.committer.uuid.
    When writing data with the "append" conflict option, this guarantees
    that new data will not overwrite any existing data.
  </description>
</property>
```
@ -696,10 +690,9 @@ The magic committer recognizes when files are created under paths with `__magic/
and redirects the upload to a different location, adding the information needed to complete the upload
in the job commit operation.
If, for some reason, you *do not* want these paths to be redirected and completed later,
the feature can be disabled by setting `fs.s3a.committer.magic.enabled` to false.
By default, it is enabled.
```xml
<property>
  <name>fs.s3a.committer.magic.enabled</name>
  <value>false</value>
</property>
```
You will not be able to use the Magic Committer if this option is disabled.
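One way to verify that a bucket is usable with the magic committer is the `bucket-info` probe of the `s3guard` tool; with the `-magic` flag the command fails if magic committer support is disabled (the bucket name is illustrative):

```
hadoop s3guard bucket-info -magic s3a://mybucket/
```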
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination

It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
It is not enough for them to be unique by task (e.g. `part-00000.snappy.parquet`),
because each job will have tasks with the same names, and so will generate files whose names conflict.
For the staging committers, enable `fs.s3a.committer.staging.unique-filenames` to ensure unique names are
generated during the upload. Otherwise, use what configuration options are available in the specific `FileOutputFormat`.
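A sketch of per-job settings that keep two concurrent staging-committer jobs' files distinct; the basename value is a hypothetical per-job prefix:

```xml
<property>
  <name>fs.s3a.committer.staging.unique-filenames</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.output.basename</name>
  <!-- hypothetical: give each concurrent job its own prefix -->
  <value>part-jobA</value>
</property>
```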
Note: by default, the option `mapreduce.output.basename` sets the base name for files;
```
org.apache.hadoop.fs.s3a.commit.PathCommitException: `s3a://landsat-pds': Filesystem does not have support for 'magic' committer enabled
in configuration option fs.s3a.committer.magic.enabled
```
The Job is configured to use the magic committer,
but the S3A bucket has not been explicitly declared as supporting it.

Magic Committer support within the S3A filesystem has been enabled by default since Hadoop 3.3.1.
This error will only surface with a configuration which has explicitly disabled it.
Remove all global/per-bucket declarations of `fs.s3a.bucket.magic.enabled` or set them to `true`.
```xml
<property>
  <name>fs.s3a.committer.magic.enabled</name>
  <value>true</value>
</property>
```
(Setting this option will not interfere with the Staging Committers' use of HDFS,
as it explicitly sets the algorithm to "2" for that part of its work).
The other way to check which committer was used is to examine the `_SUCCESS` file.
If it is 0-bytes long, the classic `FileOutputCommitter` committed the job.
The S3A committers all write a non-empty JSON file; the `committer` field lists
the committer used.
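To check, list and print the file (the bucket and output path are illustrative):

```
hadoop fs -ls s3a://mybucket/output/_SUCCESS    # length 0: classic FileOutputCommitter
hadoop fs -cat s3a://mybucket/output/_SUCCESS   # JSON: an S3A committer; see its "committer" field
```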
1. The output format has overridden `FileOutputFormat.getOutputCommitter()`
and is returning its own committer -one which is a subclass of `FileOutputCommitter`.
The final cause, "the output format is returning its own committer", is not
easily fixed; it may be that the custom committer performs critical work
during its lifecycle, and contains assumptions about the state of the written
data during task and job commit (i.e. it is in the destination filesystem).