diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
index 56ef51f128d..4790a29a460 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
@@ -28,6 +28,34 @@ import org.apache.hadoop.classification.InterfaceStability;
* The base interface which various FileSystem FileContext Builder
* interfaces can extend, and which underlying implementations
* will then implement.
+ *
+ * HADOOP-16202 expanded the opt() and must() arguments with
+ * method overloading, but HADOOP-18724 identified mapping problems:
+ * passing a long value in to {@code opt()} could end up invoking
+ * {@code opt(String, double)}, which could then trigger parse failures.
+ *
+ * To fix this without breaking existing code or requiring it to be
+ * recompiled:
+ *
+ * - A new method to explicitly set a long value is added:
+ * {@link #optLong(String, long)}
+ *
+ * - A new method to explicitly set a double value is added:
+ * {@link #optDouble(String, double)}
+ *
+ * - All of {@link #opt(String, long)}, {@link #opt(String, float)} and
+ * {@link #opt(String, double)} invoke {@link #optLong(String, long)}.
+ *
+ * - The same changes have been applied to the {@code must()} methods.
+ *
+ *
+ * The forwarding of the existing double/float setters to the long setters
+ * ensures that existing code will link, but those methods are now guaranteed
+ * to always set a long value.
+ * If you need to write code which works correctly with all Hadoop releases,
+ * convert the option to a string explicitly and then call {@link #opt(String, String)}
+ * or {@link #must(String, String)} as appropriate.
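+ *
+ * For example, the version-portable form of setting a long option is
+ * an explicit string conversion (a sketch; the string form is parsed
+ * by all releases):
+ * <pre>
+ *   builder.opt("fs.option.openfile.length", Long.toString(len));
+ * </pre>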
+ *
* @param <S> Return type on the {@link #build()} call.
* @param <B> type of builder itself.
*/
@@ -63,13 +91,17 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
B opt(@Nonnull String key, int value);
/**
- * Set optional float parameter for the Builder.
+   * Set optional float parameter for the Builder.
+   * This parameter is converted to a long and passed
+   * to {@link #optLong(String, long)}; all
+   * decimal precision is lost.
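+   * For example, {@code opt("x", 1.8f)} sets the long value {@code 1},
+   * exactly as {@code optLong("x", 1)} would.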
*
* @param key key.
* @param value value.
* @return generic type B.
* @see #opt(String, String)
+ * @deprecated use {@link #optDouble(String, double)}
*/
+ @Deprecated
B opt(@Nonnull String key, float value);
/**
@@ -78,18 +110,22 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
* @param key key.
* @param value value.
* @return generic type B.
- * @see #opt(String, String)
+ * @deprecated use {@link #optLong(String, long)} where possible.
*/
+  @Deprecated
   B opt(@Nonnull String key, long value);
/**
- * Set optional double parameter for the Builder.
- *
+ * Pass an optional double parameter for the Builder.
+ * This parameter is converted to a long and passed
+   * to {@link #optLong(String, long)}; all
+ * decimal precision is lost.
* @param key key.
* @param value value.
* @return generic type B.
* @see #opt(String, String)
+ * @deprecated use {@link #optDouble(String, double)}
*/
+ @Deprecated
B opt(@Nonnull String key, double value);
/**
@@ -102,6 +138,28 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
*/
B opt(@Nonnull String key, @Nonnull String... values);
+ /**
+ * Set optional long parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+ * @see #opt(String, String)
+ */
+  B optLong(@Nonnull String key, long value);
+
+ /**
+ * Set optional double parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+ * @see #opt(String, String)
+ */
+  B optDouble(@Nonnull String key, double value);
+
/**
* Set mandatory option to the Builder.
*
@@ -135,13 +193,16 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
B must(@Nonnull String key, int value);
/**
- * Set mandatory float option.
+   * Set mandatory float option.
+   * This parameter is converted to a long and passed
+   * to {@link #mustLong(String, long)}; all
+   * decimal precision is lost.
*
* @param key key.
* @param value value.
* @return generic type B.
- * @see #must(String, String)
+ * @deprecated use {@link #mustDouble(String, double)} to set floating point.
*/
+ @Deprecated
B must(@Nonnull String key, float value);
/**
@@ -152,16 +213,19 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
* @return generic type B.
   * @see #must(String, String)
+   * @deprecated use {@link #mustLong(String, long)} where possible.
*/
+ @Deprecated
B must(@Nonnull String key, long value);
/**
- * Set mandatory double option.
+   * Set mandatory double option.
+   * This parameter is converted to a long and passed
+   * to {@link #mustLong(String, long)}; all
+   * decimal precision is lost.
*
* @param key key.
* @param value value.
* @return generic type B.
   * @see #must(String, String)
+   * @deprecated use {@link #mustDouble(String, double)}
*/
+ @Deprecated
B must(@Nonnull String key, double value);
/**
@@ -174,6 +238,26 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
*/
B must(@Nonnull String key, @Nonnull String... values);
+ /**
+ * Set mandatory long parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+   * @see #must(String, String)
+ */
+ B mustLong(@Nonnull String key, long value);
+
+ /**
+ * Set mandatory double parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+   * @see #must(String, String)
+ */
+ B mustDouble(@Nonnull String key, double value);
+
/**
* Instantiate the object which was being built.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
index 4256522b2a3..c1bcf220c0c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
@@ -44,11 +44,13 @@ import static org.apache.hadoop.util.Preconditions.checkNotNull;
* with option support.
*
*
- * .opt("foofs:option.a", true)
- * .opt("foofs:option.b", "value")
+ * .opt("fs.s3a.open.option.caching", true)
+ * .opt("fs.option.openfile.read.policy", "random, adaptive")
* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
- * .must("foofs:cache", true)
- * .must("barfs:cache-size", 256 * 1024 * 1024)
+ *   .optLong("fs.option.openfile.length", 1_500_000_000_000L)
+ * .must("fs.option.openfile.buffer.size", 256_000)
+ * .mustLong("fs.option.openfile.split.start", 256_000_000)
+ * .mustLong("fs.option.openfile.split.end", 512_000_000)
* .build();
*
*
@@ -64,6 +66,7 @@ import static org.apache.hadoop.util.Preconditions.checkNotNull;
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
+@SuppressWarnings({"deprecation", "unused"})
public abstract class
    AbstractFSBuilderImpl<S, B extends FSBuilder<S, B>>
    implements FSBuilder<S, B> {
@@ -178,10 +181,7 @@ public abstract class
*/
@Override
public B opt(@Nonnull final String key, boolean value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setBoolean(key, value);
- return getThisBuilder();
+ return opt(key, Boolean.toString(value));
}
/**
@@ -190,19 +190,18 @@ public abstract class
* @see #opt(String, String)
*/
@Override
- public B opt(@Nonnull final String key, int value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setInt(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, int value) {
+ return optLong(key, value);
}
@Override
- public B opt(@Nonnull final String key, final long value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setLong(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, final long value) {
+ return optLong(key, value);
+ }
+
+ @Override
+ public B optLong(@Nonnull final String key, final long value) {
+ return opt(key, Long.toString(value));
}
/**
@@ -211,11 +210,8 @@ public abstract class
* @see #opt(String, String)
*/
@Override
- public B opt(@Nonnull final String key, float value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setFloat(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, float value) {
+ return optLong(key, (long) value);
}
/**
@@ -224,11 +220,18 @@ public abstract class
* @see #opt(String, String)
*/
@Override
- public B opt(@Nonnull final String key, double value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setDouble(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, double value) {
+ return optLong(key, (long) value);
+ }
+
+ /**
+ * Set optional double parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ @Override
+ public B optDouble(@Nonnull final String key, double value) {
+ return opt(key, Double.toString(value));
}
/**
@@ -264,10 +267,22 @@ public abstract class
*/
@Override
public B must(@Nonnull final String key, boolean value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setBoolean(key, value);
- return getThisBuilder();
+ return must(key, Boolean.toString(value));
+ }
+
+ @Override
+ public B mustLong(@Nonnull final String key, final long value) {
+ return must(key, Long.toString(value));
+ }
+
+ /**
+   * Set mandatory double parameter for the Builder.
+   *
+   * @see #must(String, String)
+ */
+ @Override
+ public B mustDouble(@Nonnull final String key, double value) {
+ return must(key, Double.toString(value));
}
/**
@@ -276,45 +291,23 @@ public abstract class
* @see #must(String, String)
*/
@Override
- public B must(@Nonnull final String key, int value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setInt(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, int value) {
+ return mustLong(key, value);
}
@Override
- public B must(@Nonnull final String key, final long value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setLong(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, final long value) {
+ return mustLong(key, value);
}
- /**
- * Set mandatory float option.
- *
- * @see #must(String, String)
- */
@Override
- public B must(@Nonnull final String key, float value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setFloat(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, final float value) {
+ return mustLong(key, (long) value);
}
- /**
- * Set mandatory double option.
- *
- * @see #must(String, String)
- */
@Override
- public B must(@Nonnull final String key, double value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setDouble(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, double value) {
+ return mustLong(key, (long) value);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java
new file mode 100644
index 00000000000..dc4a18eb2b5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+
+/**
+ * Class to help with use of FSBuilder.
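+ * <p>
+ * A sketch of typical use, mirroring {@code OpenFileSupport} in the S3A
+ * connector:
+ * <pre>
+ *   FSBuilderSupport support = new FSBuilderSupport(options);
+ *   long splitStart = support.getPositiveLong(
+ *       "fs.option.openfile.split.start", 0);
+ * </pre>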
+ */
+public class FSBuilderSupport {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FSBuilderSupport.class);
+
+ public static final LogExactlyOnce LOG_PARSE_ERROR = new LogExactlyOnce(LOG);
+
+ /**
+ * Options which are parsed.
+ */
+ private final Configuration options;
+
+ /**
+ * Constructor.
+ * @param options the configuration options from the builder.
+ */
+ public FSBuilderSupport(final Configuration options) {
+ this.options = options;
+ }
+
+ public Configuration getOptions() {
+ return options;
+ }
+
+ /**
+ * Get a long value with resilience to unparseable values.
+ * Negative values are replaced with the default.
+   * @param key option key
+ * @param defVal default value
+ * @return long value
+ */
+ public long getPositiveLong(String key, long defVal) {
+ long l = getLong(key, defVal);
+ if (l < 0) {
+ LOG.debug("The option {} has a negative value {}, replacing with the default {}",
+ key, l, defVal);
+ l = defVal;
+ }
+ return l;
+ }
+
+ /**
+ * Get a long value with resilience to unparseable values.
+   * @param key option key
+ * @param defVal default value
+ * @return long value
+ */
+ public long getLong(String key, long defVal) {
+ final String v = options.getTrimmed(key, "");
+ if (v.isEmpty()) {
+ return defVal;
+ }
+ try {
+ return options.getLong(key, defVal);
+ } catch (NumberFormatException e) {
+ final String msg = String.format(
+ "The option %s value \"%s\" is not a long integer; using the default value %s",
+ key, v, defVal);
+      // not a long: warn once, with the full stack only at debug
+ LOG_PARSE_ERROR.warn(msg);
+ LOG.debug("{}", msg, e);
+ return defVal;
+ }
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index da99ac21256..5e945ed8357 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -633,7 +633,7 @@ public class PathData implements Comparable<PathData> {
return awaitFuture(fs.openFile(path)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
policy)
- .opt(FS_OPTION_OPENFILE_LENGTH,
+ .optLong(FS_OPTION_OPENFILE_LENGTH,
stat.getLen()) // file length hint for object stores
.build());
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index a0b45814f1c..9d6727c159c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -2006,7 +2006,7 @@ public class SequenceFile {
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
if (length >= 0) {
- builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
+ builder.optLong(FS_OPTION_OPENFILE_LENGTH, length);
}
return awaitFuture(builder.build());
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
index 084c0eaff33..7bf6b16052b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
@@ -25,6 +25,55 @@ references to `FSDataInputStream` and its subclasses.
It is used to initiate a (potentially asynchronous) operation to open an existing
file for reading.
+
+## History
+
+### Hadoop 3.3.0: API introduced
+
+[HADOOP-15229](https://issues.apache.org/jira/browse/HADOOP-15229)
+_Add FileSystem builder-based openFile() API to match createFile()_
+
+* No `opt(String key, long value)` method was available.
+* The `withFileStatus(status)` call required a non-null parameter.
+* The sole filesystem to process the options and file status was S3A.
+* The only S3A-specific options were those for S3 Select and `fs.s3a.experimental.input.fadvise`.
+* The S3A filesystem raised `IllegalArgumentException` if a file status was passed in
+  and the path of the file status did not match the path of the `openFile(path)` call.
+
+This is the baseline implementation. To write code guaranteed to compile against this version,
+use the `opt(String, String)` and `must(String, String)` methods, converting numbers to
+strings explicitly.
+
+```java
+fs.openFile(new Path("s3a://bucket/file"))
+  .opt("fs.option.openfile.length", Long.toString(length))
+  .build().get();
+```
+
+### Hadoop 3.3.5: standardization and expansion
+
+[HADOOP-16202](https://issues.apache.org/jira/browse/HADOOP-16202)
+_Enhance openFile() for better read performance against object stores_
+
+* `withFileStatus(null)` is required to be accepted (and ignored); see the
+  sketch after this list.
+* Only the filename part of any supplied FileStatus path must match the
+  filename passed in on `openFile(path)`.
+* An `opt(String key, long value)` method was added. This is now deprecated as it
+  caused the regression addressed in the 3.3.6 release.
+* Standard `fs.option.openfile` options were defined.
+* The S3A filesystem uses the `fs.option.openfile.length` option; the split
+  start/end options are not _yet_ used.
+* The Azure ABFS connector takes a supplied `VersionedFileStatus` and omits any
+ HEAD probe for the object.
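+
+A quick check of the relaxed `withFileStatus()` contract; a sketch only,
+valid from 3.3.5 onwards:
+
+```java
+fs.openFile(path)
+    .withFileStatus(null)   // accepted and ignored from 3.3.5
+    .opt("fs.option.openfile.read.policy", "random")
+    .build()
+    .get();
+```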
+
+### Hadoop 3.3.6: API change to address method overload bugs
+
+New `optLong()`, `optDouble()`, `mustLong()` and `mustDouble()` builder
+methods were added; a usage sketch follows the list below.
+
+* See [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724) _Open file fails with NumberFormatException for S3AFileSystem_,
+  which was caused by the overloaded `opt()` binding a long argument to `opt(String, double)`.
+* Specification updated to declare that unparseable numbers MUST be treated as "unset" and the default
+ value used instead.
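+
+A sketch of the new methods in use; `fs.example.ratio` is a hypothetical
+option name used here purely for illustration:
+
+```java
+FSDataInputStream in = fs.openFile(path)
+    .optLong("fs.option.openfile.length", 1_000_000L) // no overload ambiguity
+    .optDouble("fs.example.ratio", 0.75)              // hypothetical option
+    .build()
+    .get();
+```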
+
## Invariants
The `FutureDataInputStreamBuilder` interface does not require parameters or
@@ -36,7 +85,7 @@ Some aspects of the state of the filesystem, MAY be checked in the initial
change between `openFile()` and the `build().get()` sequence. For example,
path validation.
-## Implementation-agnostic parameters.
+## Implementation-agnostic parameters.
### `FutureDataInputStreamBuilder bufferSize(int bufSize)`
@@ -89,10 +138,20 @@ operations. This is to support wrapper filesystems and serialization/deserializa
of the status.
-### Set optional or mandatory parameters
+### Set optional or mandatory parameters
- FutureDataInputStreamBuilder opt(String key, ...)
- FutureDataInputStreamBuilder must(String key, ...)
+```java
+FutureDataInputStreamBuilder opt(String key, String value)
+FutureDataInputStreamBuilder opt(String key, int value)
+FutureDataInputStreamBuilder opt(String key, boolean value)
+FutureDataInputStreamBuilder optLong(String key, long value)
+FutureDataInputStreamBuilder optDouble(String key, double value)
+FutureDataInputStreamBuilder must(String key, String value)
+FutureDataInputStreamBuilder must(String key, int value)
+FutureDataInputStreamBuilder must(String key, boolean value)
+FutureDataInputStreamBuilder mustLong(String key, long value)
+FutureDataInputStreamBuilder mustDouble(String key, double value)
+```
Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
client can specify FS-specific parameters without inspecting the concrete type
@@ -103,7 +162,7 @@ Example:
```java
out = fs.openFile(path)
.must("fs.option.openfile.read.policy", "random")
- .opt("fs.http.connection.timeout", 30_000L)
+ .optLong("fs.http.connection.timeout", 30_000L)
.withFileStatus(statusFromListing)
.build()
.get();
@@ -115,9 +174,9 @@ An http-specific option has been supplied which may be interpreted by any store;
If the filesystem opening the file does not recognize the option, it can safely be
ignored.
-### When to use `opt()` versus `must()`
+### When to use `opt` versus `must`
-The difference between `opt()` versus `must()` is how the FileSystem opening
+The difference between `opt` versus `must` is how the FileSystem opening
the file must react to an option which it does not recognize.
```python
@@ -144,7 +203,7 @@ irrespective of how the (key, value) pair was declared.
defined in this filesystem specification, validated through contract
tests.
-#### Implementation Notes
+## Implementation Notes
Checking for supported options must be performed in the `build()` operation.
@@ -155,6 +214,13 @@ Checking for supported options must be performed in the `build()` operation.
a feature which is recognized but not supported in the specific
`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
+Parsing of numeric values SHOULD trim any string and, if the value
+cannot be parsed as a number, downgrade to any default value supplied.
+This is to address [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724)
+_Open file fails with NumberFormatException for S3AFileSystem_, which was caused by the overloaded `opt()`
+builder parameter binding to `opt(String, double)` rather than `opt(String, long)` when a long
+value was passed in.
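+
+`FSBuilderSupport.getLong()`, added alongside this change, implements this
+requirement; a minimal sketch of the same logic, with an illustrative
+method name:
+
+```java
+long getLongOption(Configuration options, String key, long defVal) {
+  final String v = options.getTrimmed(key, "");
+  if (v.isEmpty()) {
+    return defVal;
+  }
+  try {
+    return options.getLong(key, defVal);
+  } catch (NumberFormatException e) {
+    // unparseable: treat the option as unset and fall back to the default
+    return defVal;
+  }
+}
+```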
+
The behavior of resolving the conflicts between the parameters set by
builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
@@ -181,7 +247,7 @@ Even if not values of the status are used, the presence of the argument
can be interpreted as the caller declaring that they believe the file
to be present and of the given size.
-## Builder interface
+## Builder interface
### `CompletableFuture build()`
@@ -339,7 +405,7 @@ _Futher reading_
* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
-#### Read Policy `adaptive`
+#### Read Policy `adaptive`
Try to adapt the seek policy to the read pattern of the application.
@@ -429,7 +495,7 @@ If this option is used by the FileSystem implementation
*Implementor's Notes*
-* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* A value of `fs.option.openfile.length` < 0 MUST be ignored, as in the sketch below.
* If a file status is supplied along with a value in `fs.option.openfile.length`,
the file status values take precedence.
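+
+For example, a negative file length hint MUST be ignored rather than
+rejected; a sketch mirroring the contract test added in this change:
+
+```java
+fs.openFile(path)
+    .optLong("fs.option.openfile.length", -1L)  // ignored, not an error
+    .build()
+    .get();
+```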
@@ -466,11 +532,11 @@ than that value.
The S3A Connector supports custom options for readahead and seek policy.
-| Name | Type | Meaning |
-|--------------------------------------|----------|-------------------------------------------------------------|
-| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
-| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream |
-| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
+| Name | Type | Meaning |
+|--------------------------------------|----------|---------------------------------------------------------------------------|
+| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
+| `fs.s3a.experimental.input.fadvise`  | `String` | seek policy. Superseded by `fs.option.openfile.read.policy`                |
+| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream. (Since 3.3.5) |
If the option set contains a SQL statement in the `fs.s3a.select.sql` statement,
then the file is opened as an S3 Select query.
@@ -510,8 +576,8 @@ protected SeekableInputStream newStream(Path path, FileStatus stat,
.opt("fs.option.openfile.read.policy", "vector, random")
.withFileStatus(stat);
- builder.opt("fs.option.openfile.split.start", splitStart);
- builder.opt("fs.option.openfile.split.end", splitEnd);
+ builder.optLong("fs.option.openfile.split.start", splitStart);
+ builder.optLong("fs.option.openfile.split.end", splitEnd);
  CompletableFuture<FSDataInputStream> streamF = builder.build();
return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
}
@@ -618,8 +684,8 @@ An example of a record reader passing in options to the file it opens.
file.getFileSystem(job).openFile(file);
// the start and end of the split may be used to build
// an input strategy.
- builder.opt("fs.option.openfile.split.start", start);
- builder.opt("fs.option.openfile.split.end", end);
+ builder.optLong("fs.option.openfile.split.start", start);
+ builder.optLong("fs.option.openfile.split.end", end);
FutureIO.propagateOptions(builder, job,
"mapreduce.job.input.file.option",
"mapreduce.job.input.file.must");
@@ -633,7 +699,7 @@ An example of a record reader passing in options to the file it opens.
### `FileContext.openFile`
From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
-Because the file length has already been probed for, the length is passd down
+Because the file length has already been probed for, the length is passed down
```java
public AvroFSInput(FileContext fc, Path p) throws IOException {
@@ -642,7 +708,7 @@ Because the file length has already been probed for, the length is passd down
this.stream = awaitFuture(fc.openFile(p)
.opt("fs.option.openfile.read.policy",
"sequential")
- .opt("fs.option.openfile.length",
+ .optLong("fs.option.openfile.length",
-                      Long.toString(status.getLen()))
+                      status.getLen())
.build());
fc.open(p);
@@ -682,8 +748,3 @@ public T load(FileSystem fs,
}
}
```
-
-*Note:* : in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
-required a non-null parameter; this has since been relaxed.
-For maximum compatibility across versions, only invoke the method
-when the file status is known to be non-null.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
index 25bfe082b01..3598d33680e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
@@ -43,6 +43,7 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
/**
@@ -186,7 +187,7 @@ public abstract class AbstractContractOpenTest
@Test
public void testOpenFileReadZeroByte() throws Throwable {
- describe("create & read a 0 byte file through the builders");
+ describe("create & read a 0 byte file through the builders; use a negative length");
Path path = path("zero.txt");
FileSystem fs = getFileSystem();
fs.createFile(path).overwrite(true).build().close();
@@ -194,6 +195,7 @@ public abstract class AbstractContractOpenTest
.opt("fs.test.something", true)
.opt("fs.test.something2", 3)
.opt("fs.test.something3", "3")
+ .optLong(FS_OPTION_OPENFILE_LENGTH, -1L)
.build().get()) {
assertMinusOne("initial byte read", is.read());
}
@@ -210,6 +212,17 @@ public abstract class AbstractContractOpenTest
() -> builder.build());
}
+ @Test
+ public void testOpenFileUnknownOptionLong() throws Throwable {
+    describe("calling openFile fails when a 'mustLong()' option is unknown");
+ FutureDataInputStreamBuilder builder =
+ getFileSystem().openFile(path("testOpenFileUnknownOption"))
+ .optLong("fs.test.something", 1L)
+ .mustLong("fs.test.something2", 1L);
+ intercept(IllegalArgumentException.class,
+ () -> builder.build());
+ }
+
@Test
public void testOpenFileLazyFail() throws Throwable {
describe("openFile fails on a missing file in the get() and not before");
@@ -320,16 +333,22 @@ public abstract class AbstractContractOpenTest
describe("verify that async accept callbacks are evaluated");
Path path = path("testOpenFileApplyAsyncRead");
FileSystem fs = getFileSystem();
+ final int len = 512;
createFile(fs, path, true,
- dataset(4, 0x40, 0x80));
-    CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
+        dataset(len, 0x40, 0x80));
+    CompletableFuture<FSDataInputStream> future = fs.openFile(path)
+ .mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60) // pass in a double
+ .build();
AtomicBoolean accepted = new AtomicBoolean(false);
- future.thenApply(stream -> {
+ final Long bytes = future.thenApply(stream -> {
accepted.set(true);
- return stream;
- }).get().close();
+ return ContractTestUtils.readStream(stream);
+ }).get();
assertTrue("async accept operation not invoked",
accepted.get());
+ Assertions.assertThat(bytes)
+ .describedAs("bytes read from stream")
+ .isEqualTo(len);
}
/**
@@ -357,8 +376,8 @@ public abstract class AbstractContractOpenTest
.withFileStatus(null)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
"unknown, sequential, random")
- .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
- .opt(FS_OPTION_OPENFILE_LENGTH, len)
+ .optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
+ .optLong(FS_OPTION_OPENFILE_LENGTH, len)
.build();
try (FSDataInputStream in = future.get()) {
@@ -367,4 +386,26 @@ public abstract class AbstractContractOpenTest
compareByteArrays(dataset, result, len);
}
+ /**
+   * Open a file with a length set as a double; verifies the resilience
+   * of the option parser.
+ */
+ @Test
+ public void testFloatingPointLength() throws Throwable {
+    describe("Open file with a length set as a double");
+ Path path = methodPath();
+ FileSystem fs = getFileSystem();
+ int len = 4096;
+ createFile(fs, path, true,
+ dataset(len, 0x40, 0x80));
+ final Long l = fs.openFile(path)
+ .mustDouble(FS_OPTION_OPENFILE_LENGTH, len)
+ .build()
+ .thenApply(ContractTestUtils::readStream)
+ .get();
+ Assertions.assertThat(l)
+ .describedAs("bytes read from file %s", path)
+ .isEqualTo(len);
+ }
+
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java
new file mode 100644
index 00000000000..20172ccfe16
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.FSBuilderSupport;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test builder support, forwarding of opt double/float to long,
+ * resilience.
+ */
+@SuppressWarnings("deprecation")
+public class TestFSBuilderSupport extends AbstractHadoopTestBase {
+
+ @Test
+ public void testOptFloatDoubleForwardsToLong() throws Throwable {
+ FSBuilderSupport c = builder()
+ .opt("f", 1.8f)
+ .opt("d", 2.0e3)
+ .build();
+ assertThat(c.getLong("f", 2))
+ .isEqualTo(1);
+ assertThat(c.getLong("d", 2))
+ .isEqualTo(2000);
+ }
+
+ @Test
+ public void testMustFloatDoubleForwardsToLong() throws Throwable {
+ FSBuilderSupport c = builder()
+ .must("f", 1.8f)
+ .must("d", 2.0e3)
+ .build();
+ assertThat(c.getLong("f", 2))
+ .isEqualTo(1);
+ assertThat(c.getLong("d", 2))
+ .isEqualTo(2000);
+ }
+
+ @Test
+ public void testLongOptStillWorks() throws Throwable {
+ FSBuilderSupport c = builder()
+ .opt("o", 1L)
+ .must("m", 1L)
+ .build();
+ assertThat(c.getLong("o", 2))
+ .isEqualTo(1L);
+ assertThat(c.getLong("m", 2))
+ .isEqualTo(1L);
+ }
+
+ @Test
+ public void testFloatParseFallback() throws Throwable {
+ FSBuilderSupport c = builder()
+ .opt("f", "1.8f")
+ .opt("d", "1.8e20")
+ .build();
+
+ assertThat(c.getLong("f", 2))
+ .isEqualTo(2);
+ assertThat(c.getLong("d", 2))
+ .isEqualTo(2);
+ }
+
+ @Test
+ public void testNegatives() throws Throwable {
+ FSBuilderSupport c = builder()
+ .optLong("-1", -1)
+ .mustLong("-2", -2)
+ .build();
+
+ // getLong gets the long value
+ assertThat(c.getLong("-1", 2))
+ .isEqualTo(-1);
+
+
+ // but getPositiveLong returns the positive default
+ assertThat(c.getPositiveLong("-1", 2))
+ .isEqualTo(2);
+ }
+
+ @Test
+ public void testBoolean() throws Throwable {
+ final FSBuilderSupport c = builder()
+ .opt("f", false)
+ .opt("t", true)
+ .opt("o", "other")
+ .build();
+ assertThat(c.getOptions().getBoolean("f", true))
+ .isEqualTo(false);
+ assertThat(c.getOptions().getBoolean("t", false))
+ .isEqualTo(true);
+ // this is handled in Configuration itself.
+ assertThat(c.getOptions().getBoolean("o", true))
+ .isEqualTo(true);
+ }
+
+ private SimpleBuilder builder() {
+ return new BuilderImpl();
+ }
+
+ private interface SimpleBuilder
+      extends FSBuilder<FSBuilderSupport, SimpleBuilder> {
+ }
+
+ private static final class BuilderImpl
+      extends AbstractFSBuilderImpl<FSBuilderSupport, SimpleBuilder>
+ implements SimpleBuilder {
+
+ private BuilderImpl() {
+ super(new Path("/"));
+ }
+
+ @Override
+ public FSBuilderSupport build()
+ throws IOException {
+ return new FSBuilderSupport(getOptions());
+ }
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 5724e729310..ab63c199f2f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -114,8 +114,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
file.getFileSystem(job).openFile(file);
// the start and end of the split may be used to build
// an input strategy.
- builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
- .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+ builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
+ .optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 617abaacae0..089208841fd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -92,8 +92,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
file.getFileSystem(job).openFile(file);
// the start and end of the split may be used to build
// an input strategy.
- builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
- builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+ builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start);
+ builder.optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
index f284a9c3807..3ce6936c3d7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
@@ -236,8 +236,8 @@ public class TeraInputFormat extends FileInputFormat<Text, Text> {
offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
length = ((FileSplit)split).getLength();
final FutureDataInputStreamBuilder builder = fs.openFile(p)
- .opt(FS_OPTION_OPENFILE_SPLIT_START, start)
- .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
+ .optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
+ .optLong(FS_OPTION_OPENFILE_SPLIT_END, start + length)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
in = FutureIO.awaitFuture(builder.build());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
index 70a99d5318c..4703d635672 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FSBuilderSupport;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
@@ -246,12 +247,14 @@ public class OpenFileSupport {
// set the end of the read to the file length
fileLength = fileStatus.getLen();
}
+ FSBuilderSupport builderSupport = new FSBuilderSupport(options);
// determine start and end of file.
- long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
+ long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
// split end
- long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
- LENGTH_UNKNOWN);
+ long splitEnd = builderSupport.getLong(
+ FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);
+
if (splitStart > 0 && splitStart > splitEnd) {
LOG.warn("Split start {} is greater than split end {}, resetting",
splitStart, splitEnd);
@@ -259,7 +262,7 @@ public class OpenFileSupport {
}
// read end is the open file value
- fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
+ fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
// if the read end has come from options, use that
// in creating a file status
@@ -281,16 +284,17 @@ public class OpenFileSupport {
.withS3Select(isSelect)
.withSql(sql)
.withAsyncDrainThreshold(
- options.getLong(ASYNC_DRAIN_THRESHOLD,
+ builderSupport.getPositiveLong(ASYNC_DRAIN_THRESHOLD,
defaultReadAhead))
.withBufferSize(
- options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
+            (int) builderSupport.getPositiveLong(
+ FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
.withChangePolicy(changePolicy)
.withFileLength(fileLength)
.withInputPolicy(
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
.withReadAheadRange(
- options.getLong(READAHEAD_RANGE, defaultReadAhead))
+ builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead))
.withSplitStart(splitStart)
.withSplitEnd(splitEnd)
.withStatus(fileStatus)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index b0ee531112b..4aae84dca8e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -141,7 +141,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+ .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
.build()
.get(),
always(NO_HEAD_OR_LIST),
@@ -183,7 +183,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .must(FS_OPTION_OPENFILE_LENGTH, longLen)
+ .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
.build()
.get(),
always(NO_HEAD_OR_LIST));
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index a03f181cb38..fb75560a807 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -160,7 +160,7 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
int offset = FILE_SIZE - READAHEAD + 1;
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
.withFileStatus(st)
- .must(ASYNC_DRAIN_THRESHOLD, 1)
+ .mustLong(ASYNC_DRAIN_THRESHOLD, 1)
.build().get()) {
describe("Initiating unbuffer with async drain\n");
for (int i = 0; i < ATTEMPTS; i++) {
@@ -235,9 +235,11 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
// open the file at the beginning with a whole file read policy,
// so even with s3a switching to random on unbuffer,
// this always does a full GET
+ // also provide a floating point string for the threshold, to
+ // verify it is safely parsed
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
.withFileStatus(st)
- .must(ASYNC_DRAIN_THRESHOLD, 1)
+ .must(ASYNC_DRAIN_THRESHOLD, "1.0")
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.build().get()) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index c4949375b76..200b1fc282b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -572,8 +572,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
byte[] readFullRes;
IOStatistics sequentialIOStats, vectorIOStats;
try (FSDataInputStream in = fs.openFile(hugefile)
- .opt(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read
- .opt(FS_OPTION_OPENFILE_SPLIT_START, 0)
+ .optLong(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read
+ .optLong(FS_OPTION_OPENFILE_SPLIT_START, 0)
.opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize)
.opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential")
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
@@ -587,7 +587,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
// now do a vector IO read
try (FSDataInputStream in = fs.openFile(hugefile)
- .opt(FS_OPTION_OPENFILE_LENGTH, filesize)
+ .optLong(FS_OPTION_OPENFILE_LENGTH, filesize)
.opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random")
.build().get();
DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index eea70ced13c..fb9988b29a5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -219,11 +219,10 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
final FutureDataInputStreamBuilder builder = fs.openFile(path)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
inputPolicy.toString())
- .opt(FS_OPTION_OPENFILE_LENGTH, length)
- .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
- if (readahead > 0) {
- builder.opt(READAHEAD_RANGE, readahead);
- }
+ .optLong(FS_OPTION_OPENFILE_LENGTH, length)
+ .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize)
+ .optLong(READAHEAD_RANGE, readahead);
+
FSDataInputStream stream = awaitFuture(builder.build());
streamStatistics = getInputStreamStatistics(stream);
return stream;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index babbf791e96..3a503ddfa2b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
@@ -93,6 +92,7 @@ import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -3149,7 +3149,7 @@ public class NativeAzureFileSystem extends FileSystem {
OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(),
+ FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index bb9ecdd51a6..5fb2c6e1700 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -31,7 +31,6 @@ import java.time.Duration;
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
@@ -112,6 +111,7 @@ import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
@@ -296,7 +296,7 @@ public class AzureBlobFileSystem extends FileSystem
LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(),
+ FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index fc0b71e6a42..477a8a293ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -587,7 +587,7 @@ public class AggregatedLogFormat {
fileContext.openFile(remoteAppLogFile)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .opt(FS_OPTION_OPENFILE_LENGTH,
+ .optLong(FS_OPTION_OPENFILE_LENGTH,
status.getLen()) // file length hint for object stores
.build());
reader = new TFile.Reader(this.fsDataIStream,