diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
index f12678b5562..adde981f247 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
@@ -23,6 +23,10 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -60,7 +64,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public class BindingPathOutputCommitter extends PathOutputCommitter {
+public class BindingPathOutputCommitter extends PathOutputCommitter
+ implements IOStatisticsSource, StreamCapabilities {
/**
* The classname for use in configurations.
@@ -181,4 +186,22 @@ public class BindingPathOutputCommitter extends PathOutputCommitter {
public PathOutputCommitter getCommitter() {
return committer;
}
+
+ /**
+ * Pass through if the inner committer supports StreamCapabilities.
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean hasCapability(final String capability) {
+ if (committer instanceof StreamCapabilities) {
+ return ((StreamCapabilities) committer).hasCapability(capability);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(committer);
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
index 99625e82428..024fb3ab34e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.JobContext;
@@ -55,6 +56,7 @@ import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
@@ -84,7 +86,7 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.C
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ManifestCommitter extends PathOutputCommitter implements
- IOStatisticsSource, StageEventCallbacks {
+ IOStatisticsSource, StageEventCallbacks, StreamCapabilities {
public static final Logger LOG = LoggerFactory.getLogger(
ManifestCommitter.class);
@@ -758,4 +760,15 @@ public class ManifestCommitter extends PathOutputCommitter implements
public IOStatisticsStore getIOStatistics() {
return iostatistics;
}
+
+ /**
+ * The committer is compatible with spark's dynamic partitioning
+ * algorithm.
+ * @param capability string to query the stream support for.
+ * @return true if the requested capability is supported.
+ */
+ @Override
+ public boolean hasCapability(final String capability) {
+ return CAPABILITY_DYNAMIC_PARTITIONING.equals(capability);
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
index eb344e8a27e..fd7b3d816c1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java
@@ -234,6 +234,12 @@ public final class ManifestCommitterConstants {
*/
public static final String CONTEXT_ATTR_TASK_ATTEMPT_ID = "ta";
+ /**
+ * Stream Capabilities probe for spark dynamic partitioning compatibility.
+ */
+ public static final String CAPABILITY_DYNAMIC_PARTITIONING =
+ "mapreduce.job.committer.dynamic.partitioning";
+
private ManifestCommitterConstants() {
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
index b446be29ddd..12fe1f0b5f2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md
@@ -269,6 +269,76 @@ appending data are creating and writing into new partitions.
job to create unique files. This is foundational for
any job to generate correct data.
+# Spark Dynamic Partition overwriting
+
+Spark has a feature called "Dynamic Partition Overwrites",
+
+This can be initiated in SQL
+```SQL
+INSERT OVERWRITE TABLE ...
+```
+Or through DataSet writes where the mode is `overwrite` and the partitioning matches
+that of the existing table
+```scala
+sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
+// followed by an overwrite of a Dataset into an existing partitioned table.
+eventData2
+ .write
+ .mode("overwrite")
+ .partitionBy("year", "month")
+ .format("parquet")
+ .save(existingDir)
+```
+
+This feature is implemented in Spark, which
+1. Directs the job to write its new data to a temporary directory
+1. After job commit completes, scans the output to identify the leaf directories "partitions" into which data was written.
+1. Deletes the content of those directories in the destination table
+1. Renames the new files into the partitions.
+
+This is all done in spark, which takes over the tasks of scanning
+the intermediate output tree, deleting partitions and of
+renaming the new files.
+
+This feature also adds the ability for a job to write data entirely outside
+the destination table, which is done by
+1. writing new files into the working directory
+1. spark moving them to the final destination in job commit
+
+
+The manifest committer is compatible with dynamic partition overwrites
+on Azure and Google cloud storage as together they meet the core requirements of
+the extension:
+1. The working directory returned in `getWorkPath()` is in the same filesystem
+ as the final output.
+2. `rename()` is an `O(1)` operation which is safe and fast to use when committing a job.
+
+None of the S3A committers support this. Condition (1) is not met by
+the staging committers, while (2) is not met by S3 itself.
+
+To use the manifest committer with dynamic partition overwrites, the
+spark version must contain
+[SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)
+_PathOutputCommitters to work with dynamic partition overwrite_.
+
+Be aware that the rename phase of the operation will be slow
+if many files are renamed -this is done sequentially.
+Parallel renaming would speed this up, *but could trigger the abfs overload
+problems the manifest committer is designed to both minimize the risk
+of and support recovery from*
+
+The spark side of the commit operation will be listing/treewalking
+the temporary output directory (some overhead), followed by
+the file promotion, done with a classic filesystem `rename()`
+call. There will be no explicit rate limiting here.
+
+*What does this mean?*
+
+It means that _dynamic partitioning should not be used on Azure Storage
+for SQL queries/Spark DataSet operations where many thousands of files are created.
+The fact that these will suffer from performance problems before
+throttling scale issues surface, should be considered a warning.
+
# Job Summaries in `_SUCCESS` files
The original hadoop committer creates a zero byte `_SUCCESS` file in the root of the output directory
@@ -585,7 +655,7 @@ There is no need to alter these values, except when writing new implementations
something which is only needed if the store provides extra integration support for the
committer.
-## Support for concurrent test runs.
+## Support for concurrent jobs to the same directory
It *may* be possible to run multiple jobs targeting the same directory tree.
@@ -600,6 +670,8 @@ For this to work, a number of conditions must be met:
`mapreduce.fileoutputcommitter.cleanup.skipped` to `true`.
* All jobs/tasks must create files with unique filenames.
* All jobs must create output with the same directory partition structure.
+* The job/queries MUST NOT be using Spark Dynamic Partitioning "INSERT OVERWRITE TABLE"; data may be lost.
+ This holds for *all* committers, not just the manifest committer.
* Remember to delete the `_temporary` directory later!
This has *NOT BEEN TESTED*
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java
index 2212fabe54a..3037bf33ad6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
@@ -1549,6 +1550,23 @@ public class TestManifestCommitProtocol
ManifestCommitter committer = (ManifestCommitter)
outputFormat.getOutputCommitter(tContext);
+ // check path capabilities directly
+ Assertions.assertThat(committer.hasCapability(
+ ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
+ .describedAs("dynamic partitioning capability in committer %s",
+ committer)
+ .isTrue();
+ // and through a binding committer -passthrough is critical
+ // for the spark binding.
+ BindingPathOutputCommitter bindingCommitter =
+ new BindingPathOutputCommitter(outputDir, tContext);
+ Assertions.assertThat(bindingCommitter.hasCapability(
+ ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
+ .describedAs("dynamic partitioning capability in committer %s",
+ bindingCommitter)
+ .isTrue();
+
+
// setup
JobData jobData = new JobData(job, jContext, tContext, committer);
setupJob(jobData);