diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
index 8608a7b9771..1f668eb6778 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -17,17 +17,21 @@
*/
package org.apache.hadoop.fs;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import javax.annotation.Nonnull;
import java.io.IOException;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -43,6 +47,29 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
* Progressable)}.
*
* To create missing parent directory, use {@link #recursive()}.
+ *
+ * To be more generic, {@link #opt(String, int)} and {@link #must(String, int)}
+ * variants provide implementation-agnostic way to customize the builder.
+ * Each FS-specific builder implementation can interpret the FS-specific
+ * options accordingly, for example:
+ *
+ *
+ * FSDataOutputStreamBuilder builder = fs.createFile(path);
+ * builder.permission(perm)
+ * .bufferSize(bufSize)
+ * .opt("dfs.outputstream.builder.lazy-persist", true)
+ * .opt("dfs.outputstream.builder.ec.policy-name", "rs-3-2-64k")
+ * .opt("fs.local.o-direct", true)
+ * .must("fs.s3a.fast-upload", true)
+ * .must("fs.azure.buffer-size", 256 * 1024 * 1024);
+ * FSDataOutputStream out = builder.build();
+ * ...
+ *
+ *
+ * If the option is not related to the file system, the option will be ignored.
+ * If the option is must, but not supported by the file system, a
+ * {@link IllegalArgumentException} will be thrown.
+ *
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -60,6 +87,16 @@ public abstract class FSDataOutputStreamBuilder
private Progressable progress = null;
private ChecksumOpt checksumOpt = null;
+ /**
+ * Contains optional and mandatory parameters.
+ *
+ * It does not load default configurations from default files.
+ */
+ private final Configuration options = new Configuration(false);
+
+ /** Keep track of the keys for mandatory options. */
+ private final Set mandatoryKeys = new HashSet<>();
+
/**
* Return the concrete implementation of the builder instance.
*/
@@ -215,11 +252,125 @@ public abstract class FSDataOutputStreamBuilder
return getThisBuilder();
}
+ /**
+ * Set optional Builder parameter.
+ */
+ public B opt(@Nonnull final String key, @Nonnull final String value) {
+ mandatoryKeys.remove(key);
+ options.set(key, value);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set optional boolean parameter for the Builder.
+ */
+ public B opt(@Nonnull final String key, boolean value) {
+ mandatoryKeys.remove(key);
+ options.setBoolean(key, value);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set optional int parameter for the Builder.
+ */
+ public B opt(@Nonnull final String key, int value) {
+ mandatoryKeys.remove(key);
+ options.setInt(key, value);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set optional float parameter for the Builder.
+ */
+ public B opt(@Nonnull final String key, float value) {
+ mandatoryKeys.remove(key);
+ options.setFloat(key, value);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set optional double parameter for the Builder.
+ */
+ public B opt(@Nonnull final String key, double value) {
+ mandatoryKeys.remove(key);
+ options.setDouble(key, value);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set an array of string values as optional parameter for the Builder.
+ */
+ public B opt(@Nonnull final String key, @Nonnull final String... values) {
+ mandatoryKeys.remove(key);
+ options.setStrings(key, values);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set mandatory option to the Builder.
+ *
+ * If the option is not supported or unavailable on the {@link FileSystem},
+ * the client should expect {@link #build()} throws
+ * {@link IllegalArgumentException}.
+ */
+ public B must(@Nonnull final String key, @Nonnull final String value) {
+ mandatoryKeys.add(key);
+ options.set(key, value);
+ return getThisBuilder();
+ }
+
+ /** Set mandatory boolean option. */
+ public B must(@Nonnull final String key, boolean value) {
+ mandatoryKeys.add(key);
+ options.setBoolean(key, value);
+ return getThisBuilder();
+ }
+
+ /** Set mandatory int option. */
+ public B must(@Nonnull final String key, int value) {
+ mandatoryKeys.add(key);
+ options.setInt(key, value);
+ return getThisBuilder();
+ }
+
+ /** Set mandatory float option. */
+ public B must(@Nonnull final String key, float value) {
+ mandatoryKeys.add(key);
+ options.setFloat(key, value);
+ return getThisBuilder();
+ }
+
+ /** Set mandatory double option. */
+ public B must(@Nonnull final String key, double value) {
+ mandatoryKeys.add(key);
+ options.setDouble(key, value);
+ return getThisBuilder();
+ }
+
+ /** Set a string array as mandatory option. */
+ public B must(@Nonnull final String key, @Nonnull final String... values) {
+ mandatoryKeys.add(key);
+ options.setStrings(key, values);
+ return getThisBuilder();
+ }
+
+ protected Configuration getOptions() {
+ return options;
+ }
+
+ /**
+ * Get all the keys that are set as mandatory keys.
+ */
+ @VisibleForTesting
+ protected Set getMandatoryKeys() {
+ return Collections.unmodifiableSet(mandatoryKeys);
+ }
+
/**
* Create the FSDataOutputStream to write on the file system.
*
- * @throws HadoopIllegalArgumentException if the parameters are not valid.
+ * @throws IllegalArgumentException if the parameters are not valid.
* @throws IOException on errors when file system creates or appends the file.
*/
- public abstract S build() throws IOException;
+ public abstract S build() throws IllegalArgumentException, IOException;
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index 00cedc38088..357c6832271 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.fs;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -31,7 +33,11 @@ import static org.apache.hadoop.fs.FileSystemTestHelper.*;
import java.io.*;
import java.net.URI;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import java.util.Random;
+import java.util.Set;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
@@ -46,6 +52,8 @@ import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.internal.util.reflection.Whitebox;
+import javax.annotation.Nonnull;
+
/**
* This class tests the local file system via the FileSystem abstraction.
@@ -703,4 +711,66 @@ public class TestLocalFileSystem {
Assert.assertEquals("Buffer size should be 0",
builder.getBufferSize(), 0);
}
+
+ /**
+ * A builder to verify configuration keys are supported.
+ */
+ private static class BuilderWithSupportedKeys
+ extends FSDataOutputStreamBuilder {
+
+ private final Set supportedKeys = new HashSet<>();
+
+ BuilderWithSupportedKeys(@Nonnull final Collection supportedKeys,
+ @Nonnull FileSystem fileSystem, @Nonnull Path p) {
+ super(fileSystem, p);
+ this.supportedKeys.addAll(supportedKeys);
+ }
+
+ @Override
+ protected BuilderWithSupportedKeys getThisBuilder() {
+ return this;
+ }
+
+ @Override
+ public FSDataOutputStream build()
+ throws IllegalArgumentException, IOException {
+ Set unsupported = new HashSet<>(getMandatoryKeys());
+ unsupported.removeAll(supportedKeys);
+ Preconditions.checkArgument(unsupported.isEmpty(),
+ "unsupported key found: " + supportedKeys);
+ return getFS().create(
+ getPath(), getPermission(), getFlags(), getBufferSize(),
+ getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
+ }
+ }
+
+ @Test
+ public void testFSOutputStreamBuilderOptions() throws Exception {
+ Path path = new Path(TEST_ROOT_DIR, "testBuilderOpt");
+ final List supportedKeys = Arrays.asList("strM");
+
+ FSDataOutputStreamBuilder, ?> builder =
+ new BuilderWithSupportedKeys(supportedKeys, fileSys, path);
+ builder.opt("strKey", "value");
+ builder.opt("intKey", 123);
+ builder.opt("strM", "ignored");
+ // Over-write an optional value with a mandatory value.
+ builder.must("strM", "value");
+ builder.must("unsupported", 12.34);
+
+ assertEquals("Optional value should be overwrite by a mandatory value",
+ "value", builder.getOptions().get("strM"));
+
+ Set mandatoryKeys = builder.getMandatoryKeys();
+ Set expectedKeys = new HashSet<>();
+ expectedKeys.add("strM");
+ expectedKeys.add("unsupported");
+ assertEquals(expectedKeys, mandatoryKeys);
+ assertEquals(2, mandatoryKeys.size());
+
+ LambdaTestUtils.intercept(IllegalArgumentException.class,
+ "unsupported key found", builder::build
+ );
+ }
}