<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<!--
| Generated by Apache Maven Doxia at 2023-03-25
| Rendered using Apache Maven Stylus Skin 1.5
-->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>Apache Hadoop 3.4.0-SNAPSHOT &#x2013; Manifest Committer Protocol</title>
<style type="text/css" media="all">
@import url("./css/maven-base.css");
@import url("./css/maven-theme.css");
@import url("./css/site.css");
</style>
<link rel="stylesheet" href="./css/print.css" type="text/css" media="print" />
<meta name="Date-Revision-yyyymmdd" content="20230325" />
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
</head>
<body class="composite">
<div id="banner">
<a href="http://hadoop.apache.org/" id="bannerLeft">
<img src="http://hadoop.apache.org/images/hadoop-logo.jpg" alt="" />
</a>
<a href="http://www.apache.org/" id="bannerRight">
<img src="http://www.apache.org/images/asf_logo_wide.png" alt="" />
</a>
<div class="clear">
<hr/>
</div>
</div>
<div id="breadcrumbs">
<div class="xright"> <a href="http://wiki.apache.org/hadoop" class="externalLink">Wiki</a>
|
<a href="https://gitbox.apache.org/repos/asf/hadoop.git" class="externalLink">git</a>
|
<a href="http://hadoop.apache.org/" class="externalLink">Apache Hadoop</a>
&nbsp;| Last Published: 2023-03-25
&nbsp;| Version: 3.4.0-SNAPSHOT
</div>
<div class="clear">
<hr/>
</div>
</div>
<div id="leftColumn">
<div id="navcolumn">
<h5>General</h5>
<ul>
<li class="none">
<a href="../../index.html">Overview</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/SingleCluster.html">Single Node Setup</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/ClusterSetup.html">Cluster Setup</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/CommandsManual.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/FileSystemShell.html">FileSystem Shell</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/Compatibility.html">Compatibility Specification</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/DownstreamDev.html">Downstream Developer's Guide</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/AdminCompatibilityGuide.html">Admin Compatibility Guide</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/InterfaceClassification.html">Interface Classification</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/filesystem/index.html">FileSystem Specification</a>
</li>
</ul>
<h5>Common</h5>
<ul>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/CLIMiniCluster.html">CLI Mini Cluster</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/FairCallQueue.html">Fair Call Queue</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/NativeLibraries.html">Native Libraries</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/Superusers.html">Proxy User</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/RackAwareness.html">Rack Awareness</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/SecureMode.html">Secure Mode</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/ServiceLevelAuth.html">Service Level Authorization</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/HttpAuthentication.html">HTTP Authentication</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/CredentialProviderAPI.html">Credential Provider API</a>
</li>
<li class="none">
<a href="../../hadoop-kms/index.html">Hadoop KMS</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/Tracing.html">Tracing</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/UnixShellGuide.html">Unix Shell Guide</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/registry/index.html">Registry</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/AsyncProfilerServlet.html">Async Profiler</a>
</li>
</ul>
<h5>HDFS</h5>
<ul>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsDesign.html">Architecture</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html">User Guide</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HDFSCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html">NameNode HA With QJM</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html">NameNode HA With NFS</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html">Observer NameNode</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/Federation.html">Federation</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/ViewFs.html">ViewFs</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/ViewFsOverloadScheme.html">ViewFsOverloadScheme</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsSnapshots.html">Snapshots</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsEditsViewer.html">Edits Viewer</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html">Image Viewer</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html">Permissions and HDFS</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsQuotaAdminGuide.html">Quotas and HDFS</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/LibHdfs.html">libhdfs (C API)</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/WebHDFS.html">WebHDFS (REST API)</a>
</li>
<li class="none">
<a href="../../hadoop-hdfs-httpfs/index.html">HttpFS</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html">Short Circuit Local Reads</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html">Centralized Cache Management</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsNfsGateway.html">NFS Gateway</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html">Rolling Upgrade</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/ExtendedAttributes.html">Extended Attributes</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html">Transparent Encryption</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsMultihoming.html">Multihoming</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html">Storage Policies</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/MemoryStorage.html">Memory Storage Support</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/SLGUserGuide.html">Synthetic Load Generator</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html">Erasure Coding</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HDFSDiskbalancer.html">Disk Balancer</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsUpgradeDomain.html">Upgrade Domain</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsDataNodeAdminGuide.html">DataNode Admin</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs-rbf/HDFSRouterFederation.html">Router Federation</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/HdfsProvidedStorage.html">Provided Storage</a>
</li>
</ul>
<h5>MapReduce</h5>
<ul>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html">Tutorial</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduce_Compatibility_Hadoop1_Hadoop2.html">Compatibility with 1.x</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Encrypted Shuffle</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html">Pluggable Shuffle/Sort</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html">Distributed Cache Deploy</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html">Support for YARN Shared Cache</a>
</li>
</ul>
<h5>MapReduce REST APIs</h5>
<ul>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html">MR Application Master</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-hs/HistoryServerRest.html">MR History Server</a>
</li>
</ul>
<h5>YARN</h5>
<ul>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/YARN.html">Architecture</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/YarnCommands.html">Commands Reference</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html">Capacity Scheduler</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/FairScheduler.html">Fair Scheduler</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRestart.html">ResourceManager Restart</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html">ResourceManager HA</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/ResourceModel.html">Resource Model</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/NodeLabel.html">Node Labels</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/NodeAttributes.html">Node Attributes</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html">Web Application Proxy</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html">Timeline Server</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html">Timeline Service V.2</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html">Writing YARN Applications</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html">YARN Application Security</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/NodeManager.html">NodeManager</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/DockerContainers.html">Running Applications in Docker Containers</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/RuncContainers.html">Running Applications in runC Containers</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html">Using CGroups</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/SecureContainer.html">Secure Containers</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/ReservationSystem.html">Reservation System</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html">Graceful Decommission</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/OpportunisticContainers.html">Opportunistic Containers</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/Federation.html">YARN Federation</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/SharedCache.html">Shared Cache</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/UsingGpus.html">Using GPU</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/UsingFPGA.html">Using FPGA</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html">Placement Constraints</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/YarnUI2.html">YARN UI2</a>
</li>
</ul>
<h5>YARN REST APIs</h5>
<ul>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html">Introduction</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html">Resource Manager</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/NodeManagerRest.html">Node Manager</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/TimelineServer.html#Timeline_Server_REST_API_v1">Timeline Server</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/TimelineServiceV2.html#Timeline_Service_v.2_REST_API">Timeline Service V.2</a>
</li>
</ul>
<h5>YARN Service</h5>
<ul>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/yarn-service/Overview.html">Overview</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/yarn-service/QuickStart.html">QuickStart</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/yarn-service/Concepts.html">Concepts</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/yarn-service/YarnServiceAPI.html">Yarn Service API</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/yarn-service/ServiceDiscovery.html">Service Discovery</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-site/yarn-service/SystemServices.html">System Services</a>
</li>
</ul>
<h5>Hadoop Compatible File Systems</h5>
<ul>
<li class="none">
<a href="../../hadoop-aliyun/tools/hadoop-aliyun/index.html">Aliyun OSS</a>
</li>
<li class="none">
<a href="../../hadoop-aws/tools/hadoop-aws/index.html">Amazon S3</a>
</li>
<li class="none">
<a href="../../hadoop-azure/index.html">Azure Blob Storage</a>
</li>
<li class="none">
<a href="../../hadoop-azure-datalake/index.html">Azure Data Lake Storage</a>
</li>
<li class="none">
<a href="../../hadoop-cos/cloud-storage/index.html">Tencent COS</a>
</li>
<li class="none">
<a href="../../hadoop-huaweicloud/cloud-storage/index.html">Huaweicloud OBS</a>
</li>
</ul>
<h5>Auth</h5>
<ul>
<li class="none">
<a href="../../hadoop-auth/index.html">Overview</a>
</li>
<li class="none">
<a href="../../hadoop-auth/Examples.html">Examples</a>
</li>
<li class="none">
<a href="../../hadoop-auth/Configuration.html">Configuration</a>
</li>
<li class="none">
<a href="../../hadoop-auth/BuildingIt.html">Building</a>
</li>
</ul>
<h5>Tools</h5>
<ul>
<li class="none">
<a href="../../hadoop-streaming/HadoopStreaming.html">Hadoop Streaming</a>
</li>
<li class="none">
<a href="../../hadoop-archives/HadoopArchives.html">Hadoop Archives</a>
</li>
<li class="none">
<a href="../../hadoop-archive-logs/HadoopArchiveLogs.html">Hadoop Archive Logs</a>
</li>
<li class="none">
<a href="../../hadoop-distcp/DistCp.html">DistCp</a>
</li>
<li class="none">
<a href="../../hadoop-federation-balance/HDFSFederationBalance.html">HDFS Federation Balance</a>
</li>
<li class="none">
<a href="../../hadoop-gridmix/GridMix.html">GridMix</a>
</li>
<li class="none">
<a href="../../hadoop-rumen/Rumen.html">Rumen</a>
</li>
<li class="none">
<a href="../../hadoop-resourceestimator/ResourceEstimator.html">Resource Estimator Service</a>
</li>
<li class="none">
<a href="../../hadoop-sls/SchedulerLoadSimulator.html">Scheduler Load Simulator</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/Benchmarking.html">Hadoop Benchmarking</a>
</li>
<li class="none">
<a href="../../hadoop-dynamometer/Dynamometer.html">Dynamometer</a>
</li>
</ul>
<h5>Reference</h5>
<ul>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/release/">Changelog and Release Notes</a>
</li>
<li class="none">
<a href="../../api/index.html">Java API docs</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/UnixShellAPI.html">Unix Shell API</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/Metrics.html">Metrics</a>
</li>
</ul>
<h5>Configuration</h5>
<ul>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/core-default.xml">core-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs/hdfs-default.xml">hdfs-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-hdfs-rbf/hdfs-rbf-default.xml">hdfs-rbf-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml">mapred-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-yarn/hadoop-yarn-common/yarn-default.xml">yarn-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-kms/kms-default.html">kms-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-hdfs-httpfs/httpfs-default.html">httpfs-default.xml</a>
</li>
<li class="none">
<a href="../../hadoop-project-dist/hadoop-common/DeprecatedProperties.html">Deprecated Properties</a>
</li>
</ul>
<a href="http://maven.apache.org/" title="Built by Maven" class="poweredBy">
<img alt="Built by Maven" src="./images/logos/maven-feather.png"/>
</a>
</div>
</div>
<div id="bodyColumn">
<div id="contentBox">
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<h1>Manifest Committer Protocol</h1>
<p>This document describes the commit protocol of the <a href="manifest_committer.html">Manifest Committer</a>.</p>
<ul>
<li><a href="#Background">Background</a>
<ul>
<li><a href="#Terminology">Terminology</a></li></ul></li>
<li><a href="#File_Output_Committer_V1_and_V2">File Output Committer V1 and V2</a>
<ul>
<li><a href="#File_Output_Committer_V1_and_V2_Commit_algorithms">File Output Committer V1 and V2 Commit algorithms</a></li>
<li><a href="#MapReduce_V1_algorithm:">MapReduce V1 algorithm:</a></li>
<li><a href="#MapReduce_V2_algorithm:">MapReduce V2 algorithm:</a></li>
<li><a href="#Why_the_V2_committer_is_incorrect.2Funsafe">Why the V2 committer is incorrect/unsafe</a></li></ul></li>
<li><a href="#Background:_the_S3A_Committers">Background: the S3A Committers</a>
<ul>
<li><a href="#Staging_Committer">Staging Committer</a></li>
<li><a href="#The_Magic_Committer">The Magic Committer</a></li>
<li><a href="#Correctness">Correctness</a></li></ul></li>
<li><a href="#The_V1_committer:_slow_in_Azure_and_slow_and_unsafe_on_GCS.">The V1 committer: slow in Azure and slow and unsafe on GCS.</a></li>
<li><a href="#Requirements_of_the_Store">Requirements of the Store</a>
<ul>
<li><a href="#Task_and_Job_IDs">Task and Job IDs</a></li></ul></li>
<li><a href="#Directory_Structure">Directory Structure</a></li>
<li><a href="#Core_Algorithm_of_the_Protocol">Core Algorithm of the Protocol</a>
<ul>
<li><a href="#The_Intermediate_Manifest">The Intermediate Manifest</a></li>
<li><a href="#Job_Setup">Job Setup</a></li>
<li><a href="#Task_Setup">Task Setup</a></li>
<li><a href="#Task_Commit">Task Commit</a></li>
<li><a href="#Task_Abort.2Fcleanup">Task Abort/cleanup</a></li>
<li><a href="#Job_Commit">Job Commit</a></li>
<li><a href="#Job_Abort.2Fcleanup">Job Abort/cleanup</a></li></ul></li>
<li><a href="#Benefits_of_the_new_protocol">Benefits of the new protocol</a></li>
<li><a href="#Disadvantages_of_the_new_protocol_compared_to_the_v1_algorithm">Disadvantages of the new protocol compared to the v1 algorithm</a></li></ul>
<section>
<h2><a name="Background"></a>Background</h2><section>
<h3><a name="Terminology"></a>Terminology</h3>
<table border="0" class="bodyTable">
<thead>
<tr class="a">
<th> Term </th>
<th> Meaning</th></tr>
</thead><tbody>
<tr class="b">
<td> Committer </td>
<td> A class which can be invoked by MR or Spark to perform the task and job commit operations. </td></tr>
<tr class="a">
<td> Spark Driver </td>
<td> The Spark process scheduling the work and choreographing the commit operation.</td></tr>
<tr class="b">
<td> Job: in MapReduce </td>
<td> The entire application. In Spark, this is a single stage in a chain of work. </td></tr>
<tr class="a">
<td> Job Attempt </td>
<td> A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says &#x201c;start again from scratch&#x201d; </td></tr>
<tr class="b">
<td> Task </td>
<td> A subsection of a job, such as processing one file or one part of a file </td></tr>
<tr class="a">
<td> Task ID </td>
<td> ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-0001, etc.) </td></tr>
<tr class="b">
<td> Task attempt (TA) </td>
<td> An attempt to perform a task. It may fail, in which case MR/Spark will schedule another. </td></tr>
<tr class="a">
<td> Task Attempt ID </td>
<td> A unique ID for the task attempt. The Task ID + an attempt counter.</td></tr>
<tr class="b">
<td> Destination directory </td>
<td> The final destination of work.</td></tr>
<tr class="a">
<td> Job Attempt Directory </td>
<td> A temporary directory used by the job attempt. This is always <i>underneath</i> the destination directory, so as to ensure it is in the same encryption zone on HDFS, the same storage volume on other filesystems, etc.</td></tr>
<tr class="b">
<td> Task Attempt directory </td>
<td> Directory under the Job Attempt Directory where task attempts create subdirectories for their own work </td></tr>
<tr class="a">
<td> Task Attempt Working Directory</td>
<td> Directory exclusive for each task attempt under which files are written </td></tr>
<tr class="b">
<td> Task Commit </td>
<td> Taking the output of a Task Attempt and making it the final/exclusive result of that &#x201c;successful&#x201d; Task. </td></tr>
<tr class="a">
<td> Job Commit </td>
<td> Aggregating all the outputs of all committed tasks and producing the final results of the job. </td></tr>
</tbody>
</table>
<p>The purpose of a committer is to ensure that the complete output of a job ends up in the destination, even in the presence of failures of tasks.</p>
<ul>
<li><i>Complete:</i> the output includes the work of all successful tasks.</li>
<li><i>Exclusive:</i> the output of unsuccessful tasks is not present.</li>
<li><i>Concurrent:</i> When multiple tasks are committed in parallel, the output is the same as if the task commits were serialized. This is not a requirement of Job Commit.</li>
<li><i>Abortable:</i> jobs and tasks may be aborted prior to job commit, after which their output is not visible.</li>
<li><i>Continuity of correctness:</i> once a job is committed, the output of any failed, aborted, or unsuccessful task MUST NOT appear at any point in the future.</li>
</ul>
<p>For Hive&#x2019;s classic hierarchical-directory-structured tables, job committing requires the output of all committed tasks to be put into the correct location in the directory tree.</p>
<p>The committer built into the <code>hadoop-mapreduce-client-core</code> module is the <code>FileOutputCommitter</code>.</p>
<p>It has two algorithms, v1 and v2.</p>
<p>The v1 algorithm is resilient to all forms of task failure, but slow when committing the final aggregate output as it renames each newly created file to the correct place in the table one by one.</p>
<p>The v2 algorithm is not considered safe because the output is visible when individual tasks commit, rather than being delayed until job commit. It is possible for multiple task attempts to get their data into the output directory tree, and if a job fails/is aborted before the job is committed, this output is visible.</p></section></section><section>
<h2><a name="File_Output_Committer_V1_and_V2"></a>File Output Committer V1 and V2</h2><section>
<h3><a name="File_Output_Committer_V1_and_V2_Commit_algorithms"></a>File Output Committer V1 and V2 Commit algorithms</h3><section>
<h4><a name="Task_attempt_execution_.28V1_and_V2.29"></a>Task attempt execution (V1 and V2)</h4>
<p>The job attempt directory <code>$dest/_temporary/$jobAttemptId/</code> contains all output of the job in progress. Every task attempt is allocated its own task attempt dir <code>$dest/_temporary/$jobAttemptId/_temporary/$taskAttemptId</code> underneath it.</p>
<p>All work for a task is written under the task attempt directory. If the output is a deep tree with files at the root, the task attempt dir will end up with a similar structure, with the files it has generated and the directories above them.</p></section></section><section>
<h3><a name="MapReduce_V1_algorithm:"></a>MapReduce V1 algorithm:</h3><section>
<h4><a name="v1_Task_commit"></a>v1 Task commit</h4>
<p>The task attempt dir is renamed directly underneath the job attempt dir:</p>
<div class="source">
<div class="source">
<pre>rename(
  $dest/_temporary/$jobAttemptId/_temporary/$taskAttemptId,
  $dest/_temporary/$jobAttemptId/$taskId)
</pre></div></div>
</section><section>
<h4><a name="V1_Job_Commit"></a>V1 Job Commit</h4>
<p>For each committed task, all files underneath are renamed into the destination directory, with each path relative to the task&#x2019;s base directory remapped onto the destination directory.</p>
<p>That is, everything under <code>$dest/_temporary/$jobAttemptId/$taskId</code> is converted to a path under <code>$dest</code>.</p>
<p>A recursive treewalk identifies the paths to rename in each TA directory. There&#x2019;s some optimisation if the task directory tree contains a subdirectory which does not exist under the destination: in this case the whole directory can be renamed. If the directory already exists, a file-by-file merge takes place for that dir, with the action for subdirectories again depending on the presence of the destination.</p>
<p>As a result, if the output of each task goes to a separate final directory (e.g. the final partition is unique to a single task), the rename is <code>O(1)</code> for the dir, irrespective of children. If the output is to be in the same dir as other tasks (or is updating existing directories), then the rename performance becomes <code>O(files)</code>.</p>
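<p>The merge behaviour can be illustrated with the simplified Java sketch below of the recursive merge rename; it is an approximation of the <code>FileOutputCommitter</code> merge logic for illustration, not the actual implementation.</p>
<div class="source">
<div class="source">
<pre>import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Simplified sketch of the v1 merge of a committed task's output into $dest. */
public class V1MergeSketch {

  static void mergePaths(FileSystem fs, FileStatus from, Path to)
      throws IOException {
    if (from.isFile()) {
      // a file always replaces any existing entry at the destination
      fs.delete(to, true);
      fs.rename(from.getPath(), to);
    } else if (!fs.exists(to)) {
      // destination directory absent: one O(1) rename moves the whole subtree
      fs.rename(from.getPath(), to);
    } else {
      // destination directory present: merge child by child, O(files)
      for (FileStatus child : fs.listStatus(from.getPath())) {
        mergePaths(fs, child, new Path(to, child.getPath().getName()));
      }
    }
  }
}
</pre></div></div>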
<p>Finally, a 0-byte <code>_SUCCESS</code> file is written iff <code>mapreduce.fileoutputcommitter.marksuccessfuljobs</code> is true.</p></section></section><section>
<h3><a name="MapReduce_V2_algorithm:"></a>MapReduce V2 algorithm:</h3><section>
<h4><a name="V2_Task_commit"></a>V2 Task commit</h4>
<p>The files under the task attempt dir are renamed one by one into the destination directory. There&#x2019;s no attempt at optimising directory renaming, because other tasks may be committing their work at the same time. It is therefore <code>O(files)</code> + the cost of listing the directory tree. Again: done with a recursive treewalk, not a deep <code>listFiles(path, recursive=true)</code> API, which would be faster on HDFS and (though not relevant here) S3.</p></section><section>
<h4><a name="V2_Job_Commit"></a>V2 Job Commit</h4>
<p>A 0-byte <code>_SUCCESS</code> file is written iff <code>mapreduce.fileoutputcommitter.marksuccessfuljobs</code> is true.</p></section></section><section>
<h3><a name="Why_the_V2_committer_is_incorrect.2Funsafe"></a>Why the V2 committer is incorrect/unsafe</h3>
<p>If, for a Task T1, Task Attempt 1 (T1A1) fails before committing, the driver will schedule a new attempt &#x201c;T1A2&#x201d;, and commit it. All is good.</p>
<p>But: if T1A1 was given permission to commit and it failed during the commit process, some of its output may have been written to the destination directory.</p>
<p>If attempt T1A2 was then told to commit, its output would overwrite the already-renamed files only if it generated the exact same set of file names. If different filenames were generated, then the output would contain the files of both T1A1 and T1A2.</p>
<p>If T1A1 became partitioned during the commit process, then the job committer would schedule another attempt and commit its work. However, if T1A1 still had connectivity to the filesystem, it could still be renaming files. The output of the two tasks could be intermingled even if the same filenames were used.</p></section></section><section>
<h2><a name="Background:_the_S3A_Committers"></a>Background: the S3A Committers</h2>
<p>The paper <a class="externalLink" href="https://github.com/steveloughran/zero-rename-committer/releases/"><i>A Zero-Rename Committer</i></a>, Loughran <i>et al.</i>, covers these committers.</p>
<p>It also describes the commit problem, defines correctness, and describes the algorithms of the v1 and v2 committers, as well as those of the S3A committers, IBM Stocator committer and what we know of EMR&#x2019;s Spark committer.</p>
<p>The <code>hadoop-aws</code> JAR contains a pair of committers, &#x201c;Staging&#x201d; and &#x201c;Magic&#x201d;. Both address the same problem: safely and rapidly committing work to an S3 object store.</p>
<p>The committers take advantage of the fact that S3 offers an atomic way to create a file: the PUT request.</p>
<p>Files either exist or they don&#x2019;t. A file can be uploaded directly to its destination, and it is only when the upload completes that the file is manifest, overwriting any existing copy.</p>
<p>For large files, a multipart upload allows this upload operation to be split into a series of POST requests:</p>
<ol style="list-style-type: decimal">
<li><code>initiate-upload (path -&gt; upload ID)</code></li>
<li><code>upload part(path, upload ID, data[]) -&gt; checksum.</code> This can be parallelised. Up to 10,000 parts can be uploaded to a single object. All but the final part must be &gt;= 5MB.</li>
<li><code>complete-upload (path, upload ID, List&lt;checksum&gt;)</code>. This manifests the file, building it from the parts in the sequence of blocks defined by the ordering of the checksums.</li>
</ol>
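<p>Below is a minimal sketch of these three calls using the AWS SDK for Java v1; the bucket and key names are hypothetical, and this is not the S3A committer code itself, which drives the same requests through the S3A filesystem client.</p>
<div class="source">
<div class="source">
<pre>import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

public class MultipartUploadSketch {
  public static void main(String[] args) {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    String bucket = "example-bucket";        // hypothetical
    String key = "output/part-0000";         // hypothetical

    // 1. initiate-upload (path -&gt; upload ID)
    String uploadId = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

    // 2. upload part(path, upload ID, data[]) -&gt; checksum; may be parallelised
    byte[] data = new byte[5 * 1024 * 1024];
    List&lt;PartETag&gt; etags = new ArrayList&lt;&gt;();
    etags.add(s3.uploadPart(new UploadPartRequest()
        .withBucketName(bucket).withKey(key)
        .withUploadId(uploadId)
        .withPartNumber(1)
        .withInputStream(new ByteArrayInputStream(data))
        .withPartSize(data.length)).getPartETag());

    // 3. complete-upload(path, upload ID, List&lt;checksum&gt;): only now does the
    // object become visible. The S3A committers persist (key, uploadId, etags)
    // and issue this call during job commit rather than here.
    s3.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucket, key, uploadId, etags));
  }
}
</pre></div></div>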
<p>The secret for the S3A committers is that the final POST request can be delayed until the job commit phase, even though the files are uploaded during task attempt execution/commit. The task attempts need to determine the final destination of each file, upload the data as part of a multipart operation, then save the information needed to complete the upload in a file which is later read by the job committer and used in a POST request.</p><section>
<h3><a name="Staging_Committer"></a>Staging Committer</h3>
<p>The <i>Staging Committer</i> is based on the contribution by Ryan Blue of Netflix. It relies on HDFS as the consistent store used to propagate the <code>.pendingset</code> files.</p>
<p>The working directory of each task attempt is in the local filesystem, &#x201c;the staging directory&#x201d;. The information needed to complete the uploads is passed from Task Attempts to the Job Committer by using a v1 FileOutputCommitter working with the cluster HDFS filesystem. This ensures that the committer has the same correctness guarantees as the v1 algorithm.</p>
<ol style="list-style-type: decimal">
<li>Task commit consists of uploading all files under the local filesystem&#x2019;s task attempt working directory to their final destination path, holding back on the final manifestation POST.</li>
<li>A JSON file containing all information needed to complete the upload of all files in the task attempt is written to the Job Attempt directory of the wrapped committer working with HDFS.</li>
<li>Job commit: load in all the manifest files in the HDFS job attempt directory, then issue the POST requests to complete the uploads. These are parallelised.</li>
</ol></section><section>
<h3><a name="The_Magic_Committer"></a>The Magic Committer</h3>
<p>The <i>Magic Committer</i> is purely S3A and takes advantage of the fact that the authors could make changes within the filesystem client itself.</p>
<p>&#x201c;Magic&#x201d; paths are defined which, when a file is opened for writing under them, initiate a multipart upload to the final destination directory. When the output stream is <code>close()</code>d, a zero byte marker file is written to the magic path, and a JSON <code>.pending</code> file containing all the information needed to complete the upload is saved.</p>
<p>Task commit:</p>
<ol style="list-style-type: decimal">
<li>List all <code>.pending</code> files under the task attempt&#x2019;s magic directory.</li>
<li>Aggregate them into a <code>.pendingset</code> file.</li>
<li>Save it to the job attempt directory with the task ID.</li>
</ol>
<p>Job commit:</p>
<ol style="list-style-type: decimal">
<li>List <code>.pendingset</code> files in the job attempt directory</li>
<li>Complete the uploads with POST requests.</li>
</ol>
<p>The Magic committer absolutely requires a consistent S3 store, originally provided by S3Guard. Now that S3 is consistent, raw S3 can be used. It does not need HDFS or any other filesystem with <code>rename()</code>.</p></section><section>
<h3><a name="Correctness"></a>Correctness</h3>
<p>The S3A committers are considered correct because:</p>
<ol style="list-style-type: decimal">
<li>Nothing is materialized until job commit.</li>
<li>Only one task attempt&#x2019;s manifest can be saved to the job attempt directory. Hence: only one TA&#x2019;s files for a given task ID are exclusively committed.</li>
<li>The staging committer&#x2019;s use of HDFS to pass manifests from TAs to the Job committer ensures that S3&#x2019;s eventual consistency would not cause manifests to be missed.</li>
<li>Until S3 was consistent, the magic committer relied on S3Guard to provide the list consistency needed during both task- and job- commit.</li>
<li>The authors and wider community fixed all the issues related to the committers which have surfaced in production.</li>
</ol>
<p>Significant issues which were fixed include:</p>
<ul>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-15961">HADOOP-15961</a>. S3A committers: make sure there&#x2019;s regular progress() calls.</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-16570">HADOOP-16570</a>. S3A committers encounter scale issues.</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-16798">HADOOP-16798</a>. S3A Committer thread pool shutdown problems.</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-17112">HADOOP-17112</a>. S3A committers can&#x2019;t handle whitespace in paths.</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-17318">HADOOP-17318</a>. Support concurrent S3A commit jobs with same app attempt ID.</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-17258">HADOOP-17258</a>. MagicS3GuardCommitter fails with <code>pendingset</code> already exists</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/HADOOP-17414]">HADOOP-17414</a>. Magic committer files don&#x2019;t have the count of bytes written collected by spark</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/SPARK-33230">SPARK-33230</a> Hadoop committers to get unique job ID in <code>spark.sql.sources.writeJobUUID</code></li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/SPARK-33402">SPARK-33402</a> Jobs launched in same second have duplicate MapReduce JobIDs</li>
<li><a class="externalLink" href="https://issues.apache.org/jira/browse/SPARK-33739]">SPARK-33739</a>. Jobs committed through the S3A Magic committer don&#x2019;t report the bytes written (depends on HADOOP-17414)</li>
</ul>
<p>Of those which affected correctness rather than scale/performance/UX: HADOOP-17258 involved recovery from a failure after TA1 task commit had completed &#x2014;but had failed to report in. SPARK-33402, SPARK-33230 and HADOOP-17318 are all related: if two Spark jobs/stages started in the same second, they had the same job ID. This caused the HDFS directories used by the staging committers to be intermingled.</p>
<p>What is notable is this: these are all problems which the minimal integration test suites did not discover.</p>
<p>The good news: we now know of these issues, are better placed to avoid replicating them, and know what to write tests for.</p></section></section><section>
<h2><a name="The_V1_committer:_slow_in_Azure_and_slow_and_unsafe_on_GCS."></a>The V1 committer: slow in Azure and slow and unsafe on GCS.</h2>
<p>The V1 committer underperforms on ABFS because:</p>
<ol style="list-style-type: decimal">
<li>Directory listing and file renaming is somewhat slower with ABFS than it is with HDFS.</li>
<li>The v1 committer commits the output of each task sequentially, listing each committed task&#x2019;s output, moving directories when none exist in the destination, and merging files into extant directories.</li>
</ol>
<p>The V2 committer is much faster in the job commit because it performs the list and rename process in the task commit; because that is non-atomic, it is considered dangerous to use. What the V2 task commit algorithm does show is that it is possible to parallelise committing the output of different tasks by using file-by-file rename exclusively.</p>
<p>The V1 committer underperforms on GCS because even the task commit operation &#x2014;directory rename&#x2014; is a non-atomic <code>O(files)</code> operation. This also means that it is unsafe.</p>
<p>If the task attempt has become partitioned and the Spark driver schedules/commits another TA, then the task dir may contain one or more files from the first attempt.</p><hr />
<h1>The Manifest Committer Protocol</h1></section><section>
<h2><a name="Requirements_of_the_Store"></a>Requirements of the Store</h2>
<p>Stores/filesystems supported by this committer MUST:</p>
<ul>
<li>Have consistent listings.</li>
<li>Have an atomic <code>O(1)</code> file rename operation.</li>
</ul>
<p>Stores/filesystems supported by this committer SHOULD:</p>
<ul>
<li>Rename files successfully, even under load. ABFS does not do this, so special recovery is provided there.</li>
<li>Implement the <code>EtagSource</code> interface of HADOOP-17979. This is used for ABFS rename recovery, and for optional validation of the final output.</li>
</ul>
<p>Stores/filesystems supported by this committer MAY:</p>
<ul>
<li>Have list operations with high latency.</li>
<li>Reject calls under load with throttling responses, which MUST be handled in the filesystem connector.</li>
</ul>
<p>Stores/filesystems supported by this committer need not:</p>
<ul>
<li>Support atomic directory rename. This is never used except optionally in cleanup.</li>
<li>Support <code>O(1)</code> directory deletion. The <code>CleanupJobStage</code> assumes this is not the case and so deletes task attempt directories in parallel.</li>
<li>Support an atomic <code>create(Path, overwrite=false)</code> operation. The manifests are committed by writing to a path including the task attempt ID, then renamed to their final path.</li>
<li>Support fast <code>listFiles(path, recursive=true)</code> calls. This API call is not used.</li>
</ul>
<p>When compared with the <code>FileOutputCommitter</code>, the requirements which have been removed are:</p>
<ul>
<li>Atomic directory rename.</li>
<li><code>O(1)</code> directory deletion.</li>
<li>Fast directory listings.</li>
<li>The implicit absence of throttling behaviors.</li>
</ul>
<p>HDFS meets all those requirements, so does not benefit significantly from this committer, though it will still work there.</p>
<p>The S3 store does not meet the rename requirements of this committer, even now that it is consistent. This committer is not safe to use on S3.</p><section>
<h3><a name="Task_and_Job_IDs"></a>Task and Job IDs</h3>
<p>Every job MUST have a unique ID.</p>
<p>The implementation expects the Spark runtime to have the relevant patches to ensure this.</p>
<p>The job ID is used to name temporary directories, rather than using the classic incrementing natural numbering scheme of <code>_temporary/0/</code>. That scheme comes from MapReduce, where job attempts with attempt ID &gt; 1 look for tasks committed by predecessors and incorporate that output into their results.</p>
<p>This committer targets Spark, where there is no attempt at recovery. By using the job ID in paths, if jobs are configured to <i>not</i> delete all of <code>_temporary</code> in job cleanup/abort, then multiple jobs MAY be executed using the same table as their destination.</p>
<p>Task IDs and Task Attempt IDs will be derived from Job IDs as usual.</p>
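<p>For illustration, the derivation can be shown with Hadoop&#x2019;s MapReduce ID classes; the job identifier used below is a made-up example, not one generated by a real cluster.</p>
<div class="source">
<div class="source">
<pre>import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class IdDerivationSketch {
  public static void main(String[] args) {
    // hypothetical job identifier; the Spark patches generate a unique one per job
    JobID jobId = new JobID("20230325123045", 1);        // job_20230325123045_0001
    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);  // task_20230325123045_0001_m_000000
    TaskAttemptID taId = new TaskAttemptID(taskId, 0);   // attempt_20230325123045_0001_m_000000_0
    System.out.println(jobId + " / " + taskId + " / " + taId);
  }
}
</pre></div></div>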
<p>It is expected that filenames of written files SHALL be unique. This is done in Spark for ORC and Parquet files, and allows for checks for destination files to be omitted by default.</p></section></section><section>
<h2><a name="Directory_Structure"></a>Directory Structure</h2>
<p>Given a destination directory <code>destDir: Path</code></p>
<p>A job of id <code>jobID: String</code> and attempt number <code>jobAttemptNumber: int</code> will use the directory:</p>
<div class="source">
<div class="source">
<pre>$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/
</pre></div></div>
<p>For its work (note: it will actually format that final subdir with <code>%02d</code>).</p>
<p>This is termed the <i>Job Attempt Directory</i>.</p>
<p>Under the Job Attempt Directory, a subdirectory <code>tasks</code> is created. This is termed the <i>Task Attempt Directory</i>. Every task attempt will have its own subdirectory of this, into which its work will be saved.</p>
<p>Under the Job Attempt Directory, a subdirectory <code>manifests</code> is created. This is termed the <i>Manifest Directory</i>.</p>
<p>The manifests of all committed tasks will be saved to this directory with the filename of <code>$taskId-manifest.json</code></p>
<p>The full path</p>
<div class="source">
<div class="source">
<pre>$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json
</pre></div></div>
<p>is the final location for the manifest of all files created by a committed task. It is termed the <i>Manifest Path of a Committed Task</i>.</p>
<p>Task attempts will save their manifest into this directory with a temporary filename <code>$taskAttemptId-manifest.json.tmp</code>.</p>
<p>This is termed the <i>Temporary Path of a Task Attempt&#x2019;s Manifest</i>.</p>
<p>For the job and task operations then, the following paths are defined.</p>
<div class="source">
<div class="source">
<pre>let jobDirectory = &quot;$destDir/_temporary/manifest_$jobID/&quot;
let jobAttemptDirectory = jobDirectory + &quot;$jobAttemptNumber/&quot;
let manifestDirectory = jobAttemptDirectory + &quot;manifests/&quot;
let taskAttemptDirectory = jobAttemptDirectory + &quot;tasks/&quot;
</pre></div></div>
<p>And for each task attempt, the following paths are also defined</p>
<div class="source">
<div class="source">
<pre>let taskAttemptWorkingDirectory = taskAttemptDirectory + &quot;$taskAttemptId&quot;
let taskManifestPath = manifestDirectory + &quot;$taskId-manifest.json&quot;
let taskAttemptTemporaryManifestPath = manifestDirectory + &quot;$taskAttemptId-manifest.json&quot;
</pre></div></div>
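<p>A short Java sketch of how these paths can be assembled with the Hadoop <code>Path</code> API follows; the destination URI, job ID and task IDs are hypothetical values, and the <code>%02d</code> formatting of the attempt number matches the note above.</p>
<div class="source">
<div class="source">
<pre>import org.apache.hadoop.fs.Path;

public class ManifestPathsSketch {
  public static void main(String[] args) {
    Path destDir = new Path("abfs://container@account.dfs.core.windows.net/output");
    String jobID = "spark-a1b2c3d4";         // hypothetical unique job ID
    int jobAttemptNumber = 0;
    String taskId = "task_0003";             // hypothetical
    String taskAttemptId = "task_0003_01";   // hypothetical

    Path jobDirectory = new Path(destDir, "_temporary/manifest_" + jobID);
    Path jobAttemptDirectory =
        new Path(jobDirectory, String.format("%02d", jobAttemptNumber));
    Path manifestDirectory = new Path(jobAttemptDirectory, "manifests");
    Path taskAttemptDirectory = new Path(jobAttemptDirectory, "tasks");

    Path taskAttemptWorkingDirectory = new Path(taskAttemptDirectory, taskAttemptId);
    Path taskManifestPath = new Path(manifestDirectory, taskId + "-manifest.json");
    Path taskAttemptTemporaryManifestPath =
        new Path(manifestDirectory, taskAttemptId + "-manifest.json");

    System.out.println(taskManifestPath);
  }
}
</pre></div></div>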
</section><section>
<h2><a name="Core_Algorithm_of_the_Protocol"></a>Core Algorithm of the Protocol</h2>
<ol style="list-style-type: decimal">
<li>Each Task attempt writes all its files to a unique directory tree under the Task Attempt Directory.</li>
<li>Task Commit consists of a recursive scan of the directory for that task attempt, creating a list of directories and a list of files.</li>
<li>These lists are saved as a JSON manifest file.</li>
<li>Job commit consists of listing all of the JSON manifest files, loading their contents, creating the aggregate set of destination directories and renaming all files into their final destinations.</li>
</ol><section>
<h3><a name="The_Intermediate_Manifest"></a>The Intermediate Manifest</h3>
<p>This is a JSON file which contains (along with IOStatistics and some diagnostics):</p>
<ol style="list-style-type: decimal">
<li>A list of destination directories which must be created if they do not exist.</li>
<li>A list of files to rename as (absolute source, absolute destination, file-size) entries.</li>
</ol></section><section>
<h3><a name="Job_Setup"></a>Job Setup</h3>
<div class="source">
<div class="source">
<pre>mkdir(jobAttemptDirectory)
mkdir(manifestDirectory)
mkdir(taskAttemptDirectory)
</pre></div></div>
</section><section>
<h3><a name="Task_Setup"></a>Task Setup</h3>
<div class="source">
<div class="source">
<pre>mkdir(taskAttemptWorkingDirectory)
</pre></div></div>
</section><section>
<h3><a name="Task_Commit"></a>Task Commit</h3>
<p>Task attempts are committed by:</p>
<ol style="list-style-type: decimal">
<li>Recursively listing the task attempt working dir to build:</li>
<li>A list of destination directories under which files will be renamed, and their status (exists, not_found, file)</li>
<li>A list of files to rename: source, destination, size and optionally, etag.</li>
<li>These lists populate a JSON file, the <i>Intermediate Manifest</i>.</li>
<li>The task attempt saves this file to its <i>Temporary Path of a Task Attempt&#x2019;s Manifest</i>.</li>
<li>The task attempt then deletes the <i>Manifest Path of a Committed Task</i> and renames its own manifest file to that path.</li>
<li>If the rename succeeds, the task commit is considered a success.</li>
</ol>
<p>No renaming of output files takes place at this point: the files are left in their original location until renamed in job commit.</p>
<div class="source">
<div class="source">
<pre>let (renames, directories) = scan(taskAttemptWorkingDirectory)
let manifest = new Manifest(renames, directories)
manifest.save(taskAttemptTemporaryManifestPath)
rename(taskAttemptTemporaryManifestPath, taskManifestPath)
</pre></div></div>
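<p>As a rough model of the data captured, the intermediate manifest could be sketched along the following lines; this is illustrative only, with made-up field names, not the actual manifest class or its wire format.</p>
<div class="source">
<div class="source">
<pre>import java.util.ArrayList;
import java.util.List;

/** Illustrative model of an intermediate manifest; not the real class. */
public class IntermediateManifestSketch {

  /** A directory to create at the destination if it does not exist. */
  static class DirEntry {
    String destination;
  }

  /** A file to rename at job commit. */
  static class FileEntry {
    String source;       // absolute path under the task attempt working dir
    String destination;  // absolute path under $destDir
    long size;           // used for optional post-rename validation
    String etag;         // optional, for stores implementing EtagSource
  }

  List&lt;DirEntry&gt; directoriesToCreate = new ArrayList&lt;&gt;();
  List&lt;FileEntry&gt; filesToRename = new ArrayList&lt;&gt;();
  // plus IOStatistics and diagnostics, omitted here
}
</pre></div></div>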
</section><section>
<h3><a name="Task_Abort.2Fcleanup"></a>Task Abort/cleanup</h3>
<div class="source">
<div class="source">
<pre>delete(taskAttemptWorkingDirectory)
</pre></div></div>
</section><section>
<h3><a name="Job_Commit"></a>Job Commit</h3>
<p>Job Commit consists of:</p>
<ol style="list-style-type: decimal">
<li>List all manifest files in the job attempt directory.</li>
<li>Load each manifest file, create directories which do not yet exist, then rename each file in the rename list.</li>
<li>Optionally save a JSON <code>_SUCCESS</code> file with the same format as the S3A committer (for testing; use write and rename for the atomic save, as sketched after this list)</li>
</ol>
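<p>The &#x201c;write and rename&#x201d; save of the <code>_SUCCESS</code> marker can be sketched as follows; the temporary filename is an illustrative choice, not necessarily the committer&#x2019;s own.</p>
<div class="source">
<div class="source">
<pre>import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative atomic save of a JSON _SUCCESS marker via write-then-rename. */
public class SuccessMarkerSketch {

  static void saveSuccess(FileSystem fs, Path destDir, String json)
      throws java.io.IOException {
    Path tmp = new Path(destDir, "_SUCCESS.tmp");   // hypothetical temporary name
    Path marker = new Path(destDir, "_SUCCESS");
    try (FSDataOutputStream out = fs.create(tmp, true)) {
      out.write(json.getBytes(StandardCharsets.UTF_8));
    }
    fs.delete(marker, false);   // remove any existing marker first
    fs.rename(tmp, marker);     // atomic O(1) rename makes the marker visible
  }
}
</pre></div></div>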
<p>The job commit phase supports parallelization for many tasks and many files per task; specifically, there is a thread pool for parallel store IO. This is used for:</p>
<ol style="list-style-type: decimal">
<li>Loading and processing of the manifests in parallel.</li>
<li>Deletion of files where directories are intended to be created.</li>
<li>Creation of leaf directories.</li>
<li>File rename.</li>
<li>In cleanup and abort: deletion of task attempt directories</li>
<li>If validation of output is enabled for testing/debugging: <code>getFileStatus</code> calls to compare file length and, if possible, etags.</li>
</ol>
<div class="source">
<div class="source">
<pre>let manifestPaths = list(&quot;$manifestDirectory/*-manifest.json&quot;)
let manifests = manifestPaths.map(p -&gt; loadManifest(p))
let directoriesToCreate = merge(manifests.directories)
let filesToRename = concat(manifests.files)
directoriesToCreate.map(p -&gt; mkdirs(p))
filesToRename.map((src, dest, etag) -&gt; rename(src, dest, etag))
if mapreduce.fileoutputcommitter.marksuccessfuljobs then
  success.save(&quot;$destDir/_SUCCESS&quot;)
</pre></div></div>
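<p>The parallel rename step can be sketched in Java with a fixed thread pool over the Hadoop <code>FileSystem</code> API; the pool size and the source/destination pair list are assumptions for illustration, not the committer&#x2019;s actual classes or configuration.</p>
<div class="source">
<div class="source">
<pre>import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative parallel rename of the aggregated file list at job commit. */
public class ParallelRenameSketch {

  static void renameAll(FileSystem fs, List&lt;Path[]&gt; filesToRename, int threads)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List&lt;Future&lt;?&gt;&gt; futures = new ArrayList&lt;&gt;();
      for (Path[] srcDest : filesToRename) {
        futures.add(pool.submit(() -&gt; {
          // each rename is an O(1) operation on the stores this committer targets
          if (!fs.rename(srcDest[0], srcDest[1])) {
            throw new IOException("failed to rename " + srcDest[0]);
          }
          return null;
        }));
      }
      for (Future&lt;?&gt; f : futures) {
        f.get();   // propagate the first failure; the job commit then fails
      }
    } finally {
      pool.shutdown();
    }
  }
}
</pre></div></div>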
<p>Implementation Note:</p>
<p>To aid debugging and development, the summary may be saved to a location in the same <i>or a different</i> filesystem; the intermediate manifests may be renamed to a location in the target filesystem.</p>
<div class="source">
<div class="source">
<pre>if summary.report.directory != &quot;&quot; then
  success.save(&quot;${summary.report.directory}/$jobID.json&quot;)
if diagnostics.manifest.directory != null then
  rename($manifestDirectory, &quot;${diagnostics.manifest.directory}/$jobID&quot;)
</pre></div></div>
<p>The summary report is saved even if the job commit fails for any reason.</p></section><section>
<h3><a name="Job_Abort.2Fcleanup"></a>Job Abort/cleanup</h3>
<p>Job cleanup nominally consists of deleting the job directory:</p>
<div class="source">
<div class="source">
<pre>delete(jobDirectory)
</pre></div></div>
<p>To address scale issues with the object stores, this SHALL be preceded by a (parallelized) deletion of all task attempt working directories:</p>
<div class="source">
<div class="source">
<pre>let taskAttemptWorkingDirectories = list(&quot;$taskAttemptDirectory&quot;)
taskAttemptWorkingDirectories.map(p -&gt; delete(p))
</pre></div></div>
</section></section><section>
<h2><a name="Benefits_of_the_new_protocol"></a>Benefits of the new protocol</h2>
<ul>
<li>Pushes the source tree list operations into the task commit phase, which is generally off the critical path of execution.</li>
<li>Reduces the number of directories probed/created to the aggregate set of output directories, with all duplicates eliminated.</li>
<li>File rename can be parallelized, with the limits being that of configured thread pool sizes and/or any rate limiting constraints.</li>
<li>Provides an atomic task commit to GCS, as there is no expectation that directory rename is atomic.</li>
<li>Permits passing IOStatistics from task attempts to the job committer via the manifests.</li>
<li>Allows for some pre-rename operations in the Job Committer similar to the S3A &#x201c;Partitioned Staging committer&#x201d;. This can be configured to delete all existing entries in directories scheduled to be created -or fail if those partitions are non-empty. See <a href="../../hadoop-aws/tools/hadoop-aws/committers.html#The_.E2.80.9CPartitioned.E2.80.9D_Staging_Committer">Partitioned Staging Committer</a></li>
<li>Allows for an optional preflight validation check (verify no duplicate files created by different tasks).</li>
<li>Manifests can be viewed, size of output determined, etc., during development/debugging.</li>
</ul></section><section>
<h2><a name="Disadvantages_of_the_new_protocol_compared_to_the_v1_algorithm"></a>Disadvantages of the new protocol compared to the v1 algorithm</h2>
<ul>
<li>Needs a new manifest file format.</li>
<li>Manifests may get large if tasks create many files and/or subdirectories, or if etags are collected and the length of these tags is significant. The HTTP protocol limits each etag to 8 KiB, so the cost may be 8 KiB per file.</li>
<li>Makes task commit more complex than the v1 algorithm.</li>
<li>Possibly suboptimal on jobs where individual tasks create unique output directories, as directory rename will never be used to commit a directory.</li>
</ul></section>
</div>
</div>
<div class="clear">
<hr/>
</div>
<div id="footer">
<div class="xright">
&#169; 2008-2023
Apache Software Foundation
- <a href="http://maven.apache.org/privacy-policy.html">Privacy Policy</a>.
Apache Maven, Maven, Apache, the Apache feather logo, and the Apache Maven project logos are trademarks of The Apache Software Foundation.
</div>
<div class="clear">
<hr/>
</div>
</div>
</body>
</html>