<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<!--
| Generated by Apache Maven Doxia at 2023-03-02
| Rendered using Apache Maven Stylus Skin 1.5
-->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Apache Hadoop Amazon Web Services support &#x2013; Committing work to S3 with the S3A Committers</title>
<style type="text/css" media="all">
@import url("../../css/maven-base.css");
@import url("../../css/maven-theme.css");
@import url("../../css/site.css");
</style>
<link rel="stylesheet" href="../../css/print.css" type="text/css" media="print" />
<meta name="Date-Revision-yyyymmdd" content="20230302" />
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
</head>
<body class="composite">
<div id="banner">
<a href="http://hadoop.apache.org/" id="bannerLeft">
<img src="http://hadoop.apache.org/images/hadoop-logo.jpg" alt="" />
</a>
<a href="http://www.apache.org/" id="bannerRight">
<img src="http://www.apache.org/images/asf_logo_wide.png" alt="" />
</a>
<div class="clear">
<hr/>
</div>
</div>
<div id="breadcrumbs">
<div class="xright"> <a href="http://wiki.apache.org/hadoop" class="externalLink">Wiki</a>
|
<a href="https://gitbox.apache.org/repos/asf/hadoop.git" class="externalLink">git</a>
&nbsp;| Last Published: 2023-03-02
&nbsp;| Version: 3.4.0-SNAPSHOT
</div>
<div class="clear">
<hr/>
</div>
</div>
<div id="leftColumn">
<div id="navcolumn">
<h5>General</h5>
<ul>
<li class="none">
<a href="../../../index.html">Overview</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/SingleCluster.html">Single Node Setup</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/ClusterSetup.html">Cluster Setup</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/CommandsManual.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/FileSystemShell.html">FileSystem Shell</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Compatibility.html">Compatibility Specification</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/DownstreamDev.html">Downstream Developer's Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/AdminCompatibilityGuide.html">Admin Compatibility Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/InterfaceClassification.html">Interface Classification</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/filesystem/index.html">FileSystem Specification</a>
</li>
</ul>
<h5>Common</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/CLIMiniCluster.html">CLI Mini Cluster</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/FairCallQueue.html">Fair Call Queue</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/NativeLibraries.html">Native Libraries</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Superusers.html">Proxy User</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/RackAwareness.html">Rack Awareness</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/SecureMode.html">Secure Mode</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/ServiceLevelAuth.html">Service Level Authorization</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/HttpAuthentication.html">HTTP Authentication</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/CredentialProviderAPI.html">Credential Provider API</a>
</li>
<li class="none">
<a href="../../../hadoop-kms/index.html">Hadoop KMS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Tracing.html">Tracing</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/UnixShellGuide.html">Unix Shell Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/registry/index.html">Registry</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/AsyncProfilerServlet.html">Async Profiler</a>
</li>
</ul>
<h5>HDFS</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsDesign.html">Architecture</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html">User Guide</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html">NameNode HA With QJM</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html">NameNode HA With NFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html">Observer NameNode</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/Federation.html">Federation</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ViewFs.html">ViewFs</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ViewFsOverloadScheme.html">ViewFsOverloadScheme</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsSnapshots.html">Snapshots</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsEditsViewer.html">Edits Viewer</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html">Image Viewer</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html">Permissions and HDFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsQuotaAdminGuide.html">Quotas and HDFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/LibHdfs.html">libhdfs (C API)</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/WebHDFS.html">WebHDFS (REST API)</a>
</li>
<li class="none">
<a href="../../../hadoop-hdfs-httpfs/index.html">HttpFS</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html">Short Circuit Local Reads</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html">Centralized Cache Management</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsNfsGateway.html">NFS Gateway</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html">Rolling Upgrade</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ExtendedAttributes.html">Extended Attributes</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html">Transparent Encryption</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsMultihoming.html">Multihoming</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html">Storage Policies</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/MemoryStorage.html">Memory Storage Support</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/SLGUserGuide.html">Synthetic Load Generator</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html">Erasure Coding</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HDFSDiskbalancer.html">Disk Balancer</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsUpgradeDomain.html">Upgrade Domain</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsDataNodeAdminGuide.html">DataNode Admin</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs-rbf/HDFSRouterFederation.html">Router Federation</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/HdfsProvidedStorage.html">Provided Storage</a>
</li>
</ul>
<h5>MapReduce</h5>
<ul>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html">Tutorial</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduce_Compatibility_Hadoop1_Hadoop2.html">Compatibility with 1.x</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Encrypted Shuffle</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html">Pluggable Shuffle/Sort</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html">Distributed Cache Deploy</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html">Support for YARN Shared Cache</a>
</li>
</ul>
<h5>MapReduce REST APIs</h5>
<ul>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html">MR Application Master</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-hs/HistoryServerRest.html">MR History Server</a>
</li>
</ul>
<h5>YARN</h5>
<ul>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YARN.html">Architecture</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YarnCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html">Capacity Scheduler</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/FairScheduler.html">Fair Scheduler</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRestart.html">ResourceManager Restart</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html">ResourceManager HA</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceModel.html">Resource Model</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeLabel.html">Node Labels</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeAttributes.html">Node Attributes</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html">Web Application Proxy</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html">Timeline Server</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html">Timeline Service V.2</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html">Writing YARN Applications</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html">YARN Application Security</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeManager.html">NodeManager</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/DockerContainers.html">Running Applications in Docker Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/RuncContainers.html">Running Applications in runC Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html">Using CGroups</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/SecureContainer.html">Secure Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ReservationSystem.html">Reservation System</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html">Graceful Decommission</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/OpportunisticContainers.html">Opportunistic Containers</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/Federation.html">YARN Federation</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/SharedCache.html">Shared Cache</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/UsingGpus.html">Using GPU</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/UsingFPGA.html">Using FPGA</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html">Placement Constraints</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/YarnUI2.html">YARN UI2</a>
</li>
</ul>
<h5>YARN REST APIs</h5>
<ul>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html">Introduction</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html">Resource Manager</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/NodeManagerRest.html">Node Manager</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html#Timeline_Server_REST_API_v1">Timeline Server</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html#Timeline_Service_v.2_REST_API">Timeline Service V.2</a>
</li>
</ul>
<h5>YARN Service</h5>
<ul>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/Overview.html">Overview</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/QuickStart.html">QuickStart</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/Concepts.html">Concepts</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/YarnServiceAPI.html">Yarn Service API</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/ServiceDiscovery.html">Service Discovery</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-site/yarn-service/SystemServices.html">System Services</a>
</li>
</ul>
<h5>Hadoop Compatible File Systems</h5>
<ul>
<li class="none">
<a href="../../../hadoop-aliyun/tools/hadoop-aliyun/index.html">Aliyun OSS</a>
</li>
<li class="none">
<a href="../../../hadoop-aws/tools/hadoop-aws/index.html">Amazon S3</a>
</li>
<li class="none">
<a href="../../../hadoop-azure/index.html">Azure Blob Storage</a>
</li>
<li class="none">
<a href="../../../hadoop-azure-datalake/index.html">Azure Data Lake Storage</a>
</li>
<li class="none">
<a href="../../../hadoop-cos/cloud-storage/index.html">Tencent COS</a>
</li>
<li class="none">
<a href="../../../hadoop-huaweicloud/cloud-storage/index.html">Huaweicloud OBS</a>
</li>
</ul>
<h5>Auth</h5>
<ul>
<li class="none">
<a href="../../../hadoop-auth/index.html">Overview</a>
</li>
<li class="none">
<a href="../../../hadoop-auth/Examples.html">Examples</a>
</li>
<li class="none">
<a href="../../../hadoop-auth/Configuration.html">Configuration</a>
</li>
<li class="none">
<a href="../../../hadoop-auth/BuildingIt.html">Building</a>
</li>
</ul>
<h5>Tools</h5>
<ul>
<li class="none">
<a href="../../../hadoop-streaming/HadoopStreaming.html">Hadoop Streaming</a>
</li>
<li class="none">
<a href="../../../hadoop-archives/HadoopArchives.html">Hadoop Archives</a>
</li>
<li class="none">
<a href="../../../hadoop-archive-logs/HadoopArchiveLogs.html">Hadoop Archive Logs</a>
</li>
<li class="none">
<a href="../../../hadoop-distcp/DistCp.html">DistCp</a>
</li>
<li class="none">
<a href="../../../hadoop-federation-balance/HDFSFederationBalance.html">HDFS Federation Balance</a>
</li>
<li class="none">
<a href="../../../hadoop-gridmix/GridMix.html">GridMix</a>
</li>
<li class="none">
<a href="../../../hadoop-rumen/Rumen.html">Rumen</a>
</li>
<li class="none">
<a href="../../../hadoop-resourceestimator/ResourceEstimator.html">Resource Estimator Service</a>
</li>
<li class="none">
<a href="../../../hadoop-sls/SchedulerLoadSimulator.html">Scheduler Load Simulator</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Benchmarking.html">Hadoop Benchmarking</a>
</li>
<li class="none">
<a href="../../../hadoop-dynamometer/Dynamometer.html">Dynamometer</a>
</li>
</ul>
<h5>Reference</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/release/">Changelog and Release Notes</a>
</li>
<li class="none">
<a href="../../../api/index.html">Java API docs</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/UnixShellAPI.html">Unix Shell API</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/Metrics.html">Metrics</a>
</li>
</ul>
<h5>Configuration</h5>
<ul>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/core-default.xml">core-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs/hdfs-default.xml">hdfs-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-hdfs-rbf/hdfs-rbf-default.xml">hdfs-rbf-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml">mapred-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-yarn/hadoop-yarn-common/yarn-default.xml">yarn-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-kms/kms-default.html">kms-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-hdfs-httpfs/httpfs-default.html">httpfs-default.xml</a>
</li>
<li class="none">
<a href="../../../hadoop-project-dist/hadoop-common/DeprecatedProperties.html">Deprecated Properties</a>
</li>
</ul>
<a href="http://maven.apache.org/" title="Built by Maven" class="poweredBy">
<img alt="Built by Maven" src="../../images/logos/maven-feather.png"/>
</a>
</div>
</div>
<div id="bodyColumn">
<div id="contentBox">
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<h1>Committing work to S3 with the &#x201c;S3A Committers&#x201d;</h1>
<ul>
<li><a href="#a"></a>
<ul>
<li><a href="#January_2021_Update">January 2021 Update</a></li></ul></li>
<li><a href="#Introduction:_The_Commit_Problem">Introduction: The Commit Problem</a>
<ul>
<li><a href="#Background:_Hadoop.E2.80.99s_.E2.80.9CCommit_Protocol.E2.80.9D">Background: Hadoop&#x2019;s &#x201c;Commit Protocol&#x201d;</a></li></ul></li>
<li><a href="#Meet_the_S3A_Committers">Meet the S3A Committers</a>
<ul>
<li><a href="#The_Staging_Committers">The Staging Committers</a>
<ul>
<li><a href="#Conflict_Resolution_in_the_Staging_Committers">Conflict Resolution in the Staging Committers</a></li></ul></li>
<li><a href="#The_Magic_Committer">The Magic Committer</a></li>
<li><a href="#Which_Committer_to_Use.3F">Which Committer to Use?</a></li></ul></li>
<li><a href="#Switching_to_an_S3A_Committer">Switching to an S3A Committer</a></li>
<li><a href="#Using_the_Staging_Committers">Using the Staging Committers</a>
<ul>
<li><a href="#The_.E2.80.9CPartitioned.E2.80.9D_Staging_Committer">The &#x201c;Partitioned&#x201d; Staging Committer</a></li>
<li><a href="#Notes_on_using_Staging_Committers">Notes on using Staging Committers</a></li></ul></li>
<li><a href="#Using_the_Magic_committer">Using the Magic committer</a>
<ul>
<li><a href="#FileSystem_client_setup">FileSystem client setup</a></li>
<li><a href="#Enabling_the_committer">Enabling the committer</a></li></ul></li>
<li><a href="#Committer_Options_Reference">Committer Options Reference</a>
<ul>
<li><a href="#Common_S3A_Committer_Options">Common S3A Committer Options</a></li>
<li><a href="#Staging_committer_.28Directory_and_Partitioned.29_options">Staging committer (Directory and Partitioned) options</a></li>
<li><a href="#Disabling_magic_committer_path_rewriting">Disabling magic committer path rewriting</a></li></ul></li>
<li><a href="#Concurrent_Jobs_writing_to_the_same_destination"> Concurrent Jobs writing to the same destination</a></li>
<li><a href="#Troubleshooting">Troubleshooting</a>
<ul>
<li><a href="#Filesystem_does_not_have_support_for_.27magic.27_committer">Filesystem does not have support for 'magic' committer</a></li>
<li><a href="#Error_message:_.E2.80.9CFile_being_created_has_a_magic_path.2C_but_the_filesystem_has_magic_file_support_disabled.E2.80.9D">Error message: &#x201c;File being created has a magic path, but the filesystem has magic file support disabled&#x201d;</a></li>
<li><a href="#FileOutputCommitter_appears_to_be_still_used_.28from_logs_or_delays_in_commits.29">FileOutputCommitter appears to be still used (from logs or delays in commits)</a></li>
<li><a href="#Job.2FTask_fails_with_PathExistsException:_Destination_path_exists_and_committer_conflict_resolution_mode_is_.E2.80.9Cfail.E2.80.9D">Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is &#x201c;fail&#x201d;</a></li>
<li><a href="#Staging_committer_task_fails_with_IOException:_No_space_left_on_device">Staging committer task fails with IOException: No space left on device</a></li>
<li><a href="#Jobs_run_with_directory.2Fpartitioned_committers_complete_but_the_output_is_empty.">Jobs run with directory/partitioned committers complete but the output is empty.</a></li>
<li><a href="#Magic_output_committer_task_fails_.E2.80.9CThe_specified_upload_does_not_exist.E2.80.9D_.E2.80.9CError_Code:_NoSuchUpload.E2.80.9D">Magic output committer task fails &#x201c;The specified upload does not exist&#x201d; &#x201c;Error Code: NoSuchUpload&#x201d;</a></li>
<li><a href="#Job_commit_fails_.E2.80.9Cjava.io.FileNotFoundException:_Completing_multipart_upload.E2.80.9D_.E2.80.9CThe_specified_upload_does_not_exist.E2.80.9D">Job commit fails &#x201c;java.io.FileNotFoundException: Completing multipart upload&#x201d; &#x201c;The specified upload does not exist&#x201d;</a></li>
<li><a href="#Job_commit_fails_java.io.FileNotFoundException_.E2.80.9CFile_hdfs:.2F.2F....2Fstaging-uploads.2F_temporary.2F0_does_not_exist.E2.80.9D">Job commit fails java.io.FileNotFoundException &#x201c;File hdfs://.../staging-uploads/_temporary/0 does not exist&#x201d;</a></li>
<li><a href="#Job_setup_fails_Job.2Ftask_context_does_not_contain_a_unique_ID_in_spark.sql.sources.writeJobUUID">Job setup fails Job/task context does not contain a unique ID in spark.sql.sources.writeJobUUID</a></li></ul></li></ul>
<p>This page covers the S3A Committers, which can commit work directly to an S3 object store.</p>
<p>These committers are designed to solve a fundamental problem which the standard committers cannot solve when writing to S3: committing output to S3 consistently, reliably, and with high performance.</p>
<p>For details on their internal design, see <a href="./committer_architecture.html">S3A Committers: Architecture and Implementation</a>.</p><section><section>
<h3><a name="January_2021_Update"></a>January 2021 Update</h3>
<p>Now that S3 is fully consistent, the problems related to inconsistent directory listings have gone. However, the rename problem remains: committing work by renaming directories is unsafe as well as horribly slow.</p>
<p>This architecture document, and the committers, were written at a time when S3 was inconsistent. The two committers addressed this problem differently:</p>
<ul>
<li>Staging Committer: rely on a cluster HDFS filesystem for safely propagating the lists of files to commit from workers to the job manager/driver.</li>
<li>Magic Committer: require S3Guard to offer consistent directory listings on the object store.</li>
</ul>
<p>With consistent S3, the Magic Committer can be safely used with any S3 bucket. The choice of which to use, then, is a matter for experimentation.</p>
<p>This document was written in 2017, a time when S3 was only consistent when an extra consistency layer such as S3Guard was used. The document indicates where requirements/constraints which existed then are now obsolete.</p></section></section><section>
<h2><a name="Introduction:_The_Commit_Problem"></a>Introduction: The Commit Problem</h2>
<p>Apache Hadoop MapReduce (and behind the scenes, Apache Spark) often write the output of their work to filesystems.</p>
<p>Normally, Hadoop uses the <code>FileOutputCommitter</code> to manage the promotion of files created in a single task attempt to the final output of a query. This is done in a way to handle failures of tasks and jobs, and to support speculative execution. It does that by listing directories and renaming their content into the final destination when tasks and then jobs are committed.</p>
<p>This places some key requirements on the underlying filesystem:</p>
<ol style="list-style-type: decimal">
<li>When you list a directory, you see all the files which have been created in it, and no files which are not in it (i.e. have been deleted).</li>
<li>When you rename a directory, it is an <code>O(1)</code> atomic transaction. No other process across the cluster may rename a file or directory to the same path. If the rename fails for any reason, either the data is at the original location, or it is at the destination, in which case the rename actually succeeded.</li>
</ol>
<p><i>The S3 object store and the <code>s3a://</code> filesystem client cannot meet these requirements.</i></p>
<p>Although S3 is (now) consistent, the S3A client still mimics <code>rename()</code> by copying files and then deleting the originals. This can fail partway through, and there is nothing to prevent any other process in the cluster attempting a rename at the same time.</p>
<p>As a result,</p>
<ul>
<li>If a &#x2018;rename&#x2019; fails, the data is left in an unknown state.</li>
<li>If more than one process attempts to commit work simultaneously, the output directory may contain the results of both processes: it is no longer an exclusive operation.</li>
<li>Commit time is still proportional to the amount of data created. It still can&#x2019;t handle task failure.</li>
</ul>
<p><b>Using the &#x201c;classic&#x201d; <code>FileOutputCommitter</code> to commit work to Amazon S3 risks loss or corruption of generated data</b>.</p>
<p>To address these problems there is now explicit support in the <code>hadoop-aws</code> module for committing work to Amazon S3 via the S3A filesystem client: <i>the S3A Committers</i>.</p>
<p>For safe, as well as high-performance output of work to S3, we need to use &#x201c;a committer&#x201d; explicitly written to work with S3, treating it as an object store with special features.</p><section>
<h3><a name="Background:_Hadoop.E2.80.99s_.E2.80.9CCommit_Protocol.E2.80.9D"></a>Background: Hadoop&#x2019;s &#x201c;Commit Protocol&#x201d;</h3>
<p>How exactly is work written to its final destination? That is accomplished by a &#x201c;commit protocol&#x201d; between the workers and the job manager.</p>
<p>This protocol is implemented in Hadoop MapReduce, with a similar but extended version in Apache Spark:</p>
<ol style="list-style-type: decimal">
<li>The &#x201c;Job&#x201d; is the entire query. It takes a given set of input and produces some output.</li>
<li>The &#x201c;Job Manager&#x201d; is the process in charge of choreographing the execution of the job. It may perform some of the actual computation too.</li>
<li>The job has &#x201c;workers&#x201d;, which are processes which work with the actual data and write the results.</li>
<li>Workers execute &#x201c;Tasks&#x201d;, which are fractions of the job: the job&#x2019;s input has been <i>partitioned</i> into units of work which can be executed independently.</li>
<li>The Job Manager directs workers to execute &#x201c;tasks&#x201d;, usually trying to schedule the work close to the data (if the filesystem provides locality information).</li>
<li>Workers can fail: the Job manager needs to detect this and reschedule their active tasks.</li>
<li>Workers can also become separated from the Job Manager, a &#x201c;network partition&#x201d;. It is (provably) impossible for the Job Manager to distinguish a running-but-unreachable worker from a failed one.</li>
<li>The output of a failed task must not be visible; this is to avoid its data getting into the final output.</li>
<li>Multiple workers can be instructed to evaluate the same partition of the work; this &#x201c;speculation&#x201d; delivers speedup as it can address the &#x201c;straggler problem&#x201d;. When multiple workers are working on the same data, only one worker is allowed to write the final output.</li>
<li>The entire job may fail (often from the failure of the Job Manager (MR Master, Spark Driver, &#x2026;)).</li>
<li>The network may partition, with workers isolated from each other or the process managing the entire commit.</li>
<li>Restarted jobs may recover from a failure by reusing the output of all completed tasks (MapReduce with the &#x201c;v1&#x201d; algorithm), or just by rerunning everything (The &#x201c;v2&#x201d; algorithm and Spark).</li>
</ol>
<p>What is &#x201c;the commit protocol&#x201d; then? It is the requirements on workers as to when their data is made visible, where, for a filesystem, &#x201c;visible&#x201d; means &#x201c;can be seen in the destination directory of the query.&#x201d;</p>
<ul>
<li>There is a destination directory of work: &#x201c;the output directory&#x201d;. The final output of tasks must be in this directory <i>or paths underneath it</i>.</li>
<li>The intermediate output of a task must not be visible in the destination directory. That is: tasks must not write directly to the destination.</li>
<li>The final output of a task <i>may</i> be visible under the destination.</li>
<li>Individual workers communicate with the Job manager to manage the commit process.</li>
<li>The Job Manager makes the decision on whether a task&#x2019;s output data is to be &#x201c;committed&#x201d;, be it directly to the final directory or to some intermediate store.</li>
<li>When a worker commits the output of a task, it somehow promotes its intermediate work to becoming final.</li>
<li>When a worker aborts a task&#x2019;s output, that output must not become visible (i.e. it is not committed).</li>
<li>Jobs themselves may be committed/aborted (the nature of &#x201c;when&#x201d; is not covered here).</li>
<li>After a Job is committed, all its work must be visible. A file named <code>_SUCCESS</code> may be written to the output directory.</li>
<li>After a Job is aborted, all its intermediate data is lost.</li>
<li>Jobs may also fail. When restarted, the successor job must be able to clean up all the intermediate and committed work of its predecessor(s).</li>
<li>Task and Job processes measure the intervals between communications with their Application Master and YARN respectively. When the interval has grown too large, they must conclude that the network has partitioned and that they must abort their work.</li>
</ul>
<p>That&#x2019;s &#x201c;essentially&#x201d; it. When working with HDFS and similar filesystems, directory <code>rename()</code> is the mechanism used to commit the work of tasks and jobs.</p>
<ul>
<li>Tasks write data to task attempt directories under the directory <code>_temporary</code> underneath the final destination directory.</li>
<li>When a task is committed, these files are renamed to the destination directory (v2 algorithm) or a job attempt directory under <code>_temporary</code> (v1 algorithm).</li>
<li>When a job is committed, for the v2 algorithm the <code>_SUCCESS</code> file is created, and the <code>_temporary</code> directory is deleted.</li>
<li>For the v1 algorithm, when a job is committed, all the tasks committed under the job attempt directory will have their output renamed into the destination directory.</li>
<li>The v2 algorithm recovers from failure by deleting the destination directory and restarting the job.</li>
<li>The v1 algorithm recovers from failure by discovering all committed tasks whose output is in the job attempt directory, <i>and only rerunning all uncommitted tasks</i>.</li>
</ul>
<p>This algorithm does not work safely or swiftly with AWS S3 storage because renames go from being fast, atomic operations to slow operations which can fail partway through.</p>
<p>This then is the problem which the S3A committers address: <i>How to safely and reliably commit work to Amazon S3 or compatible object store.</i></p></section></section><section>
<h2><a name="Meet_the_S3A_Committers"></a>Meet the S3A Committers</h2>
<p>Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes designed to integrate with the Hadoop and Spark job commit protocols, classes which interact with the S3A filesystem to reliably commit work to S3: <i>The S3A Committers</i>.</p>
<p>The underlying architecture of this process is very complex, and covered in <a href="./committer_architecture.html">the committer architecture documentation</a>.</p>
<p>The key concept to know of is S3&#x2019;s &#x201c;Multipart Upload&#x201d; mechanism. This allows an S3 client to write data to S3 in multiple HTTP POST requests, only completing the write operation with a final POST to complete the upload; this final POST consisting of a short list of the etags of the uploaded blocks. This multipart upload mechanism is already automatically used when writing large amounts of data to S3; an implementation detail of the S3A output stream.</p>
<p>The S3A committers make explicit use of this multipart upload (&#x201c;MPU&#x201d;) mechanism:</p>
<ol style="list-style-type: decimal">
<li>The individual <i>tasks</i> in a job write their data to S3 as POST operations within multipart uploads, yet do not issue the final POST to complete the upload.</li>
<li>The multipart uploads are committed in the job commit process.</li>
</ol>
<p>There are two different S3A committer types, <i>staging</i> and <i>magic</i>. The committers primarily vary in how data is written during task execution, how the pending commit information is passed to the job manager, and in how conflict with existing files is resolved.</p>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> feature </th>
<th> staging </th>
<th> magic </th></tr>
</thead><tbody>
<tr class="b">
<td> task output destination </td>
<td> write to local disk </td>
<td> upload to S3 <i>without completing the write</i> </td></tr>
<tr class="a">
<td> task commit process </td>
<td> upload data from disk to S3 <i>without completing the write</i> </td>
<td> list all pending uploads on S3 and write details to job attempt directory </td></tr>
<tr class="b">
<td> task abort process </td>
<td> delete local disk data </td>
<td> list all pending uploads and abort them </td></tr>
<tr class="a">
<td> job commit </td>
<td> list &amp; complete pending uploads </td>
<td> list &amp; complete pending uploads </td></tr>
</tbody>
</table>
<p>The other metric is &#x201c;maturity&#x201d;. There, the fact that the Staging committers are based on Netflix&#x2019;s production code counts in their favor.</p><section>
<h3><a name="The_Staging_Committers"></a>The Staging Committers</h3>
<p>This is based on work from Netflix. It &#x201c;stages&#x201d; data in the local filesystem, using URLs with the <code>file://</code> scheme.</p>
<p>When a task is committed, its files are listed and uploaded to S3 as incomplete Multipart Uploads. The information needed to complete the uploads is saved to HDFS where it is committed through the standard &#x201c;v1&#x201d; commit algorithm.</p>
<p>When the Job is committed, the Job Manager reads the lists of pending writes from its HDFS Job destination directory and completes those uploads.</p>
<p>Canceling a <i>task</i> is straightforward: the local directory is deleted with its staged data. Canceling a <i>job</i> is achieved by reading in the lists of pending writes from the HDFS job attempt directory, and aborting those uploads. For extra safety, all outstanding multipart writes to the destination directory are aborted.</p>
<p>There are two staging committers with slightly different conflict resolution behaviors:</p>
<ul>
<li>
<p><b>Directory Committer</b>: the entire directory tree of data is written or overwritten, as normal.</p>
</li>
<li>
<p><b>Partitioned Committer</b>: special handling of partitioned directory trees of the form <code>YEAR=2017/MONTH=09/DAY=19</code>: conflict resolution is limited to the partitions being updated.</p>
</li>
</ul>
<p>The Partitioned Committer is intended to allow jobs updating a partitioned directory tree to restrict the conflict resolution to only those partition directories containing new data. It is intended for use with Apache Spark only.</p><section>
<h4><a name="Conflict_Resolution_in_the_Staging_Committers"></a>Conflict Resolution in the Staging Committers</h4>
<p>The Staging committers offer the ability to replace the conflict policy of the execution engine with a policy designed to work with the tree of data. This is based on the experience and needs of Netflix, where efficiently adding new data to an existing partitioned directory tree is a common operation.</p>
<p>The policy is set in the XML configuration shown below; if unset, the default conflict mode is <code>append</code>.</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.staging.conflict-mode&lt;/name&gt;
&lt;value&gt;fail&lt;/value&gt;
&lt;description&gt;
Staging committer conflict resolution policy.
Supported: fail, append, replace.
&lt;/description&gt;
&lt;/property&gt;
</pre></div></div>
<p>The <i>Directory Committer</i> uses the entire directory tree for conflict resolution. For this committer, the behavior of each conflict mode is shown below:</p>
<ul>
<li>
<p><b>replace</b>: When the job is committed (and not before), delete files in directories into which new data will be written.</p>
</li>
<li>
<p><b>fail</b>: When there are existing files in the destination, fail the job.</p>
</li>
<li>
<p><b>append</b>: Add new data to the directories at the destination, overwriting any files with the same name. Reliable use requires unique names for generated files, which the committers generate by default.</p>
</li>
</ul>
<p>The <i>Partitioned Committer</i> calculates the partitions into which files are added (the final directories in the tree) and uses that set of partitions in its conflict resolution process. For the <i>Partitioned Committer</i>, the behavior of each mode is as follows:</p>
<ul>
<li>
<p><b>replace</b>: Delete all data in the destination <i>partition</i> before committing the new files.</p>
</li>
<li>
<p><b>fail</b>: Fail if there is data in the destination <i>partition</i>, ignoring the state of any parallel partitions.</p>
</li>
<li>
<p><b>append</b>: Add the new data to the destination <i>partition</i>, overwriting any files with the same name.</p>
</li>
</ul>
<p>The <i>Partitioned Committer</i> is intended for use in Apache Spark Dataset operations, rather than Hadoop&#x2019;s original MapReduce engine, and only in jobs where adding new data to an existing dataset is the desired goal.</p>
<p>Prerequisites for success with the <i>Partitioned Committer</i>:</p>
<ol style="list-style-type: decimal">
<li>The output is written into partitions via <code>PARTITIONED BY</code> or <code>partitionBy()</code> instructions.</li>
<li>There is no data written directly to the root path (all files there are ignored; it&#x2019;s implicitly &#x201c;append&#x201d;).</li>
</ol>
<p>Here&#x2019;s an example in Spark, assuming that <code>sourceDataset</code> is a dataset whose columns include &#x201c;year&#x201d; and &#x201c;month&#x201d;:</p>
<div class="source">
<div class="source">
<pre>sourceDataset
  .write
  .partitionBy(&quot;year&quot;, &quot;month&quot;)
  .mode(SaveMode.Append)
  .option(&quot;fs.s3a.committer.name&quot;, &quot;partitioned&quot;)
  .option(&quot;fs.s3a.committer.staging.conflict-mode&quot;, &quot;replace&quot;)
  .format(&quot;orc&quot;)
  .save(&quot;s3a://examples/statistics&quot;)
</pre></div></div>
</section></section><section>
<h3><a name="The_Magic_Committer"></a>The Magic Committer</h3>
<p>The &#x201c;Magic&#x201d; committer does its work through &#x201c;magic&#x201d; in the filesystem: attempts to write to specific &#x201c;magic&#x201d; paths are interpreted as writes to a parent directory <i>which are not to be completed</i>. When the output stream is closed, the information needed to complete the write is saved in the magic directory. The task committer saves the list of these to a directory for the job committer&#x2019;s use, or, if aborting, lists the pending writes and aborts them.</p>
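<p>As an illustration (the job and task attempt names here are hypothetical; the exact layout of the intermediate files is covered in the architecture document), a file written by a task under the magic path of a destination directory results in the data being uploaded towards the final path, with the upload left uncompleted and a small file of pending-commit information written in its place:</p>
<div class="source">
<div class="source">
<pre>path the task writes to:  s3a://bucket/output/__magic/job-0011/task-0011_03/__base/part-0000
data uploaded towards:    s3a://bucket/output/part-0000   (upload not completed)
pending-commit info:      s3a://bucket/output/__magic/job-0011/task-0011_03/__base/part-0000.pending
</pre></div></div>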
<p>The job committer reads in the list of pending commits, and commits them as the Staging Committer does.</p>
<p>Compared to the Staging Committer, the Magic Committer offers faster write times: output is uploaded to S3 as it is written, rather than in the task commit.</p>
<p>However, it has extra requirements of the filesystem:</p>
<ol style="list-style-type: decimal">
<li>The object store must be consistent.</li>
<li>The S3A client must be configured to recognize interactions with the magic directories and treat them as a special case.</li>
</ol>
<p>Now that <a class="externalLink" href="https://aws.amazon.com/s3/consistency/">Amazon S3 is consistent</a>, the magic directory path rewriting is enabled by default.</p>
<p>The Magic Committer has not been field tested to the extent of Netflix&#x2019;s committer; consider it the least mature of the committers.</p></section><section>
<h3><a name="Which_Committer_to_Use.3F"></a>Which Committer to Use?</h3>
<ol style="list-style-type: decimal">
<li>
<p>If you want to create or update existing partitioned data trees in Spark, use the Partitioned Committer. Make sure you have enough hard disk capacity for all staged data. Do not use it in other situations.</p>
</li>
<li>
<p>If you do not have a shared cluster store: use the Magic Committer.</p>
</li>
<li>
<p>If you are writing large amounts of data: use the Magic Committer.</p>
</li>
<li>
<p>Otherwise: use the directory committer, making sure you have enough hard disk capacity for all staged data.</p>
</li>
</ol>
<p>Now that S3 is consistent, there are fewer reasons not to use the Magic Committer. Experiment with both to see which works best for your workloads.</p></section></section><section>
<h2><a name="Switching_to_an_S3A_Committer"></a>Switching to an S3A Committer</h2>
<p>To use an S3A committer, the property <code>mapreduce.outputcommitter.factory.scheme.s3a</code> must be set to the S3A committer factory, <code>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</code>. This is done in <code>mapred-default.xml</code>:</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;mapreduce.outputcommitter.factory.scheme.s3a&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory&lt;/value&gt;
&lt;description&gt;
The committer factory to use when writing data to S3A filesystems.
&lt;/description&gt;
&lt;/property&gt;
</pre></div></div>
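<p>When jobs are submitted through Spark rather than configured through Hadoop XML files, the same properties can be passed as <code>spark.hadoop.</code>-prefixed options. The sketch below is a hedged example, not part of the Hadoop configuration reference: the <code>PathOutputCommitProtocol</code> and <code>BindingParquetOutputCommitter</code> classes come from Spark&#x2019;s optional <code>spark-hadoop-cloud</code> module, which must be on the classpath, and are only needed to bind Spark SQL/Parquet output to these committers.</p>
<div class="source">
<div class="source">
<pre>import org.apache.spark.sql.SparkSession

// Illustrative sketch: select an S3A committer (&quot;magic&quot; here; any name from
// the table below works) and bind Spark SQL's commit protocol to it.
val spark = SparkSession.builder()
  .appName(&quot;s3a-committer-example&quot;)
  .config(&quot;spark.hadoop.fs.s3a.committer.name&quot;, &quot;magic&quot;)
  .config(&quot;spark.sql.sources.commitProtocolClass&quot;,
    &quot;org.apache.spark.internal.io.cloud.PathOutputCommitProtocol&quot;)
  .config(&quot;spark.sql.parquet.output.committer.class&quot;,
    &quot;org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter&quot;)
  .getOrCreate()
</pre></div></div>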
<p>You must also choose which of the S3A committers to use with the <code>fs.s3a.committer.name</code> property. Otherwise, the classic (and unsafe) file committer is used.</p>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> <code>fs.s3a.committer.name</code> </th>
<th> Committer </th></tr>
</thead><tbody>
<tr class="b">
<td> <code>directory</code> </td>
<td> directory staging committer </td></tr>
<tr class="a">
<td> <code>partitioned</code> </td>
<td> partition staging committer (for use in Spark only) </td></tr>
<tr class="b">
<td> <code>magic</code> </td>
<td> the &#x201c;magic&#x201d; committer </td></tr>
<tr class="a">
<td> <code>file</code> </td>
<td> the original and unsafe file committer (default) </td></tr>
</tbody>
</table></section><section>
<h2><a name="Using_the_Staging_Committers"></a>Using the Staging Committers</h2>
<p>Generated files are initially written to a local directory underneath one of the temporary directories listed in <code>fs.s3a.buffer.dir</code>.</p>
<p>The staging committer needs a path in the cluster filesystem (e.g. HDFS). This must be declared in <code>fs.s3a.committer.staging.tmp.path</code>.</p>
<p>Temporary files are saved in HDFS (or other cluster filesystem) under the path <code>${fs.s3a.committer.staging.tmp.path}/${user}</code> where <code>user</code> is the name of the user running the job. The default value of <code>fs.s3a.committer.staging.tmp.path</code> is <code>tmp/staging</code>, resulting in the HDFS directory <code>~/tmp/staging/${user}</code>.</p>
<p>The application attempt ID is used to create a unique path under this directory, resulting in a path <code>~/tmp/staging/${user}/${application-attempt-id}/</code> under which summary data of each task&#x2019;s pending commits are managed using the standard <code>FileOutputFormat</code> committer.</p>
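<p>For example (the user and namenode address here are purely illustrative), with the default settings, a job run by user <code>alice</code> on a cluster whose default filesystem is HDFS would have its pending-commit summaries managed under a path such as:</p>
<div class="source">
<div class="source">
<pre>hdfs://namenode:8020/user/alice/tmp/staging/alice/${application-attempt-id}/
</pre></div></div>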
<p>When a task is committed, the data is uploaded under the destination directory. The policy of how to react if the destination exists is defined by the <code>fs.s3a.committer.staging.conflict-mode</code> setting.</p>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> <code>fs.s3a.committer.staging.conflict-mode</code> </th>
<th> Meaning </th></tr>
</thead><tbody>
<tr class="b">
<td> <code>fail</code> </td>
<td> Fail if the destination directory exists </td></tr>
<tr class="a">
<td> <code>replace</code> </td>
<td> Delete all existing files before committing the new data </td></tr>
<tr class="b">
<td> <code>append</code> </td>
<td> Add the new files to the existing directory tree </td></tr>
</tbody>
</table><section>
<h3><a name="The_.E2.80.9CPartitioned.E2.80.9D_Staging_Committer"></a>The &#x201c;Partitioned&#x201d; Staging Committer</h3>
<p>This committer is an extension of the &#x201c;Directory&#x201d; committer which has a special conflict resolution policy designed to support operations which insert new data into a directory tree structured using Hive&#x2019;s partitioning strategy: different levels of the tree represent different columns.</p>
<p>For example, log data could be partitioned by <code>YEAR</code> and then by <code>MONTH</code>, with different entries underneath.</p>
<div class="source">
<div class="source">
<pre>logs/YEAR=2017/MONTH=01/
log-20170101.avro
log-20170102.avro
...
log-20170131.avro
logs/YEAR=2017/MONTH=02/
log-20170201.avro
log-20170202.avro
...
log-20170227.avro
logs/YEAR=2017/MONTH=03/
logs/YEAR=2017/MONTH=04/
</pre></div></div>
<p>A partitioned structure like this allows for queries using Hive or Spark to filter out files which do not contain relevant data.</p>
<p>The partitioned committer allows callers to add new data to an existing partitioned layout, where the application supports it.</p>
<p>More specifically, it does this by reducing the scope of conflict resolution to only act on individual partitions, rather than across the entire output tree.</p>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> <code>fs.s3a.committer.staging.conflict-mode</code> </th>
<th> Meaning </th></tr>
</thead><tbody>
<tr class="b">
<td> <code>fail</code> </td>
<td> Fail if the destination partition(s) exist </td></tr>
<tr class="a">
<td> <code>replace</code> </td>
<td> Delete the existing data partitions before committing the new data </td></tr>
<tr class="b">
<td> <code>append</code> </td>
<td> Add the new data to the existing partitions </td></tr>
</tbody>
</table>
<p>As an example, if a job was writing the file <code>logs/YEAR=2017/MONTH=02/log-20170228.avro</code>, then with a policy of <code>fail</code>, the job would fail. With a policy of <code>replace</code>, the entire directory <code>logs/YEAR=2017/MONTH=02/</code> would be deleted before the new file <code>log-20170228.avro</code> was written. With the policy of <code>append</code>, the new file would be added to the existing set of files.</p></section><section>
<h3><a name="Notes_on_using_Staging_Committers"></a>Notes on using Staging Committers</h3>
<ol style="list-style-type: decimal">
<li>
<p>A deep partition tree can itself be a performance problem in S3 and the s3a client, or more specifically a problem with applications which use recursive directory tree walks to work with data.</p>
</li>
<li>
<p>The outcome if you have more than one job trying simultaneously to write data to the same destination with any policy other than &#x201c;append&#x201d; is undefined.</p>
</li>
<li>
<p>In the <code>append</code> operation, there is no check for conflict with file names. If the file <code>log-20170228.avro</code> in the example above already existed, it would be overwritten. Set <code>fs.s3a.committer.staging.unique-filenames</code> to <code>true</code> to ensure that a UUID is included in every filename to avoid this.</p>
</li>
</ol></section></section><section>
<h2><a name="Using_the_Magic_committer"></a>Using the Magic committer</h2>
<p>This is less mature than the Staging Committer, but promises higher performance.</p><section>
<h3><a name="FileSystem_client_setup"></a>FileSystem client setup</h3>
<p>The S3A connector can recognize files created under paths with <code>__magic/</code> as a parent directory. This allows it to handle those files in a special way, such as uploading to a different location and storing the information needed to complete pending multipart uploads.</p>
<p>Turn the magic on by setting <code>fs.s3a.committer.magic.enabled</code> to <code>true</code>:</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.magic.enabled&lt;/name&gt;
&lt;description&gt;
Enable support in the filesystem for the S3 &quot;Magic&quot; committer.
&lt;/description&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>
</section><section>
<h3><a name="Enabling_the_committer"></a>Enabling the committer</h3>
<p>Set the committer used by S3A&#x2019;s committer factory to <code>magic</code>:</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.name&lt;/name&gt;
&lt;value&gt;magic&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>
<p>Conflict management is left to the execution engine itself.</p></section></section><section>
<h2><a name="Committer_Options_Reference"></a>Committer Options Reference</h2><section>
<h3><a name="Common_S3A_Committer_Options"></a>Common S3A Committer Options</h3>
<p>The table below provides a summary of each option.</p>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> Option </th>
<th> Meaning </th>
<th> Default </th></tr>
</thead><tbody>
<tr class="b">
<td> <code>mapreduce.fileoutputcommitter.marksuccessfuljobs</code> </td>
<td> Write a <code>_SUCCESS</code> file on the successful completion of the job. </td>
<td> <code>true</code> </td></tr>
<tr class="a">
<td> <code>fs.s3a.buffer.dir</code> </td>
<td> Local filesystem directory for data being written and/or staged. </td>
<td> <code>${env.LOCAL_DIRS:-${hadoop.tmp.dir}}/s3a</code> </td></tr>
<tr class="b">
<td> <code>fs.s3a.committer.magic.enabled</code> </td>
<td> Enable &#x201c;magic committer&#x201d; support in the filesystem. </td>
<td> <code>true</code> </td></tr>
<tr class="a">
<td> <code>fs.s3a.committer.abort.pending.uploads</code> </td>
<td> list and abort all pending uploads under the destination path when the job is committed or aborted. </td>
<td> <code>true</code> </td></tr>
<tr class="b">
<td> <code>fs.s3a.committer.threads</code> </td>
<td> Number of threads in committers for parallel operations on files.</td>
<td> <code>-4</code> </td></tr>
<tr class="a">
<td> <code>fs.s3a.committer.generate.uuid</code> </td>
<td> Generate a Job UUID if none is passed down from Spark </td>
<td> <code>false</code> </td></tr>
<tr class="b">
<td> <code>fs.s3a.committer.require.uuid</code> </td>
<td>Require the Job UUID to be passed down from Spark </td>
<td> <code>false</code> </td></tr>
</tbody>
</table>
<p>The examples below show how these options can be configured in XML.</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.name&lt;/name&gt;
&lt;value&gt;file&lt;/value&gt;
&lt;description&gt;
Committer to create for output to S3A, one of:
&quot;file&quot;, &quot;directory&quot;, &quot;partitioned&quot;, &quot;magic&quot;.
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.magic.enabled&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;description&gt;
Enable support in the filesystem for the S3 &quot;Magic&quot; committer.
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.threads&lt;/name&gt;
&lt;value&gt;-4&lt;/value&gt;
&lt;description&gt;
Number of threads in committers for parallel operations on files
(upload, commit, abort, delete...).
Two thread pools this size are created, one for the outer
task-level parallelism, and one for parallel execution
within tasks (POSTs to commit individual uploads)
If the value is negative, it is inverted and then multiplied
by the number of cores in the CPU.
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.abort.pending.uploads&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;description&gt;
Should the committers abort all pending uploads to the destination
directory?
Set to false if more than one job is writing to the same directory tree.
Was: &quot;fs.s3a.committer.staging.abort.pending.uploads&quot; when only used
by the staging committers.
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;mapreduce.outputcommitter.factory.scheme.s3a&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory&lt;/value&gt;
&lt;description&gt;
The committer factory to use when writing data to S3A filesystems.
If mapreduce.outputcommitter.factory.class is set, it will
override this property.
(This property is set in mapred-default.xml)
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.require.uuid&lt;/name&gt;
&lt;value&gt;false&lt;/value&gt;
&lt;description&gt;
Require the committer fail to initialize if a unique ID is not set in
&quot;spark.sql.sources.writeJobUUID&quot; or &quot;fs.s3a.committer.uuid&quot;.
This helps guarantee that unique IDs for jobs are being
passed down in spark applications.
Setting this option outside of spark will stop the S3A committer
in job setup. In MapReduce workloads the job attempt ID is unique
and so no unique ID need be passed down.
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.generate.uuid&lt;/name&gt;
&lt;value&gt;false&lt;/value&gt;
&lt;description&gt;
Generate a Job UUID if none is passed down from Spark.
This uuid is only generated if the fs.s3a.committer.require.uuid flag
is false.
&lt;/description&gt;
&lt;/property&gt;
</pre></div></div>
</section><section>
<h3><a name="Staging_committer_.28Directory_and_Partitioned.29_options"></a>Staging committer (Directory and Partitioned) options</h3>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> Option </th>
<th> Meaning </th>
<th> Default </th></tr>
</thead><tbody>
<tr class="b">
<td> <code>fs.s3a.committer.staging.conflict-mode</code> </td>
<td> Conflict resolution: <code>fail</code>, <code>append</code>, or <code>replace</code>.</td>
<td> <code>append</code> </td></tr>
<tr class="a">
<td> <code>fs.s3a.committer.staging.tmp.path</code> </td>
<td> Path in the cluster filesystem for temporary data. </td>
<td> <code>tmp/staging</code> </td></tr>
<tr class="b">
<td> <code>fs.s3a.committer.staging.unique-filenames</code> </td>
<td> Generate unique filenames. </td>
<td> <code>true</code> </td></tr>
<tr class="a">
<td> <code>fs.s3a.committer.staging.abort.pending.uploads</code> </td>
<td> Deprecated; replaced by <code>fs.s3a.committer.abort.pending.uploads</code>. </td>
<td> <code>(false)</code> </td></tr>
</tbody>
</table>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.staging.tmp.path&lt;/name&gt;
&lt;value&gt;tmp/staging&lt;/value&gt;
&lt;description&gt;
Path in the cluster filesystem for temporary data.
This is for HDFS, not the local filesystem.
It is only for the summary data of each file, not the actual
data being committed.
Using an unqualified path guarantees that the full path will be
generated relative to the home directory of the user creating the job,
hence private (assuming home directory permissions are secure).
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.staging.unique-filenames&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;description&gt;
Option for final files to have a unique name through job attempt info,
or the value of fs.s3a.committer.uuid.
When writing data with the &quot;append&quot; conflict option, this guarantees
that new data will not overwrite any existing data.
&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.s3a.committer.staging.conflict-mode&lt;/name&gt;
&lt;value&gt;append&lt;/value&gt;
&lt;description&gt;
Staging committer conflict resolution policy.
Supported: &quot;fail&quot;, &quot;append&quot;, &quot;replace&quot;.
&lt;/description&gt;
&lt;/property&gt;
</pre></div></div>
</section><section>
<h3><a name="Disabling_magic_committer_path_rewriting"></a>Disabling magic committer path rewriting</h3>
<p>The magic committer recognizes when files are created under paths with <code>__magic/</code> as a parent directory and redirects the upload to a different location, adding the information needed to complete the upload in the job commit operation.</p>
<p>If, for some reason, you <i>do not</i> want these paths to be redirected and completed later, the feature can be disabled by setting <code>fs.s3a.committer.magic.enabled</code> to false. By default, it is enabled.</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.magic.enabled&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;description&gt;
Enable support in the S3A filesystem for the &quot;Magic&quot; committer.
&lt;/description&gt;
&lt;/property&gt;
</pre></div></div>
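<p>The same flag can be set per bucket through the usual S3A per-bucket override pattern. A minimal sketch which disables the path rewriting for one bucket only (the bucket name &#x201c;example-bucket&#x201d; is purely illustrative):</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
  &lt;name&gt;fs.s3a.bucket.example-bucket.committer.magic.enabled&lt;/name&gt;
  &lt;value&gt;false&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>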
<p>You will not be able to use the Magic Committer if this option is disabled.</p></section></section><section>
<h2><a name="Concurrent_Jobs_writing_to_the_same_destination"></a><a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination</h2>
<p>It is sometimes possible for multiple jobs to simultaneously write to the same destination path.</p>
<p>Before attempting this, the committers must be set to not delete all incomplete uploads on job commit, by setting <code>fs.s3a.committer.abort.pending.uploads</code> to <code>false</code>.</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.committer.abort.pending.uploads&lt;/name&gt;
&lt;value&gt;false&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>
<p>If more than one job is writing to the same destination path then every task MUST create files with paths/filenames unique to the specific job. It is not enough for them to be unique by task (e.g. <code>part-00000.snappy.parquet</code>), because each job will have tasks with the same names, and so will generate files whose names conflict.</p>
<p>For the staging committers, enable <code>fs.s3a.committer.staging.unique-filenames</code> to ensure unique names are generated during the upload. Otherwise, use what configuration options are available in the specific <code>FileOutputFormat</code>.</p>
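<p>A minimal sketch of that setting for the staging committers:</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
  &lt;name&gt;fs.s3a.committer.staging.unique-filenames&lt;/name&gt;
  &lt;value&gt;true&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>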
<p>Note: by default, the option <code>mapreduce.output.basename</code> sets the base name for files; changing that from the default <code>part</code> value to something unique for each job may achieve this.</p>
<p>For example, for any job executed through Hadoop MapReduce, the Job ID can be used in the filename.</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;mapreduce.output.basename&lt;/name&gt;
&lt;value&gt;part-${mapreduce.job.id}&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>
<p>Even with these settings, the outcome of concurrent jobs to the same destination is inherently nondeterministic -use with caution.</p></section><section>
<h2><a name="Troubleshooting"></a>Troubleshooting</h2><section>
<h3><a name="Filesystem_does_not_have_support_for_.27magic.27_committer"></a><code>Filesystem does not have support for 'magic' committer</code></h3>
<div class="source">
<div class="source">
<pre>org.apache.hadoop.fs.s3a.commit.PathCommitException: `s3a://landsat-pds': Filesystem does not have support for 'magic' committer enabled
in configuration option fs.s3a.committer.magic.enabled
</pre></div></div>
<p>The Job is configured to use the magic committer, but the S3A bucket has not been explicitly declared as supporting it.</p>
<p>Magic Committer support within the S3A filesystem has been enabled by default since Hadoop 3.3.1. This error will only surface with a configuration which has explicitly disabled it. Remove all global/per-bucket declarations of <code>fs.s3a.committer.magic.enabled</code> or set them to <code>true</code>.</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
&lt;name&gt;fs.s3a.bucket.landsat-pds.committer.magic.enabled&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>
<p>Tip: you can verify that a bucket supports the magic committer through the <code>hadoop s3guard bucket-info</code> command:</p>
<div class="source">
<div class="source">
<pre>&gt; hadoop s3guard bucket-info -magic s3a://landsat-pds/
Location: us-west-2
S3A Client
Signing Algorithm: fs.s3a.signing-algorithm=(unset)
Endpoint: fs.s3a.endpoint=s3.amazonaws.com
Encryption: fs.s3a.encryption.algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
Change Detection Source: fs.s3a.change.detection.source=etag
Change Detection Mode: fs.s3a.change.detection.mode=server
S3A Committers
The &quot;magic&quot; committer is supported in the filesystem
S3A Committer factory class: mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
S3A Committer name: fs.s3a.committer.name=magic
Store magic committer integration: fs.s3a.committer.magic.enabled=true
Security
Delegation token support is disabled
Directory Markers
The directory marker policy is &quot;delete&quot;
Available Policies: delete, keep, authoritative
Authoritative paths: fs.s3a.authoritative.path=
</pre></div></div>
</section><section>
<h3><a name="Error_message:_.E2.80.9CFile_being_created_has_a_magic_path.2C_but_the_filesystem_has_magic_file_support_disabled.E2.80.9D"></a>Error message: &#x201c;File being created has a magic path, but the filesystem has magic file support disabled&#x201d;</h3>
<p>A file is being written to a path which is used for &#x201c;magic&#x201d; files (files which are actually written to a different destination than their stated path), <i>but the filesystem does not have &#x201c;magic&#x201d; file support enabled</i>.</p>
<p>This message should not appear through the committer itself &#x2014;it will fail with the error message in the previous section, but may arise if other applications are attempting to create files under the path <code>/__magic/</code>.</p></section><section>
<h3><a name="FileOutputCommitter_appears_to_be_still_used_.28from_logs_or_delays_in_commits.29"></a><code>FileOutputCommitter</code> appears to be still used (from logs or delays in commits)</h3>
<p>The Staging committers use the original <code>FileOutputCommitter</code> to manage the propagation of commit information: do not worry if the logs show <code>FileOutputCommitter</code> working with data in the cluster filesystem (e.g. HDFS).</p>
<p>One way to make sure that the <code>FileOutputCommitter</code> is not being used to write the data to S3 is to set the option <code>mapreduce.fileoutputcommitter.algorithm.version</code> to a value such as &#x201c;10&#x201d;. Because the only supported algorithms are &#x201c;1&#x201d; and &#x201c;2&#x201d;, any erroneously created <code>FileOutputCommitter</code> will raise an exception in its constructor when instantiated:</p>
<div class="source">
<div class="source">
<pre>java.io.IOException: Only 1 or 2 algorithm version is supported
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.&lt;init&gt;(FileOutputCommitter.java:130)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.&lt;init&gt;(FileOutputCommitter.java:104)
at org.apache.parquet.hadoop.ParquetOutputCommitter.&lt;init&gt;(ParquetOutputCommitter.java:42)
at org.apache.parquet.hadoop.ParquetOutputFormat.getOutputCommitter(ParquetOutputFormat.java:395)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:67)
at com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol.setupCommitter(PathOutputCommitProtocol.scala:62)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:124)
at com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol.setupJob(PathOutputCommitProtocol.scala:152)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:175)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
</pre></div></div>
<p>While that will not make the problem go away, it will at least make the failure happen at the start of a job.</p>
<p>(Setting this option will not interfere with the Staging Committers&#x2019; use of HDFS, as it explicitly sets the algorithm to &#x201c;2&#x201d; for that part of its work).</p>
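<p>A minimal sketch of that &#x201c;canary&#x201d; setting; the value &#x201c;10&#x201d; is deliberately unsupported, so any stray <code>FileOutputCommitter</code> fails fast at job setup:</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
  &lt;name&gt;mapreduce.fileoutputcommitter.algorithm.version&lt;/name&gt;
  &lt;value&gt;10&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>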
<p>The other way to check which committer was used is to examine the <code>_SUCCESS</code> file. If it is 0-bytes long, the classic <code>FileOutputCommitter</code> committed the job. The S3A committers all write a non-empty JSON file; the <code>committer</code> field lists the committer used.</p>
<p><i>Common causes</i></p>
<ol style="list-style-type: decimal">
<li>The property <code>fs.s3a.committer.name</code> is set to &#x201c;file&#x201d;. Fix: change it to one of the S3A committers, as shown in the sketch after this list.</li>
<li>The job has overridden the property <code>mapreduce.outputcommitter.factory.class</code> with a new factory class for all committers. This takes priority over all committers registered for the <code>s3a://</code> scheme.</li>
<li>The property <code>mapreduce.outputcommitter.factory.scheme.s3a</code> is unset.</li>
<li>The output format has overridden <code>FileOutputFormat.getOutputCommitter()</code> and is returning its own committer -one which is a subclass of <code>FileOutputCommitter</code>.</li>
</ol>
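<p>A minimal sketch addressing causes 1 and 3: pick an S3A committer and make sure the factory binding for the <code>s3a://</code> scheme is present. The choice of &#x201c;magic&#x201d; here is only an example; &#x201c;directory&#x201d; and &#x201c;partitioned&#x201d; are equally valid:</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
  &lt;name&gt;fs.s3a.committer.name&lt;/name&gt;
  &lt;value&gt;magic&lt;/value&gt;
&lt;/property&gt;

&lt;property&gt;
  &lt;name&gt;mapreduce.outputcommitter.factory.scheme.s3a&lt;/name&gt;
  &lt;value&gt;org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>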
<p>The final cause &#x201c;the output format is returning its own committer&#x201d; is not easily fixed; it may be that the custom committer performs critical work during its lifecycle, and contains assumptions about the state of the written data during task and job commit (i.e. it is in the destination filesystem). Consult with the authors/maintainers of the output format to see whether it would be possible to integrate with the new committer factory mechanism and object-store-specific commit algorithms.</p>
<p>Parquet is a special case here: its committer does no extra work other than, optionally, reading all newly-created files and then writing a schema summary. The Spark integration has explicit handling for Parquet to enable it to support the new committers, removing this (slow on S3) option.</p>
<p>If you have subclassed <code>FileOutputCommitter</code> and want to move to the factory model, please get in touch.</p></section><section>
<h3><a name="Job.2FTask_fails_with_PathExistsException:_Destination_path_exists_and_committer_conflict_resolution_mode_is_.E2.80.9Cfail.E2.80.9D"></a>Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is &#x201c;fail&#x201d;</h3>
<p>This surfaces when either of two conditions is met.</p>
<ol style="list-style-type: decimal">
<li>The Directory committer is used with <code>fs.s3a.committer.staging.conflict-mode</code> set to <code>fail</code> and the output/destination directory exists. The job will fail in the driver during job setup.</li>
<li>The Partitioned Committer is used with <code>fs.s3a.committer.staging.conflict-mode</code> set to <code>fail</code> and one of the partitions exists. The specific task(s) generating conflicting data will fail during task commit, which will cause the entire job to fail.</li>
</ol>
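<p>If replacing or adding to the existing data is actually intended, relax the conflict policy instead; a minimal sketch using &#x201c;replace&#x201d; (&#x201c;append&#x201d; is the other non-failing option):</p>
<div class="source">
<div class="source">
<pre>&lt;property&gt;
  &lt;name&gt;fs.s3a.committer.staging.conflict-mode&lt;/name&gt;
  &lt;value&gt;replace&lt;/value&gt;
&lt;/property&gt;
</pre></div></div>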
<p>If you are trying to write data and want write conflicts to be rejected, this is the correct behavior: there was data at the destination so the job was aborted.</p></section><section>
<h3><a name="Staging_committer_task_fails_with_IOException:_No_space_left_on_device"></a>Staging committer task fails with IOException: No space left on device</h3>
<p>There&#x2019;s not enough space on the local hard disk (real or virtual) to store all the uncommitted data of the active tasks on that host. Because the staging committers write all output to the local disk and only upload the data on task commits, enough local temporary storage is needed to store all output generated by all uncommitted tasks running on the single host. Small EC2 VMs may run out of disk.</p>
<ol style="list-style-type: decimal">
<li>
<p>Make sure that <code>fs.s3a.buffer.dir</code> includes a temporary directory on every available hard disk; this spreads load better.</p>
</li>
<li>
<p>Add more disk space. In EC2: request instances with more local storage. There is no need for EMR storage; this is just for temporary data.</p>
</li>
<li>
<p>Purge the directories listed in <code>fs.s3a.buffer.dir</code> of old data. Failed tasks may not clean up all old files.</p>
</li>
<li>
<p>Reduce the number of worker threads/processes on the host.</p>
</li>
<li>
<p>Consider partitioning the job into more tasks. This <i>may</i> result in more tasks generating less data each.</p>
</li>
<li>
<p>Use the magic committer. This only needs enough disk storage to buffer blocks of the file currently being written while they are uploaded, so it can use a lot less disk space.</p>
</li>
</ol></section><section>
<h3><a name="Jobs_run_with_directory.2Fpartitioned_committers_complete_but_the_output_is_empty."></a>Jobs run with directory/partitioned committers complete but the output is empty.</h3>
<p>Make sure that <code>fs.s3a.committer.staging.tmp.path</code> is set to a path on the shared cluster filesystem (usually HDFS). It MUST NOT be set to a local directory, as the job committer, running on a different host, <i>will not see the lists of pending uploads to commit</i>.</p></section><section>
<h3><a name="Magic_output_committer_task_fails_.E2.80.9CThe_specified_upload_does_not_exist.E2.80.9D_.E2.80.9CError_Code:_NoSuchUpload.E2.80.9D"></a>Magic output committer task fails &#x201c;The specified upload does not exist&#x201d; &#x201c;Error Code: NoSuchUpload&#x201d;</h3>
<p>The magic committer is being used and a task writing data to the S3 store fails with an error message about the upload not existing.</p>
<div class="source">
<div class="source">
<pre>java.io.FileNotFoundException: upload part #1 upload
YWHTRqBaxlsutujKYS3eZHfdp6INCNXbk0JVtydX_qzL5fZcoznxRbbBZRfswOjomddy3ghRyguOqywJTfGG1Eq6wOW2gitP4fqWrBYMroasAygkmXNYF7XmUyFHYzja
on test/ITestMagicCommitProtocol-testParallelJobsToSameDestPaths/part-m-00000:
com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not
exist. The upload ID may be invalid, or the upload may have been aborted or
completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload;
Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID:
cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=),
S3 Extended Request ID:
cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:286)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:154)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:590)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.lambda$uploadBlockAsync$0(S3ABlockOutputStream.java:652)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception:
The specified upload does not exist.
The upload ID may be invalid, or the upload may have been aborted or completed.
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID:
cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=),
S3 Extended Request ID: cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866)
at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3715)
at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3700)
at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:2343)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:594)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110)
... 15 more
</pre></div></div>
<p>The block write failed because the previously created upload was aborted before the data could be written.</p>
<p>Causes</p>
<ol style="list-style-type: decimal">
<li>Another job has written to the same directory tree with an S3A committer -and when that job was committed, all incomplete uploads were aborted.</li>
<li>The <code>hadoop s3guard uploads --abort</code> command has been called on/above the directory.</li>
<li>Some other program is cancelling uploads to that bucket/path under it.</li>
<li>The job has been running for over 24h and a bucket lifecycle policy is aborting the uploads.</li>
</ol>
<p>The <code>_SUCCESS</code> file from the previous job may provide diagnostics.</p>
<p>If the cause is Concurrent Jobs, see <a href="#concurrent-jobs">Concurrent Jobs writing to the same destination</a>.</p></section><section>
<h3><a name="Job_commit_fails_.E2.80.9Cjava.io.FileNotFoundException:_Completing_multipart_upload.E2.80.9D_.E2.80.9CThe_specified_upload_does_not_exist.E2.80.9D"></a>Job commit fails &#x201c;java.io.FileNotFoundException: Completing multipart upload&#x201d; &#x201c;The specified upload does not exist&#x201d;</h3>
<p>The job commit fails with an error about the specified upload not existing.</p>
<div class="source">
<div class="source">
<pre>java.io.FileNotFoundException: Completing multipart upload on
test/DELAY_LISTING_ME/ITestDirectoryCommitProtocol-testParallelJobsToSameDestPaths/part-m-00001:
com.amazonaws.services.s3.model.AmazonS3Exception:
The specified upload does not exist.
The upload ID may be invalid, or the upload may have been aborted or completed.
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload;
Request ID: 8E6173241D2970CB; S3 Extended Request ID:
Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=),
S3 Extended Request ID:
Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:261)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.commitUpload(WriteOperationHelper.java:549)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.innerCommit(CommitOperations.java:199)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.commit(CommitOperations.java:168)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.commitOrFail(CommitOperations.java:144)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.access$100(CommitOperations.java:74)
at org.apache.hadoop.fs.s3a.commit.CommitOperations$CommitContext.commitOrFail(CommitOperations.java:612)
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$loadAndCommit$5(AbstractS3ACommitter.java:535)
at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.runSingleThreaded(Tasks.java:164)
at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.run(Tasks.java:149)
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.loadAndCommit(AbstractS3ACommitter.java:534)
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$commitPendingUploads$2(AbstractS3ACommitter.java:482)
at org.apache.hadoop.fs.s3a.commit.Tasks$Builder$1.run(Tasks.java:253)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist.
The upload ID may be invalid, or the upload may have been aborted or completed.
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 8E6173241D2970CB;
S3 Extended Request ID: Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=),
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866)
at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3464)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:267)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110)
</pre></div></div>
<p>The cause is likely to be the same as in the previous case: concurrent jobs are writing to the same output directory, or another program has cancelled all pending uploads.</p>
<p>See <a href="#concurrent-jobs">Concurrent Jobs writing to the same destination</a>.</p></section><section>
<h3><a name="Job_commit_fails_java.io.FileNotFoundException_.E2.80.9CFile_hdfs:.2F.2F....2Fstaging-uploads.2F_temporary.2F0_does_not_exist.E2.80.9D"></a>Job commit fails <code>java.io.FileNotFoundException</code> &#x201c;File <a class="externalLink" href="hdfs://.../staging-uploads/_temporary/0">hdfs://.../staging-uploads/_temporary/0</a> does not exist&#x201d;</h3>
<p>The Staging committer will fail during job commit if its intermediate directory on the cluster filesystem is missing.</p>
<p>This is possible if another job used the same staging upload directory and, after committing its work, it deleted the directory.</p>
<p>A unique Job ID is required for each spark job run by a specific user. Spark generates job IDs for its committers using the current timestamp, and if two jobs/stages are started in the same second, they will have the same job ID.</p>
<p>See <a class="externalLink" href="https://issues.apache.org/jira/browse/SPARK-33230">SPARK-33230</a>.</p>
<p>This is fixed in all spark releases which have the patch applied.</p>
<p>You can set the property <code>fs.s3a.committer.require.uuid</code> to make the staging committers fail fast if a unique Job ID isn&#x2019;t found in <code>spark.sql.sources.writeJobUUID</code>.</p></section><section>
<h3><a name="Job_setup_fails_Job.2Ftask_context_does_not_contain_a_unique_ID_in_spark.sql.sources.writeJobUUID"></a>Job setup fails <code>Job/task context does not contain a unique ID in spark.sql.sources.writeJobUUID</code></h3>
<p>This will surface in job setup if the option <code>fs.s3a.committer.require.uuid</code> is <code>true</code>, and one of the following conditions is met:</p>
<ol style="list-style-type: decimal">
<li>The committer is being used in a Hadoop MapReduce job, whose job attempt ID is unique -there is no need to add this requirement. Fix: unset <code>fs.s3a.committer.require.uuid</code>.</li>
<li>The committer is being used in spark, and the version of spark being used does not set the <code>spark.sql.sources.writeJobUUID</code> property. Either upgrade to a newer spark release, or set <code>fs.s3a.committer.generate.uuid</code> to true.</li>
</ol></section></section>
</div>
</div>
<div class="clear">
<hr/>
</div>
<div id="footer">
<div class="xright">
&#169; 2008-2023
Apache Software Foundation
- <a href="http://maven.apache.org/privacy-policy.html">Privacy Policy</a>.
Apache Maven, Maven, Apache, the Apache feather logo, and the Apache Maven project logos are trademarks of The Apache Software Foundation.
</div>
<div class="clear">
<hr/>
</div>
</div>
</body>
</html>