From ab594ec77eae9c6f2b29975a2139a4e637458d0e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 16 May 2023 13:41:17 +0100 Subject: [PATCH] HADOOP-18724. Open file fails with NumberFormatException for S3AFileSystem (#5611) This: 1. Adds optLong, optDouble, mustLong and mustDouble methods to the FSBuilder interface to let callers explicitly pass in long and double arguments. 2. The opt() and must() builder calls which take float/double values now only set long values instead, so as to avoid problems related to overloaded methods resulting in a ".0" being appended to a long value. 3. All of the relevant opt/must calls in the hadoop codebase move to the new methods. 4. And the s3a code is resilient to parse errors in its numeric options - it will downgrade to the default. This is nominally incompatible, but the floating-point builder methods were never used: nothing currently expects floating point numbers. For anyone who wants to safely set numeric builder options across all compatible releases, convert the number to a string and then use the opt(String, String) and must(String, String) methods. 
Contributed by Steve Loughran --- .../java/org/apache/hadoop/fs/FSBuilder.java | 96 +++++++++++- .../org/apache/hadoop/fs/FileContext.java | 2 +- .../java/org/apache/hadoop/fs/FileUtil.java | 2 +- .../hadoop/fs/impl/AbstractFSBuilderImpl.java | 117 +++++++------- .../hadoop/fs/impl/FSBuilderSupport.java | 95 ++++++++++++ .../org/apache/hadoop/fs/shell/PathData.java | 2 +- .../org/apache/hadoop/io/SequenceFile.java | 2 +- .../filesystem/fsdatainputstreambuilder.md | 115 ++++++++++---- .../fs/contract/AbstractContractOpenTest.java | 57 ++++++- .../hadoop/fs/store/TestFSBuilderSupport.java | 144 ++++++++++++++++++ .../hadoop/mapred/LineRecordReader.java | 4 +- .../mapreduce/lib/input/LineRecordReader.java | 4 +- .../examples/terasort/TeraInputFormat.java | 4 +- .../hadoop/fs/s3a/impl/OpenFileSupport.java | 18 ++- .../fs/s3a/performance/ITestS3AOpenCost.java | 4 +- .../performance/ITestUnbufferDraining.java | 6 +- .../scale/ITestS3AInputStreamPerformance.java | 9 +- .../fs/azure/NativeAzureFileSystem.java | 4 +- .../fs/azurebfs/AzureBlobFileSystem.java | 4 +- .../logaggregation/AggregatedLogFormat.java | 2 +- 20 files changed, 556 insertions(+), 135 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java index 56ef51f128d..81a14e97be5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java @@ -28,6 +28,34 @@ * The base interface which various FileSystem FileContext Builder * interfaces can extend, and which underlying implementations * will then implement. + *

+ * HADOOP-16202 expanded the opt() and must() arguments with + * operator overloading, but HADOOP-18724 identified mapping problems: + * passing a long value in to {@code opt()} could end up invoking + * {@code opt(string, double)}, which could then trigger parse failures. + *

+ * To fix this without forcing existing code to break/be recompiled. + *

    + *
  1. A new method to explicitly set a long value is added: + * {@link #optLong(String, long)} + *
  2. + *
  3. A new method to explicitly set a double value is added: + * {@link #optDouble(String, double)} + *
  4. + *
  5. + * All of {@link #opt(String, long)}, {@link #opt(String, float)} and + * {@link #opt(String, double)} invoke {@link #optLong(String, long)}. + *
  6. + *
  7. + * The same changes have been applied to {@code must()} methods. + *
  8. + *
+ * The forwarding of existing double/float setters to the long setters ensure + * that existing code will link, but are guaranteed to always set a long value. + * If you need to write code which works correctly with all hadoop releases, + * covert the option to a string explicitly and then call {@link #opt(String, String)} + * or {@link #must(String, String)} as appropriate. + * * @param Return type on the {@link #build()} call. * @param type of builder itself. */ @@ -63,13 +91,17 @@ public interface FSBuilder> { B opt(@Nonnull String key, int value); /** - * Set optional float parameter for the Builder. + * This parameter is converted to a long and passed + * to {@link #optLong(String, long)} -all + * decimal precision is lost. * * @param key key. * @param value value. * @return generic type B. * @see #opt(String, String) + * @deprecated use {@link #optDouble(String, double)} */ + @Deprecated B opt(@Nonnull String key, float value); /** @@ -78,18 +110,22 @@ public interface FSBuilder> { * @param key key. * @param value value. * @return generic type B. - * @see #opt(String, String) + * @deprecated use {@link #optLong(String, long)} where possible. */ B opt(@Nonnull String key, long value); /** - * Set optional double parameter for the Builder. - * + * Pass an optional double parameter for the Builder. + * This parameter is converted to a long and passed + * to {@link #optLong(String, long)} -all + * decimal precision is lost. * @param key key. * @param value value. * @return generic type B. * @see #opt(String, String) + * @deprecated use {@link #optDouble(String, double)} */ + @Deprecated B opt(@Nonnull String key, double value); /** @@ -102,6 +138,26 @@ public interface FSBuilder> { */ B opt(@Nonnull String key, @Nonnull String... values); + /** + * Set optional long parameter for the Builder. + * + * @param key key. + * @param value value. + * @return generic type B. 
+ * @see #opt(String, String) + */ + B optLong(@Nonnull String key, long value); + + /** + * Set optional double parameter for the Builder. + * + * @param key key. + * @param value value. + * @return generic type B. + * @see #opt(String, String) + */ + B optDouble(@Nonnull String key, double value); + /** * Set mandatory option to the Builder. * @@ -135,13 +191,16 @@ public interface FSBuilder> { B must(@Nonnull String key, int value); /** - * Set mandatory float option. + * This parameter is converted to a long and passed + * to {@link #mustLong(String, long)} -all + * decimal precision is lost. * * @param key key. * @param value value. * @return generic type B. - * @see #must(String, String) + * @deprecated use {@link #mustDouble(String, double)} to set floating point. */ + @Deprecated B must(@Nonnull String key, float value); /** @@ -152,16 +211,19 @@ public interface FSBuilder> { * @return generic type B. * @see #must(String, String) */ + @Deprecated B must(@Nonnull String key, long value); /** - * Set mandatory double option. + * Set mandatory long option, despite passing in a floating + * point value. * * @param key key. * @param value value. * @return generic type B. * @see #must(String, String) */ + @Deprecated B must(@Nonnull String key, double value); /** @@ -174,6 +236,26 @@ public interface FSBuilder> { */ B must(@Nonnull String key, @Nonnull String... values); + /** + * Set mandatory long parameter for the Builder. + * + * @param key key. + * @param value value. + * @return generic type B. + * @see #opt(String, String) + */ + B mustLong(@Nonnull String key, long value); + + /** + * Set mandatory double parameter for the Builder. + * + * @param key key. + * @param value value. + * @return generic type B. + * @see #opt(String, String) + */ + B mustDouble(@Nonnull String key, double value); + /** * Instantiate the object which was being built. 
* diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index ed467a1890b..5601f166abe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -2237,7 +2237,7 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource, InputStream in = awaitFuture(openFile(qSrc) .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) - .opt(FS_OPTION_OPENFILE_LENGTH, + .optLong(FS_OPTION_OPENFILE_LENGTH, fs.getLen()) // file length hint for object stores .build()); try (OutputStream out = create(qDst, createFlag)) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index 6767e0dfbe4..933f5692774 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -483,7 +483,7 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus, in = awaitFuture(srcFS.openFile(src) .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) - .opt(FS_OPTION_OPENFILE_LENGTH, + .optLong(FS_OPTION_OPENFILE_LENGTH, srcStatus.getLen()) // file length hint for object stores .build()); out = dstFS.create(dst, overwrite); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java index 774dfeb5fa0..47cf98f4746 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java @@ -44,11 +44,13 @@ * with option support. * * - * .opt("foofs:option.a", true) - * .opt("foofs:option.b", "value") + * .opt("fs.s3a.open.option.caching", true) + * .opt("fs.option.openfile.read.policy", "random, adaptive") * .opt("fs.s3a.open.option.etag", "9fe4c37c25b") - * .must("foofs:cache", true) - * .must("barfs:cache-size", 256 * 1024 * 1024) + * .optLong("fs.option.openfile.length", 1_500_000_000_000) + * .must("fs.option.openfile.buffer.size", 256_000) + * .mustLong("fs.option.openfile.split.start", 256_000_000) + * .mustLong("fs.option.openfile.split.end", 512_000_000) * .build(); * * @@ -64,6 +66,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Unstable +@SuppressWarnings({"deprecation", "unused"}) public abstract class AbstractFSBuilderImpl> implements FSBuilder { @@ -178,10 +181,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) { */ @Override public B opt(@Nonnull final String key, boolean value) { - mandatoryKeys.remove(key); - optionalKeys.add(key); - options.setBoolean(key, value); - return getThisBuilder(); + return opt(key, Boolean.toString(value)); } /** @@ -190,19 +190,18 @@ public B opt(@Nonnull final String key, boolean value) { * @see #opt(String, String) */ @Override - public B opt(@Nonnull final String key, int value) { - mandatoryKeys.remove(key); - optionalKeys.add(key); - options.setInt(key, value); - return getThisBuilder(); + public final B opt(@Nonnull final String key, int value) { + return optLong(key, value); } @Override - public B opt(@Nonnull final String key, final long value) { - mandatoryKeys.remove(key); - optionalKeys.add(key); - options.setLong(key, value); - return getThisBuilder(); + public final B opt(@Nonnull final String key, final long value) { + return optLong(key, value); + } + + @Override + public B optLong(@Nonnull final String key, final long value) { + return opt(key, 
Long.toString(value)); } /** @@ -211,11 +210,8 @@ public B opt(@Nonnull final String key, final long value) { * @see #opt(String, String) */ @Override - public B opt(@Nonnull final String key, float value) { - mandatoryKeys.remove(key); - optionalKeys.add(key); - options.setFloat(key, value); - return getThisBuilder(); + public final B opt(@Nonnull final String key, float value) { + return optLong(key, (long) value); } /** @@ -224,11 +220,18 @@ public B opt(@Nonnull final String key, float value) { * @see #opt(String, String) */ @Override - public B opt(@Nonnull final String key, double value) { - mandatoryKeys.remove(key); - optionalKeys.add(key); - options.setDouble(key, value); - return getThisBuilder(); + public final B opt(@Nonnull final String key, double value) { + return optLong(key, (long) value); + } + + /** + * Set optional double parameter for the Builder. + * + * @see #opt(String, String) + */ + @Override + public B optDouble(@Nonnull final String key, double value) { + return opt(key, Double.toString(value)); } /** @@ -264,10 +267,22 @@ public B must(@Nonnull final String key, @Nonnull final String value) { */ @Override public B must(@Nonnull final String key, boolean value) { - mandatoryKeys.add(key); - optionalKeys.remove(key); - options.setBoolean(key, value); - return getThisBuilder(); + return must(key, Boolean.toString(value)); + } + + @Override + public B mustLong(@Nonnull final String key, final long value) { + return must(key, Long.toString(value)); + } + + /** + * Set optional double parameter for the Builder. 
+ * + * @see #opt(String, String) + */ + @Override + public B mustDouble(@Nonnull final String key, double value) { + return must(key, Double.toString(value)); } /** @@ -276,45 +291,23 @@ public B must(@Nonnull final String key, boolean value) { * @see #must(String, String) */ @Override - public B must(@Nonnull final String key, int value) { - mandatoryKeys.add(key); - optionalKeys.remove(key); - options.setInt(key, value); - return getThisBuilder(); + public final B must(@Nonnull final String key, int value) { + return mustLong(key, value); } @Override - public B must(@Nonnull final String key, final long value) { - mandatoryKeys.add(key); - optionalKeys.remove(key); - options.setLong(key, value); - return getThisBuilder(); + public final B must(@Nonnull final String key, final long value) { + return mustLong(key, value); } - /** - * Set mandatory float option. - * - * @see #must(String, String) - */ @Override - public B must(@Nonnull final String key, float value) { - mandatoryKeys.add(key); - optionalKeys.remove(key); - options.setFloat(key, value); - return getThisBuilder(); + public final B must(@Nonnull final String key, final float value) { + return mustLong(key, (long) value); } - /** - * Set mandatory double option. 
- * - * @see #must(String, String) - */ @Override - public B must(@Nonnull final String key, double value) { - mandatoryKeys.add(key); - optionalKeys.remove(key); - options.setDouble(key, value); - return getThisBuilder(); + public final B must(@Nonnull final String key, double value) { + return mustLong(key, (long) value); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java new file mode 100644 index 00000000000..dc4a18eb2b5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.store.LogExactlyOnce; + +/** + * Class to help with use of FSBuilder. 
+ */ +public class FSBuilderSupport { + + private static final Logger LOG = + LoggerFactory.getLogger(FSBuilderSupport.class); + + public static final LogExactlyOnce LOG_PARSE_ERROR = new LogExactlyOnce(LOG); + + /** + * Options which are parsed. + */ + private final Configuration options; + + /** + * Constructor. + * @param options the configuration options from the builder. + */ + public FSBuilderSupport(final Configuration options) { + this.options = options; + } + + public Configuration getOptions() { + return options; + } + + /** + * Get a long value with resilience to unparseable values. + * Negative values are replaced with the default. + * @param key key to log + * @param defVal default value + * @return long value + */ + public long getPositiveLong(String key, long defVal) { + long l = getLong(key, defVal); + if (l < 0) { + LOG.debug("The option {} has a negative value {}, replacing with the default {}", + key, l, defVal); + l = defVal; + } + return l; + } + + /** + * Get a long value with resilience to unparseable values. 
+ * @param key key to log + * @param defVal default value + * @return long value + */ + public long getLong(String key, long defVal) { + final String v = options.getTrimmed(key, ""); + if (v.isEmpty()) { + return defVal; + } + try { + return options.getLong(key, defVal); + } catch (NumberFormatException e) { + final String msg = String.format( + "The option %s value \"%s\" is not a long integer; using the default value %s", + key, v, defVal); + // not a long, + LOG_PARSE_ERROR.warn(msg); + LOG.debug("{}", msg, e); + return defVal; + } + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java index da99ac21256..5e945ed8357 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java @@ -633,7 +633,7 @@ protected FSDataInputStream openFile(final String policy) throws IOException { return awaitFuture(fs.openFile(path) .opt(FS_OPTION_OPENFILE_READ_POLICY, policy) - .opt(FS_OPTION_OPENFILE_LENGTH, + .optLong(FS_OPTION_OPENFILE_LENGTH, stat.getLen()) // file length hint for object stores .build()); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 56dbcee92b4..3807868e7bd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -2017,7 +2017,7 @@ protected FSDataInputStream openFile(FileSystem fs, Path file, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize); if (length >= 0) { - builder.opt(FS_OPTION_OPENFILE_LENGTH, length); + 
builder.optLong(FS_OPTION_OPENFILE_LENGTH, length); } return awaitFuture(builder.build()); } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index db630e05c22..22bec19e5b4 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -25,6 +25,55 @@ references to `FSDataInputStream` and its subclasses. It is used to initate a (potentially asynchronous) operation to open an existing file for reading. + +## History + +### Hadoop 3.3.0: API introduced + +[HADOOP-15229](https://issues.apache.org/jira/browse/HADOOP-15229) +_Add FileSystem builder-based openFile() API to match createFile()_ + +* No `opt(String key, long value)` method was available. +* the `withFileStatus(status)` call required a non-null parameter. +* Sole Filesystem to process options and file status was S3A; +* Only the s3a specific options were the S3 select and `fs.s3a.experimental.input.fadvise` +* S3A Filesystem raised `IllegalArgumentException` if a file status was passed in + and the path of the filestatus did not match the path of the `openFile(path)` call. + +This is the baseline implementation. To write code guaranteed to compile against this version, +use the `opt(String, String)` and `must(String, String)` methods, converting numbers to +string explicitly. 
+ +```java +fs.open("s3a://bucket/file") + .opt("fs.option.openfile.length", Long.toString(length)) + .build().get() +``` + +### Hadoop 3.3.5: standardization and expansion + +[HADOOP-16202](https://issues.apache.org/jira/browse/HADOOP-16202) +_Enhance openFile() for better read performance against object stores_ + +* `withFileStatus(null)` required to be accepted (and ignored) +* only the filename part of any supplied FileStatus path must match the + filename passed in on `openFile(path)`. +* An `opt(String key, long value)` option was added. *This is now deprecated as it +caused regression +* Standard `fs.option.openfile` options defined. +* S3A FS to use openfile length option, seek start/end options not _yet_ used. +* Azure ABFS connector takes a supplied `VersionedFileStatus` and omits any + HEAD probe for the object. + +### Hadoop 3.3.6: API change to address operator overload bugs. + +new `optLong()`, `optDouble()`, `mustLong()` and `mustDouble()` builder methods. + +* See [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724) _Open file fails with NumberFormatException for S3AFileSystem_, + which was somehow caused by the overloaded `opt(long)`. +* Specification updated to declare that unparseable numbers MUST be treated as "unset" and the default + value used instead. + ## Invariants The `FutureDataInputStreamBuilder` interface does not require parameters or @@ -36,7 +85,7 @@ Some aspects of the state of the filesystem, MAY be checked in the initial change between `openFile()` and the `build().get()` sequence. For example, path validation. -## Implementation-agnostic parameters. +## `Implementation-agnostic parameters. ### `FutureDataInputStreamBuilder bufferSize(int bufSize)` @@ -89,10 +138,20 @@ operations. This is to support wrapper filesystems and serialization/deserializa of the status. -### Set optional or mandatory parameters +### Set optional or mandatory parameters - FutureDataInputStreamBuilder opt(String key, ...) 
- FutureDataInputStreamBuilder must(String key, ...) +```java +FutureDataInputStreamBuilder opt(String key, String value) +FutureDataInputStreamBuilder opt(String key, int value) +FutureDataInputStreamBuilder opt(String key, boolean value) +FutureDataInputStreamBuilder optLong(String key, long value) +FutureDataInputStreamBuilder optDouble(String key, double value) +FutureDataInputStreamBuilder must(String key, String value) +FutureDataInputStreamBuilder must(String key, int value) +FutureDataInputStreamBuilder must(String key, boolean value) +FutureDataInputStreamBuilder mustLong(String key, long value) +FutureDataInputStreamBuilder mustDouble(String key, double value) +``` Set optional or mandatory parameters to the builder. Using `opt()` or `must()`, client can specify FS-specific parameters without inspecting the concrete type @@ -103,7 +162,7 @@ Example: ```java out = fs.openFile(path) .must("fs.option.openfile.read.policy", "random") - .opt("fs.http.connection.timeout", 30_000L) + .optLong("fs.http.connection.timeout", 30_000L) .withFileStatus(statusFromListing) .build() .get(); @@ -115,9 +174,9 @@ An http-specific option has been supplied which may be interpreted by any store; If the filesystem opening the file does not recognize the option, it can safely be ignored. -### When to use `opt()` versus `must()` +### When to use `opt` versus `must` -The difference between `opt()` versus `must()` is how the FileSystem opening +The difference between `opt` versus `must` is how the FileSystem opening the file must react to an option which it does not recognize. ```python @@ -144,7 +203,7 @@ irrespective of how the (key, value) pair was declared. defined in this filesystem specification, validated through contract tests. -#### Implementation Notes +## Implementation Notes Checking for supported options must be performed in the `build()` operation. @@ -155,6 +214,13 @@ Checking for supported options must be performed in the `build()` operation. 
a feature which is recognized but not supported in the specific `FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown. +Parsing of numeric values SHOULD trim any string and if the value +cannot be parsed as a number, downgrade to any default value supplied. +This is to address [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724) +_Open file fails with NumberFormatException for S3AFileSystem_, which was cause by the overloaded `opt()` +builder parameter binding to `opt(String, double)` rather than `opt(String, long)` when a long +value was passed in. + The behavior of resolving the conflicts between the parameters set by builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows: @@ -181,7 +247,7 @@ Even if not values of the status are used, the presence of the argument can be interpreted as the caller declaring that they believe the file to be present and of the given size. -## Builder interface +## Builder interface ### `CompletableFuture build()` @@ -339,7 +405,7 @@ _Futher reading_ * [Linux fadvise()](https://linux.die.net/man/2/fadvise). * [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior) -#### Read Policy `adaptive` +#### Read Policy `adaptive` Try to adapt the seek policy to the read pattern of the application. @@ -429,7 +495,7 @@ If this option is used by the FileSystem implementation *Implementor's Notes* -* A value of `fs.option.openfile.length` < 0 MUST be rejected. +* A value of `fs.option.openfile.length` < 0 MUST be ignored. * If a file status is supplied along with a value in `fs.opt.openfile.length`; the file status values take precedence. @@ -466,11 +532,11 @@ than that value. The S3A Connector supports custom options for readahead and seek policy. 
-| Name | Type | Meaning | -|--------------------------------------|----------|-------------------------------------------------------------| -| `fs.s3a.readahead.range` | `long` | readahead range in bytes | -| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream | -| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` | +| Name | Type | Meaning | +|--------------------------------------|----------|---------------------------------------------------------------------------| +| `fs.s3a.readahead.range` | `long` | readahead range in bytes | +| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` | +| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream. (Since 3.3.5) | If the option set contains a SQL statement in the `fs.s3a.select.sql` statement, then the file is opened as an S3 Select query. @@ -510,8 +576,8 @@ protected SeekableInputStream newStream(Path path, FileStatus stat, .opt("fs.option.openfile.read.policy", "vector, random") .withFileStatus(stat); - builder.opt("fs.option.openfile.split.start", splitStart); - builder.opt("fs.option.openfile.split.end", splitEnd); + builder.optLong("fs.option.openfile.split.start", splitStart); + builder.optLong("fs.option.openfile.split.end", splitEnd); CompletableFuture streamF = builder.build(); return HadoopStreams.wrap(FutureIO.awaitFuture(streamF)); } @@ -618,8 +684,8 @@ An example of a record reader passing in options to the file it opens. file.getFileSystem(job).openFile(file); // the start and end of the split may be used to build // an input strategy. 
- builder.opt("fs.option.openfile.split.start", start); - builder.opt("fs.option.openfile.split.end", end); + builder.optLong("fs.option.openfile.split.start", start); + builder.optLong("fs.option.openfile.split.end", end); FutureIO.propagateOptions(builder, job, "mapreduce.job.input.file.option", "mapreduce.job.input.file.must"); @@ -633,7 +699,7 @@ An example of a record reader passing in options to the file it opens. ### `FileContext.openFile` From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input. -Because the file length has already been probed for, the length is passd down +Because the file length has already been probed for, the length is passed down ```java public AvroFSInput(FileContext fc, Path p) throws IOException { @@ -642,7 +708,7 @@ Because the file length has already been probed for, the length is passd down this.stream = awaitFuture(fc.openFile(p) .opt("fs.option.openfile.read.policy", "sequential") - .opt("fs.option.openfile.length", + .optLong("fs.option.openfile.length", Long.toString(status.getLen())) .build()); fc.open(p); @@ -682,8 +748,3 @@ public T load(FileSystem fs, } } ``` - -*Note:* : in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call -required a non-null parameter; this has since been relaxed. -For maximum compatibility across versions, only invoke the method -when the file status is known to be non-null. 
\ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index 25bfe082b01..3598d33680e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; +import org.assertj.core.api.Assertions; import org.junit.Test; /** @@ -186,7 +187,7 @@ public void testSequentialRead() throws Throwable { @Test public void testOpenFileReadZeroByte() throws Throwable { - describe("create & read a 0 byte file through the builders"); + describe("create & read a 0 byte file through the builders; use a negative length"); Path path = path("zero.txt"); FileSystem fs = getFileSystem(); fs.createFile(path).overwrite(true).build().close(); @@ -194,6 +195,7 @@ public void testOpenFileReadZeroByte() throws Throwable { .opt("fs.test.something", true) .opt("fs.test.something2", 3) .opt("fs.test.something3", "3") + .optLong(FS_OPTION_OPENFILE_LENGTH, -1L) .build().get()) { assertMinusOne("initial byte read", is.read()); } @@ -210,6 +212,17 @@ public void testOpenFileUnknownOption() throws Throwable { () -> builder.build()); } + @Test + public void testOpenFileUnknownOptionLong() throws Throwable { + describe("calling openFile fails when a 'must()' option is unknown"); + FutureDataInputStreamBuilder builder = + getFileSystem().openFile(path("testOpenFileUnknownOption")) + .optLong("fs.test.something", 1L) + .mustLong("fs.test.something2", 1L); + intercept(IllegalArgumentException.class, + () -> builder.build()); + } + @Test public void testOpenFileLazyFail() throws 
Throwable { describe("openFile fails on a missing file in the get() and not before"); @@ -320,16 +333,22 @@ public void testOpenFileApplyAsyncRead() throws Throwable { describe("verify that async accept callbacks are evaluated"); Path path = path("testOpenFileApplyAsyncRead"); FileSystem fs = getFileSystem(); + final int len = 512; createFile(fs, path, true, - dataset(4, 0x40, 0x80)); - CompletableFuture future = fs.openFile(path).build(); + dataset(len, 0x40, 0x80)); + CompletableFuture future = fs.openFile(path) + .mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60) // pass in a double + .build(); AtomicBoolean accepted = new AtomicBoolean(false); - future.thenApply(stream -> { + final Long bytes = future.thenApply(stream -> { accepted.set(true); - return stream; - }).get().close(); + return ContractTestUtils.readStream(stream); + }).get(); assertTrue("async accept operation not invoked", accepted.get()); + Assertions.assertThat(bytes) + .describedAs("bytes read from stream") + .isEqualTo(len); } /** @@ -357,8 +376,8 @@ public void testOpenFileNullStatusButFileLength() throws Throwable { .withFileStatus(null) .opt(FS_OPTION_OPENFILE_READ_POLICY, "unknown, sequential, random") - .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768) - .opt(FS_OPTION_OPENFILE_LENGTH, len) + .optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768) + .optLong(FS_OPTION_OPENFILE_LENGTH, len) .build(); try (FSDataInputStream in = future.get()) { @@ -367,4 +386,26 @@ public void testOpenFileNullStatusButFileLength() throws Throwable { compareByteArrays(dataset, result, len); } + /** + * open a file with a length set as a double; verifies resilience + * of the parser. 
+ */ + @Test + public void testFloatingPointLength() throws Throwable { + describe("Open file with a length"); + Path path = methodPath(); + FileSystem fs = getFileSystem(); + int len = 4096; + createFile(fs, path, true, + dataset(len, 0x40, 0x80)); + final Long l = fs.openFile(path) + .mustDouble(FS_OPTION_OPENFILE_LENGTH, len) + .build() + .thenApply(ContractTestUtils::readStream) + .get(); + Assertions.assertThat(l) + .describedAs("bytes read from file %s", path) + .isEqualTo(len); + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java new file mode 100644 index 00000000000..20172ccfe16 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.store; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.fs.FSBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.FSBuilderSupport; +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test builder support, forwarding of opt double/float to long, + * resilience. + */ +@SuppressWarnings("deprecation") +public class TestFSBuilderSupport extends AbstractHadoopTestBase { + + @Test + public void testOptFloatDoubleForwardsToLong() throws Throwable { + FSBuilderSupport c = builder() + .opt("f", 1.8f) + .opt("d", 2.0e3) + .build(); + assertThat(c.getLong("f", 2)) + .isEqualTo(1); + assertThat(c.getLong("d", 2)) + .isEqualTo(2000); + } + + @Test + public void testMustFloatDoubleForwardsToLong() throws Throwable { + FSBuilderSupport c = builder() + .must("f", 1.8f) + .must("d", 2.0e3) + .build(); + assertThat(c.getLong("f", 2)) + .isEqualTo(1); + assertThat(c.getLong("d", 2)) + .isEqualTo(2000); + } + + @Test + public void testLongOptStillWorks() throws Throwable { + FSBuilderSupport c = builder() + .opt("o", 1L) + .must("m", 1L) + .build(); + assertThat(c.getLong("o", 2)) + .isEqualTo(1L); + assertThat(c.getLong("m", 2)) + .isEqualTo(1L); + } + + @Test + public void testFloatParseFallback() throws Throwable { + FSBuilderSupport c = builder() + .opt("f", "1.8f") + .opt("d", "1.8e20") + .build(); + + assertThat(c.getLong("f", 2)) + .isEqualTo(2); + assertThat(c.getLong("d", 2)) + .isEqualTo(2); + } + + @Test + public void testNegatives() throws Throwable { + FSBuilderSupport c = builder() + .optLong("-1", -1) + .mustLong("-2", -2) + .build(); + + // getLong gets the long value + assertThat(c.getLong("-1", 2)) + .isEqualTo(-1); + + + // but getPositiveLong returns the positive default + assertThat(c.getPositiveLong("-1", 2)) + .isEqualTo(2); + } + + 
@Test + public void testBoolean() throws Throwable { + final FSBuilderSupport c = builder() + .opt("f", false) + .opt("t", true) + .opt("o", "other") + .build(); + assertThat(c.getOptions().getBoolean("f", true)) + .isEqualTo(false); + assertThat(c.getOptions().getBoolean("t", false)) + .isEqualTo(true); + // this is handled in Configuration itself. + assertThat(c.getOptions().getBoolean("o", true)) + .isEqualTo(true); + } + + private SimpleBuilder builder() { + return new BuilderImpl(); + } + + private interface SimpleBuilder + extends FSBuilder { + } + + private static final class BuilderImpl + extends AbstractFSBuilderImpl + implements SimpleBuilder { + + private BuilderImpl() { + super(new Path("/")); + } + + @Override + public FSBuilderSupport build() + throws IOException { + return new FSBuilderSupport(getOptions()); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java index 5724e729310..ab63c199f2f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java @@ -114,8 +114,8 @@ public LineRecordReader(Configuration job, FileSplit split, file.getFileSystem(job).openFile(file); // the start and end of the split may be used to build // an input strategy. 
- builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start) - .opt(FS_OPTION_OPENFILE_SPLIT_END, end); + builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start) + .optLong(FS_OPTION_OPENFILE_SPLIT_END, end); FutureIO.propagateOptions(builder, job, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java index 617abaacae0..089208841fd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java @@ -92,8 +92,8 @@ public void initialize(InputSplit genericSplit, file.getFileSystem(job).openFile(file); // the start and end of the split may be used to build // an input strategy. 
- builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start); - builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end); + builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start); + builder.optLong(FS_OPTION_OPENFILE_SPLIT_END, end); FutureIO.propagateOptions(builder, job, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java index f284a9c3807..3ce6936c3d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java @@ -236,8 +236,8 @@ public void initialize(InputSplit split, TaskAttemptContext context) offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH; length = ((FileSplit)split).getLength(); final FutureDataInputStreamBuilder builder = fs.openFile(p) - .opt(FS_OPTION_OPENFILE_SPLIT_START, start) - .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length) + .optLong(FS_OPTION_OPENFILE_SPLIT_START, start) + .optLong(FS_OPTION_OPENFILE_SPLIT_END, start + length) .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL); in = FutureIO.awaitFuture(builder.build()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java index 70a99d5318c..4703d635672 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.FSBuilderSupport; import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AInputPolicy; @@ -246,12 +247,14 @@ public OpenFileInformation prepareToOpenFile( // set the end of the read to the file length fileLength = fileStatus.getLen(); } + FSBuilderSupport builderSupport = new FSBuilderSupport(options); // determine start and end of file. - long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0); + long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0); // split end - long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END, - LENGTH_UNKNOWN); + long splitEnd = builderSupport.getLong( + FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN); + if (splitStart > 0 && splitStart > splitEnd) { LOG.warn("Split start {} is greater than split end {}, resetting", splitStart, splitEnd); @@ -259,7 +262,7 @@ public OpenFileInformation prepareToOpenFile( } // read end is the open file value - fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength); + fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength); // if the read end has come from options, use that // in creating a file status @@ -281,16 +284,17 @@ public OpenFileInformation prepareToOpenFile( .withS3Select(isSelect) .withSql(sql) .withAsyncDrainThreshold( - options.getLong(ASYNC_DRAIN_THRESHOLD, + builderSupport.getPositiveLong(ASYNC_DRAIN_THRESHOLD, defaultReadAhead)) .withBufferSize( - options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize)) + (int)builderSupport.getPositiveLong( + FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize)) .withChangePolicy(changePolicy) .withFileLength(fileLength) .withInputPolicy( S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy)) .withReadAheadRange( - options.getLong(READAHEAD_RANGE, defaultReadAhead)) + 
builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead)) .withSplitStart(splitStart) .withSplitEnd(splitEnd) .withStatus(fileStatus) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index b0ee531112b..4aae84dca8e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -141,7 +141,7 @@ public void testOpenFileShorterLength() throws Throwable { fs.openFile(testFile) .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) - .opt(FS_OPTION_OPENFILE_LENGTH, shortLen) + .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen) .build() .get(), always(NO_HEAD_OR_LIST), @@ -183,7 +183,7 @@ public void testOpenFileLongerLength() throws Throwable { fs.openFile(testFile) .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) - .must(FS_OPTION_OPENFILE_LENGTH, longLen) + .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen) .build() .get(), always(NO_HEAD_OR_LIST)); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java index a03f181cb38..fb75560a807 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java @@ -160,7 +160,7 @@ public void testUnbufferDraining() throws Throwable { int offset = FILE_SIZE - READAHEAD + 1; try (FSDataInputStream in = getBrittleFS().openFile(st.getPath()) .withFileStatus(st) - .must(ASYNC_DRAIN_THRESHOLD, 1) + .mustLong(ASYNC_DRAIN_THRESHOLD, 1) .build().get()) { 
describe("Initiating unbuffer with async drain\n"); for (int i = 0; i < ATTEMPTS; i++) { @@ -235,9 +235,11 @@ public void testUnbufferAborting() throws Throwable { // open the file at the beginning with a whole file read policy, // so even with s3a switching to random on unbuffer, // this always does a full GET + // also provide a floating point string for the threshold, to + // verify it is safely parsed try (FSDataInputStream in = getBrittleFS().openFile(st.getPath()) .withFileStatus(st) - .must(ASYNC_DRAIN_THRESHOLD, 1) + .must(ASYNC_DRAIN_THRESHOLD, "1.0") .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) .build().get()) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java index eea70ced13c..fb9988b29a5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -219,11 +219,10 @@ private FSDataInputStream openDataFile(S3AFileSystem fs, final FutureDataInputStreamBuilder builder = fs.openFile(path) .opt(FS_OPTION_OPENFILE_READ_POLICY, inputPolicy.toString()) - .opt(FS_OPTION_OPENFILE_LENGTH, length) - .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize); - if (readahead > 0) { - builder.opt(READAHEAD_RANGE, readahead); - } + .optLong(FS_OPTION_OPENFILE_LENGTH, length) + .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize) + .optLong(READAHEAD_RANGE, readahead); + FSDataInputStream stream = awaitFuture(builder.build()); streamStatistics = getInputStreamStatistics(stream); return stream; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 
e9f0e784fc1..b7b859cb9f3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -38,7 +38,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Stack; @@ -93,6 +92,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -3149,7 +3149,7 @@ protected CompletableFuture openFileWithOptions(Path path, OpenFileParameters parameters) throws IOException { AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), + FS_OPTION_OPENFILE_STANDARD_OPTIONS, "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index bb9ecdd51a6..5fb2c6e1700 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -31,7 +31,6 @@ import java.util.Hashtable; import java.util.List; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.Map; import java.util.Optional; @@ -112,6 +111,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; +import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS; @@ -296,7 +296,7 @@ protected CompletableFuture openFileWithOptions( LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path); AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), + FS_OPTION_OPENFILE_STANDARD_OPTIONS, "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 0122b873aab..7c32ddcfb4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -587,7 +587,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile) fileContext.openFile(remoteAppLogFile) .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) - .opt(FS_OPTION_OPENFILE_LENGTH, + .optLong(FS_OPTION_OPENFILE_LENGTH, status.getLen()) // file length hint for object stores .build()); reader = new TFile.Reader(this.fsDataIStream,