HADOOP-18304. Improve user-facing S3A committers documentation (#4478)
Contributed by: Daniel Carl Jones
@@ -51,7 +51,7 @@ obsolete.

## Introduction: The Commit Problem

Apache Hadoop MapReduce (and behind the scenes, Apache Spark) often write
the output of their work to filesystems.

Normally, Hadoop uses the `FileOutputFormatCommitter` to manage the
promotion of files created in a single task attempt to the final output of
@@ -68,37 +68,37 @@ process across the cluster may rename a file or directory to the same path.
If the rename fails for any reason, either the data is at the original location,
or it is at the destination, in which case the rename actually succeeded.

_The S3 object store and the `s3a://` filesystem client cannot meet these requirements._

Although S3 is (now) consistent, the S3A client still mimics `rename()`
by copying files and then deleting the originals.
This can fail partway through, and there is nothing to prevent any other process
in the cluster attempting a rename at the same time.

As a result,

* If a `rename` fails, the data is left in an unknown state.
* If more than one process attempts to commit work simultaneously, the output
directory may contain the results of both processes: it is no longer an exclusive
operation.
* Commit time is still proportional to the amount of data created.
It still can't handle task failure.

**Using the "classic" `FileOutputCommitter` to commit work to Amazon S3 risks
loss or corruption of generated data**.

To address these problems there is now explicit support in the `hadoop-aws`
module for committing work to Amazon S3 via the S3A filesystem client:
*the S3A Committers*.

For safe, as well as high-performance output of work to S3,
we need to use "a committer" explicitly written to work with S3,
treating it as an object store with special features.

### Background: Hadoop's "Commit Protocol"

How exactly is work written to its final destination? That is accomplished by
a "commit protocol" between the workers and the job manager.
@@ -106,10 +106,10 @@ a "commit protocol" between the workers and the job manager.

This protocol is implemented in Hadoop MapReduce, with a similar but extended
version in Apache Spark:

1. The "Job" is the entire query. It takes a given set of input and produces some output.
1. The "Job Manager" is the process in charge of choreographing the execution
of the job. It may perform some of the actual computation too.
1. The job has "workers", which are processes which work with the actual data
and write the results.
1. Workers execute "Tasks", which are fractions of the job, a job whose
input has been *partitioned* into units of work which can be executed independently.
@@ -126,7 +126,7 @@ this "speculation" delivers speedup as it can address the "straggler problem".
When multiple workers are working on the same data, only one worker is allowed
to write the final output.
1. The entire job may fail (often from the failure of the Job Manager (MR Master, Spark Driver, ...)).
1. The network may partition, with workers isolated from each other or
the process managing the entire commit.
1. Restarted jobs may recover from a failure by reusing the output of all
completed tasks (MapReduce with the "v1" algorithm), or just by rerunning everything
@@ -137,34 +137,34 @@ What is "the commit protocol" then? It is the requirements on workers as to
when their data is made visible, where, for a filesystem, "visible" means "can
be seen in the destination directory of the query."

* There is a destination directory of work: "the output directory".
The final output of tasks must be in this directory *or paths underneath it*.
* The intermediate output of a task must not be visible in the destination directory.
That is: they must not write directly to the destination.
* The final output of a task *may* be visible under the destination.
* Individual workers communicate with the Job manager to manage the commit process.
* The Job Manager makes the decision on whether a task's output data is to be "committed",
be it directly to the final directory or to some intermediate store.
* When a worker commits the output of a task, it somehow promotes its intermediate work to becoming
final.
* When a worker aborts a task's output, that output must not become visible
(i.e. it is not committed).
* Jobs themselves may be committed/aborted (the nature of "when" is not covered here).
* After a Job is committed, all its work must be visible.
A file named `_SUCCESS` may be written to the output directory.
* After a Job is aborted, all its intermediate data is lost.
* Jobs may also fail. When restarted, the successor job must be able to clean up
all the intermediate and committed work of its predecessor(s).
* Task and Job processes measure the intervals between communications with their
Application Master and YARN respectively.
When the interval has grown too large, they must conclude
that the network has partitioned and that they must abort their work.

That's "essentially" it. When working with HDFS and similar filesystems,
directory `rename()` is the mechanism used to commit the work of tasks and
jobs.

* Tasks write data to task attempt directories under the directory `_temporary`
underneath the final destination directory.
* When a task is committed, these files are renamed to the destination directory
@@ -180,20 +180,19 @@ and restarting the job.

whose output is in the job attempt directory, *and only rerunning all uncommitted tasks*.

This algorithm does not work safely or swiftly with AWS S3 storage because
renames go from being fast, atomic operations to slow operations which can fail partway through.

This then is the problem which the S3A committers address:

*How to safely and reliably commit work to Amazon S3 or a compatible object store.*

## Meet the S3A Committers

Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes
designed to integrate with the Hadoop and Spark job commit protocols,
classes which interact with the S3A filesystem to reliably commit work to S3:
*The S3A Committers*.

The underlying architecture of this process is very complex, and
covered in [the committer architecture documentation](./committer_architecture.html).
@@ -219,8 +218,8 @@ conflict with existing files is resolved.

| feature | staging | magic |
|--------|---------|---|
| task output destination | write to local disk | upload to S3 *without completing the write* |
| task commit process | upload data from disk to S3 *without completing the write* | list all pending uploads on S3 and write details to job attempt directory |
| task abort process | delete local disk data | list all pending uploads and abort them |
| job commit | list & complete pending uploads | list & complete pending uploads |
@@ -228,33 +227,30 @@ The other metric is "maturity". There, the fact that the Staging committers
are based on Netflix's production code counts in its favor.

### The Staging Committers

This is based on work from Netflix.
It "stages" data into the local filesystem, using URLs with `file://` schemas.

When a task is committed, its files are listed and uploaded to S3 as incomplete Multipart Uploads.
The information needed to complete the uploads is saved to HDFS where
it is committed through the standard "v1" commit algorithm.

When the Job is committed, the Job Manager reads the lists of pending writes from its
HDFS Job destination directory and completes those uploads.

Canceling a _task_ is straightforward: the local directory is deleted with its staged data.
Canceling a _job_ is achieved by reading in the lists of
pending writes from the HDFS job attempt directory, and aborting those
uploads. For extra safety, all outstanding multipart writes to the destination directory
are aborted.

There are two staging committers with slightly different conflict resolution behaviors:

* **Directory Committer**: the entire directory tree of data is written or overwritten,
as normal.
* **Partitioned Committer**: special handling of partitioned directory trees of the form
`YEAR=2017/MONTH=09/DAY=19`: conflict resolution is limited to the partitions
being updated.
@@ -265,13 +261,16 @@ directories containing new data. It is intended for use with Apache Spark
only.

#### Conflict Resolution in the Staging Committers

The Staging committers offer the ability to replace the conflict policy
of the execution engine with a policy designed to work with the tree of data.
This is based on the experience and needs of Netflix, where efficiently adding
new data to an existing partitioned directory tree is a common operation.

An XML configuration is shown below.
If unset, the default conflict mode is `append`.

```xml
<property>
<name>fs.s3a.committer.staging.conflict-mode</name>
@@ -283,40 +282,37 @@ new data to an existing partitioned directory tree is a common operation.
</property>
```

The _Directory Committer_ uses the entire directory tree for conflict resolution.
For this committer, the behavior of each conflict mode is shown below:

- **replace**: When the job is committed (and not before), delete files in
directories into which new data will be written.

- **fail**: When there are existing files in the destination, fail the job.

- **append**: Add new data to the directories at the destination; overwriting
any with the same name. Reliable use requires unique names for generated files,
which the committers generate
by default.

The _Partitioned Committer_ calculates the partitions into which files are added,
the final directories in the tree, and uses that in its conflict resolution process.
For the _Partitioned Committer_, the behavior of each mode is as follows:

- **replace**: Delete all data in the destination _partition_ before committing
the new files.

- **fail**: Fail if there is data in the destination _partition_, ignoring the state
of any parallel partitions.

- **append**: Add the new data to the destination _partition_,
overwriting any files with the same name.

The _Partitioned Committer_ is intended for use in Apache Spark Dataset operations, rather
than Hadoop's original MapReduce engine, and only in jobs
where adding new data to an existing dataset is the desired goal.

Prerequisites for success with the _Partitioned Committer_:

1. The output is written into partitions via `PARTITIONED BY` or `partitionedBy()`
instructions.
@@ -356,19 +352,20 @@ task commit.

However, it has extra requirements of the filesystem:

1. The object store must be consistent.
1. The S3A client must be configured to recognize interactions
with the magic directories and treat them as a special case.

Now that [Amazon S3 is consistent](https://aws.amazon.com/s3/consistency/),
the magic directory path rewriting is enabled by default.

The Magic Committer has not been field tested to the extent of Netflix's committer;
consider it the least mature of the committers.

### Which Committer to Use?

1. If you want to create or update existing partitioned data trees in Spark, use the
Partitioned Committer (see the configuration sketch below). Make sure you have enough hard disk capacity for all staged data.
Do not use it in other situations.
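
As a sketch, selecting the Partitioned Committer together with a per-partition conflict
policy could look like the following; the `replace` mode shown here is only an example, so
pick the conflict mode that matches your workload:

```xml
<property>
<name>fs.s3a.committer.name</name>
<value>partitioned</value>
</property>
<property>
<!-- example only: "fail" and "append" are the other supported modes -->
<name>fs.s3a.committer.staging.conflict-mode</name>
<value>replace</value>
</property>
```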
@@ -398,8 +395,8 @@ This is done in `mapred-default.xml`
</property>
```

You must also choose which of the S3A committers to use with the `fs.s3a.committer.name` property.
Otherwise, the classic (and unsafe) file committer is used.

| `fs.s3a.committer.name` | Committer |
|--------|---------|
@@ -408,9 +405,7 @@ What is missing is an explicit choice of committer to use in the property
| `magic` | the "magic" committer |
| `file` | the original and unsafe File committer (default) |
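
For example (a minimal sketch; substitute `partitioned` or `magic` as appropriate for your
deployment), a job or cluster configuration can select the staging "directory" committer:

```xml
<property>
<name>fs.s3a.committer.name</name>
<value>directory</value>
</property>
```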

## Using the Directory and Partitioned Staging Committers

Generated files are initially written to a local directory underneath one of the temporary
directories listed in `fs.s3a.buffer.dir`.
@@ -422,16 +417,14 @@ The staging committer needs a path in the cluster filesystem

Temporary files are saved in HDFS (or other cluster filesystem) under the path
`${fs.s3a.committer.staging.tmp.path}/${user}` where `user` is the name of the user running the job.
The default value of `fs.s3a.committer.staging.tmp.path` is `tmp/staging`,
resulting in the HDFS directory `~/tmp/staging/${user}`.
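
If that default location is unsuitable, the temporary path can be pointed elsewhere in the
cluster filesystem. A minimal sketch, where the `/user/alice/s3a-staging` path is purely
illustrative:

```xml
<property>
<!-- illustrative path; any writable location in the cluster filesystem will do -->
<name>fs.s3a.committer.staging.tmp.path</name>
<value>/user/alice/s3a-staging</value>
</property>
```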

The application attempt ID is used to create a unique path under this directory,
resulting in a path `~/tmp/staging/${user}/${application-attempt-id}/` under which
summary data of each task's pending commits are managed using the standard
`FileOutputFormat` committer.

When a task is committed, the data is uploaded under the destination directory.
The policy of how to react if the destination exists is defined by
the `fs.s3a.committer.staging.conflict-mode` setting.
@@ -442,9 +435,9 @@ the `fs.s3a.committer.staging.conflict-mode` setting.
| `append` | Add the new files to the existing directory tree |

### The "Partitioned" Staging Committer

This committer is an extension of the "Directory" committer which has a special conflict resolution
policy designed to support operations which insert new data into a directory tree structured
using Hive's partitioning strategy: different levels of the tree represent different columns.
@@ -471,10 +464,10 @@ logs/YEAR=2017/MONTH=04/

A partitioned structure like this allows for queries using Hive or Spark to filter out
files which do not contain relevant data.

The partitioned committer allows callers to add new data to an existing partitioned layout,
where the application supports it.

More specifically, it does this by reducing the scope of conflict resolution to
only act on individual partitions, rather than across the entire output tree.

| `fs.s3a.committer.staging.conflict-mode` | Meaning |
@@ -492,18 +485,18 @@ was written. With the policy of `append`, the new file would be added to
the existing set of files.

### Notes on using Staging Committers

1. A deep partition tree can itself be a performance problem in S3 and the s3a client,
or more specifically a problem with applications which use recursive directory tree
walks to work with data.

1. The outcome if you have more than one job trying simultaneously to write data
to the same destination with any policy other than "append" is undefined.

1. In the `append` operation, there is no check for conflict with file names.
If the file `log-20170228.avro` in the example above already existed, it would be overwritten.
Set `fs.s3a.committer.staging.unique-filenames` to `true`
to ensure that a UUID is included in every filename to avoid this (see the example below).
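
A minimal sketch of that setting; it only enables the unique-filenames option documented above:

```xml
<property>
<name>fs.s3a.committer.staging.unique-filenames</name>
<value>true</value>
</property>
```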
@@ -514,7 +507,11 @@ performance.

### FileSystem client setup

The S3A connector can recognize files created under paths with `__magic/` as a parent directory.
This allows it to handle those files in a special way, such as uploading to a different location
and storing the information needed to complete pending multipart uploads.

Turn the magic on by setting `fs.s3a.committer.magic.enabled` to `true`:

```xml
<property>
@@ -526,22 +523,24 @@ performance.
</property>
```

### Enabling the committer

Set the committer used by S3A's committer factory to `magic`:

```xml
<property>
<name>fs.s3a.committer.name</name>
<value>magic</value>
</property>
```

Conflict management is left to the execution engine itself.

## Committer Options Reference

### Common S3A Committer Options

The table below provides a summary of each option.

| Option | Meaning | Default |
|--------|---------|---------|
@@ -553,19 +552,7 @@ Conflict management is left to the execution engine itself.
| `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` |
| `fs.s3a.committer.require.uuid` | Require the Job UUID to be passed down from Spark | `false` |

The examples below show how these options can be configured in XML.

```xml
<property>
@@ -628,8 +615,8 @@ Conflict management is left to the execution engine itself.
<name>fs.s3a.committer.require.uuid</name>
<value>false</value>
<description>
Require that the committer fail to initialize if a unique ID is not set in
"spark.sql.sources.writeJobUUID" or "fs.s3a.committer.uuid".
This helps guarantee that unique IDs for jobs are being
passed down in Spark applications.
@@ -650,7 +637,14 @@ Conflict management is left to the execution engine itself.
</property>
```

### Staging committer (Directory and Partitioned) options

| Option | Meaning | Default |
|--------|---------|---------|
| `fs.s3a.committer.staging.conflict-mode` | Conflict resolution: `fail`, `append`, or `replace`. | `append` |
| `fs.s3a.committer.staging.tmp.path` | Path in the cluster filesystem for temporary data. | `tmp/staging` |
| `fs.s3a.committer.staging.unique-filenames` | Generate unique filenames. | `true` |
| `fs.s3a.committer.staging.abort.pending.uploads` | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. | `(false)` |

```xml
<property>
|
@ -672,7 +666,7 @@ Conflict management is left to the execution engine itself.
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
<description>
|
<description>
|
||||||
Option for final files to have a unique name through job attempt info,
|
Option for final files to have a unique name through job attempt info,
|
||||||
or the value of fs.s3a.committer.staging.uuid
|
or the value of fs.s3a.committer.uuid.
|
||||||
When writing data with the "append" conflict option, this guarantees
|
When writing data with the "append" conflict option, this guarantees
|
||||||
that new data will not overwrite any existing data.
|
that new data will not overwrite any existing data.
|
||||||
</description>
|
</description>
|
||||||
|
@@ -696,10 +690,9 @@ The magic committer recognizes when files are created under paths with `__magic/
and redirects the upload to a different location, adding the information needed to complete the upload
in the job commit operation.

If, for some reason, you *do not* want these paths to be redirected and completed later,
the feature can be disabled by setting `fs.s3a.committer.magic.enabled` to false.
By default, it is enabled.

```xml
<property>
|
@ -711,6 +704,8 @@ By default it is true.
|
||||||
</property>
|
</property>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
You will not be able to use the Magic Committer if this option is disabled.
|
||||||
|
|
||||||
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
|
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
|
||||||
|
|
||||||
It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
|
It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
|
||||||
|
@@ -730,7 +725,7 @@ be creating files with paths/filenames unique to the specific job.
It is not enough for them to be unique by task `part-00000.snappy.parquet`,
because each job will have tasks with the same name, and so will generate files with conflicting operations.

For the staging committers, enable `fs.s3a.committer.staging.unique-filenames` to ensure unique names are
generated during the upload. Otherwise, use what configuration options are available in the specific `FileOutputFormat`.

Note: by default, the option `mapreduce.output.basename` sets the base name for files;
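
As an illustration only, that base name can be overridden in the job configuration; the value
below is hypothetical, so choose something unique to each job:

```xml
<property>
<!-- hypothetical value; pick a basename unique to the job -->
<name>mapreduce.output.basename</name>
<value>part-job-20220630</value>
</property>
```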
@@ -757,13 +752,12 @@ org.apache.hadoop.fs.s3a.commit.PathCommitException: `s3a://landsat-pds': Filesy
in configuration option fs.s3a.committer.magic.enabled
```

The Job is configured to use the magic committer,
but the S3A bucket has not been explicitly declared as supporting it.

Magic Committer support within the S3A filesystem has been enabled by default since Hadoop 3.3.1.
This error will only surface with a configuration which has explicitly disabled it.
Remove all global/per-bucket declarations of `fs.s3a.bucket.magic.enabled` or set them to `true`.

```xml
<property>
|
@ -846,7 +840,7 @@ the failure happen at the start of a job.
|
||||||
(Setting this option will not interfere with the Staging Committers' use of HDFS,
|
(Setting this option will not interfere with the Staging Committers' use of HDFS,
|
||||||
as it explicitly sets the algorithm to "2" for that part of its work).
|
as it explicitly sets the algorithm to "2" for that part of its work).
|
||||||
|
|
||||||
The other way to check which committer to use is to examine the `_SUCCESS` file.
|
The other way to check which committer was used is to examine the `_SUCCESS` file.
|
||||||
If it is 0-bytes long, the classic `FileOutputCommitter` committed the job.
|
If it is 0-bytes long, the classic `FileOutputCommitter` committed the job.
|
||||||
The S3A committers all write a non-empty JSON file; the `committer` field lists
|
The S3A committers all write a non-empty JSON file; the `committer` field lists
|
||||||
the committer used.
|
the committer used.
|
||||||
|
@@ -862,7 +856,7 @@ all committers registered for the s3a:// schema.

1. The output format has overridden `FileOutputFormat.getOutputCommitter()`
and is returning its own committer -one which is a subclass of `FileOutputCommitter`.

The final cause, "the output format is returning its own committer", is not
easily fixed; it may be that the custom committer performs critical work
during its lifecycle, and contains assumptions about the state of the written
data during task and job commit (i.e. it is in the destination filesystem).