From 75950e47e721084a105c472af426179050dc64c8 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Sun, 24 Apr 2022 17:03:59 +0100
Subject: [PATCH] HADOOP-16202. Enhanced openFile(): hadoop-common changes.
(#2584/1)
This defines standard options and values for the
openFile() builder API for opening a file:

fs.option.openfile.read.policy
  A list of the desired read policies, in preferred order.
  Standard values are:
  adaptive, default, random, sequential, vector, whole-file

fs.option.openfile.length
  The length of the file.

fs.option.openfile.split.start
  Start of a task's split.

fs.option.openfile.split.end
  End of a task's split.
These can be used by filesystem connectors to optimize their
reading of the source file, including but not limited to
* skipping existence/length probes when opening a file
* choosing a policy for prefetching/caching data
The Hadoop shell commands which read files all declare "whole-file"
and "sequential", as appropriate.
Contributed by Steve Loughran.
Change-Id: Ia290f79ea7973ce8713d4f90f1315b24d7a23da1
---
.../org/apache/hadoop/fs/AvroFSInput.java | 11 +-
.../apache/hadoop/fs/ChecksumFileSystem.java | 4 +-
.../java/org/apache/hadoop/fs/FSBuilder.java | 14 +
.../org/apache/hadoop/fs/FileContext.java | 18 +-
.../java/org/apache/hadoop/fs/FileSystem.java | 13 +-
.../java/org/apache/hadoop/fs/FileUtil.java | 61 +-
.../fs/FutureDataInputStreamBuilder.java | 8 +-
.../java/org/apache/hadoop/fs/Options.java | 119 ++++
.../hadoop/fs/impl/AbstractFSBuilderImpl.java | 38 +-
.../fs/impl/FileSystemMultipartUploader.java | 9 +-
.../FutureDataInputStreamBuilderImpl.java | 8 +-
.../hadoop/fs/impl/FutureIOSupport.java | 83 +--
.../hadoop/fs/impl/OpenFileParameters.java | 13 +
.../hadoop/fs/impl/WrappedIOException.java | 11 +-
.../fs/shell/CommandWithDestination.java | 9 +-
.../apache/hadoop/fs/shell/CopyCommands.java | 3 +-
.../org/apache/hadoop/fs/shell/Display.java | 3 +-
.../java/org/apache/hadoop/fs/shell/Head.java | 8 +-
.../org/apache/hadoop/fs/shell/PathData.java | 35 ++
.../java/org/apache/hadoop/fs/shell/Tail.java | 11 +-
.../fs/statistics/StoreStatisticNames.java | 9 +
.../fs/statistics/StreamStatisticNames.java | 19 +-
.../statistics/impl/IOStatisticsBinding.java | 44 +-
.../org/apache/hadoop/io/SequenceFile.java | 14 +-
.../apache/hadoop/util/JsonSerialization.java | 6 +-
.../functional/CommonCallableSupplier.java | 2 +-
.../hadoop/util/functional/FutureIO.java | 90 +++
.../site/markdown/filesystem/filesystem.md | 90 +--
.../filesystem/fsdatainputstreambuilder.md | 588 +++++++++++++++++-
.../src/site/markdown/filesystem/index.md | 1 +
.../src/site/markdown/filesystem/openfile.md | 122 ++++
...AbstractContractMultipartUploaderTest.java | 2 +-
.../fs/contract/AbstractContractOpenTest.java | 80 ++-
.../hadoop/fs/contract/ContractTestUtils.java | 13 +-
.../fs/statistics/IOStatisticAssertions.java | 20 +
.../fs/statistics/TestDurationTracking.java | 3 +-
36 files changed, 1321 insertions(+), 261 deletions(-)
create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
index b4a4a85674d..213fbc24c4d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AvroFSInput.java
@@ -25,6 +25,10 @@ import org.apache.avro.file.SeekableInput;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
/** Adapts an {@link FSDataInputStream} to Avro's SeekableInput interface. */
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -42,7 +46,12 @@ public class AvroFSInput implements Closeable, SeekableInput {
public AvroFSInput(final FileContext fc, final Path p) throws IOException {
FileStatus status = fc.getFileStatus(p);
this.len = status.getLen();
- this.stream = fc.open(p);
+ this.stream = awaitFuture(fc.openFile(p)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+ .withFileStatus(status)
+ .build());
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 0256a58f463..89a399ec32a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,7 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
@@ -889,7 +889,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(),
+ FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(),
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
index b7757a62e28..a4c7254cfeb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
@@ -61,6 +61,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
*/
B opt(@Nonnull String key, float value);
+ /**
+ * Set optional long parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, long value);
+
/**
* Set optional double parameter for the Builder.
*
@@ -104,6 +111,13 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
*/
B must(@Nonnull String key, float value);
+ /**
+ * Set mandatory long option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, long value);
+
/**
* Set mandatory double option.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index c7c1428b448..ff4debeaf19 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -71,7 +71,12 @@ import org.apache.hadoop.tracing.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* The FileContext class provides an interface for users of the Hadoop
@@ -2204,7 +2209,12 @@ public class FileContext implements PathCapabilities {
EnumSet<CreateFlag> createFlag = overwrite ? EnumSet.of(
CreateFlag.CREATE, CreateFlag.OVERWRITE) :
EnumSet.of(CreateFlag.CREATE);
- InputStream in = open(qSrc);
+ InputStream in = awaitFuture(openFile(qSrc)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .opt(FS_OPTION_OPENFILE_LENGTH,
+ fs.getLen()) // file length hint for object stores
+ .build());
try (OutputStream out = create(qDst, createFlag)) {
IOUtils.copyBytes(in, out, conf, true);
} finally {
@@ -2936,9 +2946,11 @@ public class FileContext implements PathCapabilities {
final Path absF = fixRelativePart(getPath());
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
+ .withOptionalKeys(getOptionalKeys())
.withOptions(getOptions())
- .withBufferSize(getBufferSize())
- .withStatus(getStatus());
+ .withStatus(getStatus())
+ .withBufferSize(
+ getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
@Override
public CompletableFuture<FSDataInputStream> next(
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index f0c55f54280..963c3a1acc7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -91,7 +91,8 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -4626,7 +4627,7 @@ public abstract class FileSystem extends Configured
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
@@ -4654,7 +4655,7 @@ public abstract class FileSystem extends Configured
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(), "");
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
try {
result.complete(open(pathHandle, parameters.getBufferSize()));
@@ -4761,9 +4762,11 @@ public abstract class FileSystem extends Configured
Optional<Path> optionalPath = getOptionalPath();
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
+ .withOptionalKeys(getOptionalKeys())
.withOptions(getOptions())
- .withBufferSize(getBufferSize())
- .withStatus(super.getStatus()); // explicit to avoid IDE warnings
+ .withStatus(super.getStatus())
+ .withBufferSize(
+ getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
if(optionalPath.isPresent()) {
return getFS().openFileWithOptions(optionalPath.get(),
parameters);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index b130ba82430..ccb3ac8ca90 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -76,6 +76,11 @@ import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+
/**
* A collection of file-processing util methods
*/
@@ -395,7 +400,32 @@ public class FileUtil {
return copy(srcFS, fileStatus, dstFS, dst, deleteSource, overwrite, conf);
}
- /** Copy files between FileSystems. */
+ /**
+ * Copy a file/directory tree within/between filesystems.
+ *
+ * Returns true if the operation succeeded. When deleteSource is true,
+ * this means "after the copy, delete(source) returned true".
+ * If the destination is a directory, and mkdirs(dest) fails,
+ * the operation will return false rather than raise any exception.
+ *
+ * The overwrite flag is about overwriting files; it has no effect on
+ * handling an attempt to copy a file atop a directory (expect an IOException),
+ * or a directory over a path which contains a file (mkdir will fail, so
+ * "false").
+ *
+ * The operation is recursive, and the deleteSource operation takes place
+ * as each subdirectory is copied. Therefore, if an operation fails partway
+ * through, the source tree may be partially deleted.
+ * @param srcFS source filesystem
+ * @param srcStatus status of source
+ * @param dstFS destination filesystem
+ * @param dst path of destination
+ * @param deleteSource delete the source?
+ * @param overwrite overwrite files at destination?
+ * @param conf configuration to use when opening files
+ * @return true if the operation succeeded.
+ * @throws IOException failure
+ */
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
FileSystem dstFS, Path dst,
boolean deleteSource,
@@ -408,22 +438,27 @@ public class FileUtil {
if (!dstFS.mkdirs(dst)) {
return false;
}
- FileStatus contents[] = srcFS.listStatus(src);
- for (int i = 0; i < contents.length; i++) {
- copy(srcFS, contents[i], dstFS,
- new Path(dst, contents[i].getPath().getName()),
- deleteSource, overwrite, conf);
+ RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
+ while (contents.hasNext()) {
+ FileStatus next = contents.next();
+ copy(srcFS, next, dstFS,
+ new Path(dst, next.getPath().getName()),
+ deleteSource, overwrite, conf);
}
} else {
- InputStream in=null;
+ InputStream in = null;
OutputStream out = null;
try {
- in = srcFS.open(src);
+ in = awaitFuture(srcFS.openFile(src)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .opt(FS_OPTION_OPENFILE_LENGTH,
+ srcStatus.getLen()) // file length hint for object stores
+ .build());
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true);
} catch (IOException e) {
- IOUtils.closeStream(out);
- IOUtils.closeStream(in);
+ IOUtils.cleanupWithLogger(LOG, in, out);
throw e;
}
}
@@ -502,7 +537,11 @@ public class FileUtil {
deleteSource, conf);
}
} else {
- InputStream in = srcFS.open(src);
+ InputStream in = awaitFuture(srcFS.openFile(src)
+ .withFileStatus(srcStatus)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .build());
IOUtils.copyBytes(in, Files.newOutputStream(dst.toPath()), conf);
}
if (deleteSource) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
index 27a522e5930..e7f441a75d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -34,7 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* options accordingly, for example:
*
* If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
* {@link IllegalArgumentException} will be thrown.
*
*/
@@ -51,10 +52,11 @@ public interface FutureDataInputStreamBuilder
/**
* A FileStatus may be provided to the open request.
* It is up to the implementation whether to use this or not.
- * @param status status.
+ * @param status status: may be null
* @return the builder.
*/
- default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
+ default FutureDataInputStreamBuilder withFileStatus(
+ @Nullable FileStatus status) {
return this;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index 75bc12df8fd..9b457272fcb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.fs;
+import java.util.Collections;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -518,4 +522,119 @@ public final class Options {
MD5MD5CRC, // MD5 of block checksums, which are MD5 over chunk CRCs
COMPOSITE_CRC // Block/chunk-independent composite CRC
}
+
+ /**
+ * The standard {@code openFile()} options.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static final class OpenFileOptions {
+
+ private OpenFileOptions() {
+ }
+
+ /**
+ * Prefix for all standard filesystem options: {@value}.
+ */
+ private static final String FILESYSTEM_OPTION = "fs.option.";
+
+ /**
+ * Prefix for all openFile options: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE =
+ FILESYSTEM_OPTION + "openfile.";
+
+ /**
+ * OpenFile option for file length: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_LENGTH =
+ FS_OPTION_OPENFILE + "length";
+
+ /**
+ * OpenFile option for split start: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_SPLIT_START =
+ FS_OPTION_OPENFILE + "split.start";
+
+ /**
+ * OpenFile option for split end: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_SPLIT_END =
+ FS_OPTION_OPENFILE + "split.end";
+
+ /**
+ * OpenFile option for buffer size: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
+ FS_OPTION_OPENFILE + "buffer.size";
+
+ /**
+ * OpenFile option for read policies: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY =
+ FS_OPTION_OPENFILE + "read.policy";
+
+ /**
+ * Set of standard options which openFile implementations
+ * MUST recognize, even if they ignore the actual values.
+ */
+ public static final Set<String> FS_OPTION_OPENFILE_STANDARD_OPTIONS =
+ Collections.unmodifiableSet(Stream.of(
+ FS_OPTION_OPENFILE_BUFFER_SIZE,
+ FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_LENGTH,
+ FS_OPTION_OPENFILE_SPLIT_START,
+ FS_OPTION_OPENFILE_SPLIT_END)
+ .collect(Collectors.toSet()));
+
+ /**
+ * Read policy for adaptive IO: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE =
+ "adaptive";
+
+ /**
+ * Read policy {@value} - whatever the implementation does by default.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
+ "default";
+
+ /**
+ * Read policy for random IO: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY_RANDOM =
+ "random";
+
+ /**
+ * Read policy for sequential IO: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL =
+ "sequential";
+
+ /**
+ * Vectored IO API to be used: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY_VECTOR =
+ "vector";
+
+ /**
+ * Whole file to be read, end-to-end: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE =
+ "whole-file";
+
+ /**
+ * All the current read policies as a set.
+ */
+ public static final Set<String> FS_OPTION_OPENFILE_READ_POLICIES =
+ Collections.unmodifiableSet(Stream.of(
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE,
+ FS_OPTION_OPENFILE_READ_POLICY_DEFAULT,
+ FS_OPTION_OPENFILE_READ_POLICY_RANDOM,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL,
+ FS_OPTION_OPENFILE_READ_POLICY_VECTOR,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .collect(Collectors.toSet()));
+
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
index 9cf8b3dc4d2..2308e38a1c9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
@@ -46,7 +46,7 @@ import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.
*
* .opt("foofs:option.a", true)
* .opt("foofs:option.b", "value")
- * .opt("barfs:cache", true)
+ * .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
* .must("foofs:cache", true)
* .must("barfs:cache-size", 256 * 1024 * 1024)
* .build();
@@ -88,6 +88,9 @@ public abstract class
/** Keep track of the keys for mandatory options. */
private final Set<String> mandatoryKeys = new HashSet<>();
+ /** Keep track of the optional keys. */
+ private final Set<String> optionalKeys = new HashSet<>();
+
/**
* Constructor with both optional path and path handle.
* Either or both argument may be empty, but it is an error for
@@ -163,6 +166,7 @@ public abstract class
@Override
public B opt(@Nonnull final String key, @Nonnull final String value) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.set(key, value);
return getThisBuilder();
}
@@ -175,6 +179,7 @@ public abstract class
@Override
public B opt(@Nonnull final String key, boolean value) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.setBoolean(key, value);
return getThisBuilder();
}
@@ -187,10 +192,19 @@ public abstract class
@Override
public B opt(@Nonnull final String key, int value) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.setInt(key, value);
return getThisBuilder();
}
+ @Override
+ public B opt(@Nonnull final String key, final long value) {
+ mandatoryKeys.remove(key);
+ optionalKeys.add(key);
+ options.setLong(key, value);
+ return getThisBuilder();
+ }
+
/**
* Set optional float parameter for the Builder.
*
@@ -199,6 +213,7 @@ public abstract class
@Override
public B opt(@Nonnull final String key, float value) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.setFloat(key, value);
return getThisBuilder();
}
@@ -211,6 +226,7 @@ public abstract class
@Override
public B opt(@Nonnull final String key, double value) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.setDouble(key, value);
return getThisBuilder();
}
@@ -223,6 +239,7 @@ public abstract class
@Override
public B opt(@Nonnull final String key, @Nonnull final String... values) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.setStrings(key, values);
return getThisBuilder();
}
@@ -248,6 +265,7 @@ public abstract class
@Override
public B must(@Nonnull final String key, boolean value) {
mandatoryKeys.add(key);
+ optionalKeys.remove(key);
options.setBoolean(key, value);
return getThisBuilder();
}
@@ -260,10 +278,19 @@ public abstract class
@Override
public B must(@Nonnull final String key, int value) {
mandatoryKeys.add(key);
+ optionalKeys.remove(key);
options.setInt(key, value);
return getThisBuilder();
}
+ @Override
+ public B must(@Nonnull final String key, final long value) {
+ mandatoryKeys.add(key);
+ optionalKeys.remove(key);
+ options.setLong(key, value);
+ return getThisBuilder();
+ }
+
/**
* Set mandatory float option.
*
@@ -272,6 +299,7 @@ public abstract class
@Override
public B must(@Nonnull final String key, float value) {
mandatoryKeys.add(key);
+ optionalKeys.remove(key);
options.setFloat(key, value);
return getThisBuilder();
}
@@ -284,6 +312,7 @@ public abstract class
@Override
public B must(@Nonnull final String key, double value) {
mandatoryKeys.add(key);
+ optionalKeys.remove(key);
options.setDouble(key, value);
return getThisBuilder();
}
@@ -296,6 +325,7 @@ public abstract class
@Override
public B must(@Nonnull final String key, @Nonnull final String... values) {
mandatoryKeys.add(key);
+ optionalKeys.remove(key);
options.setStrings(key, values);
return getThisBuilder();
}
@@ -314,6 +344,12 @@ public abstract class
public Set<String> getMandatoryKeys() {
return Collections.unmodifiableSet(mandatoryKeys);
}
+ /**
+ * Get all the keys that are set as optional keys.
+ */
+ public Set<String> getOptionalKeys() {
+ return Collections.unmodifiableSet(optionalKeys);
+ }
/**
* Reject a configuration if one or more mandatory keys are
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
index 7c5a5d949a0..1fafd41b054 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.functional.FutureIO;
import static org.apache.hadoop.fs.Path.mergePaths;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -98,7 +99,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
public CompletableFuture<UploadHandle> startUpload(Path filePath)
throws IOException {
checkPath(filePath);
- return FutureIOSupport.eval(() -> {
+ return FutureIO.eval(() -> {
Path collectorPath = createCollectorPath(filePath);
fs.mkdirs(collectorPath, FsPermission.getDirDefault());
@@ -116,7 +117,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
throws IOException {
checkPutArguments(filePath, inputStream, partNumber, uploadId,
lengthInBytes);
- return FutureIOSupport.eval(() -> innerPutPart(filePath,
+ return FutureIO.eval(() -> innerPutPart(filePath,
inputStream, partNumber, uploadId, lengthInBytes));
}
@@ -179,7 +180,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
Map<Integer, PartHandle> handleMap) throws IOException {
checkPath(filePath);
- return FutureIOSupport.eval(() ->
+ return FutureIO.eval(() ->
innerComplete(uploadId, filePath, handleMap));
}
@@ -251,7 +252,7 @@ public class FileSystemMultipartUploader extends AbstractMultipartUploader {
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
- return FutureIOSupport.eval(() -> {
+ return FutureIO.eval(() -> {
// force a check for a file existing; raises FNFE if not found
fs.getFileStatus(collectorPath);
fs.delete(collectorPath, true);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java
index 24a8d49747f..70e39de7388 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.impl;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -47,7 +48,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
* options accordingly, for example:
*
* If the option is not related to the file system, the option will be ignored.
- * If the option is must, but not supported by the file system, a
+ * If the option is must, but not supported/known by the file system, an
* {@link IllegalArgumentException} will be thrown.
*
*/
@@ -147,8 +148,9 @@ public abstract class FutureDataInputStreamBuilderImpl
}
@Override
- public FutureDataInputStreamBuilder withFileStatus(FileStatus st) {
- this.status = requireNonNull(st, "status");
+ public FutureDataInputStreamBuilder withFileStatus(
+ @Nullable FileStatus st) {
+ this.status = st;
return this;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
index fe112d59352..f47e5f4fbfb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -37,14 +36,16 @@ import org.apache.hadoop.util.functional.FutureIO;
/**
* Support for future IO and the FS Builder subclasses.
- * If methods in here are needed for applications, promote
- * to {@link FutureIO} for public use -with the original
- * method relaying to it. This is to ensure that external
- * filesystem implementations can safely use these methods
+ * All methods in this class have been superseded by those in
+ * {@link FutureIO}.
+ * The methods here are retained but all marked as deprecated.
+ * This is to ensure that any external
+ * filesystem implementations can still use these methods
* without linkage problems surfacing.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
+@Deprecated
public final class FutureIOSupport {
private FutureIOSupport() {
@@ -53,6 +54,7 @@ public final class FutureIOSupport {
/**
* Given a future, evaluate it. Raised exceptions are
* extracted and handled.
+ * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
* @param future future to evaluate
* @param <T> type of the result.
* @return the result, if all went well.
@@ -60,7 +62,8 @@ public final class FutureIOSupport {
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
- public static <T> T awaitFuture(final Future<T> future)
+ @Deprecated
+ public static <T> T awaitFuture(final Future<T> future)
throws InterruptedIOException, IOException, RuntimeException {
return FutureIO.awaitFuture(future);
}
@@ -69,6 +72,7 @@ public final class FutureIOSupport {
/**
* Given a future, evaluate it. Raised exceptions are
* extracted and handled.
+ * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
* @param future future to evaluate
* @param <T> type of the result.
* @return the result, if all went well.
@@ -77,6 +81,7 @@ public final class FutureIOSupport {
* @throws RuntimeException any nested RTE thrown
* @throws TimeoutException the future timed out.
*/
+ @Deprecated
public static <T> T awaitFuture(final Future<T> future,
final long timeout,
final TimeUnit unit)
@@ -88,10 +93,7 @@ public final class FutureIOSupport {
/**
* From the inner cause of an execution exception, extract the inner cause
* if it is an IOE or RTE.
- * This will always raise an exception, either the inner IOException,
- * an inner RuntimeException, or a new IOException wrapping the raised
- * exception.
- *
+ * See {@link FutureIO#raiseInnerCause(ExecutionException)}.
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
@@ -99,6 +101,7 @@ public final class FutureIOSupport {
* any non-Runtime-Exception
* @throws RuntimeException if that is the inner cause.
*/
+ @Deprecated
public static <T> T raiseInnerCause(final ExecutionException e)
throws IOException {
return FutureIO.raiseInnerCause(e);
@@ -107,6 +110,7 @@ public final class FutureIOSupport {
/**
* Extract the cause of a completion failure and rethrow it if an IOE
* or RTE.
+ * See {@link FutureIO#raiseInnerCause(CompletionException)}.
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
@@ -114,20 +118,15 @@ public final class FutureIOSupport {
* any non-Runtime-Exception
* @throws RuntimeException if that is the inner cause.
*/
+ @Deprecated
public static <T> T raiseInnerCause(final CompletionException e)
throws IOException {
return FutureIO.raiseInnerCause(e);
}
/**
- * Propagate options to any builder, converting everything with the
- * prefix to an option where, if there were 2+ dot-separated elements,
- * it is converted to a schema.
- *
+ * Propagate options to any builder.
+ * {@link FutureIO#propagateOptions(FSBuilder, Configuration, String, String)}
* @param builder builder to modify
* @param conf configuration to read
* @param optionalPrefix prefix for optional settings
@@ -136,56 +135,39 @@ public final class FutureIOSupport {
* @param type of builder
* @return the builder passed in.
*/
+ @Deprecated
public static <T, U extends FSBuilder<T, U>>
FSBuilder<T, U> propagateOptions(
final FSBuilder<T, U> builder,
final Configuration conf,
final String optionalPrefix,
final String mandatoryPrefix) {
- propagateOptions(builder, conf,
- optionalPrefix, false);
- propagateOptions(builder, conf,
- mandatoryPrefix, true);
- return builder;
+ return FutureIO.propagateOptions(builder,
+ conf, optionalPrefix, mandatoryPrefix);
}
/**
- * Propagate options to any builder, converting everything with the
- * prefix to an option where, if there were 2+ dot-separated elements,
- * it is converted to a schema.
- *
+ * Propagate options to any builder.
+ * {@link FutureIO#propagateOptions(FSBuilder, Configuration, String, boolean)}
* @param builder builder to modify
* @param conf configuration to read
* @param prefix prefix to scan/strip
* @param mandatory are the options to be mandatory or optional?
*/
+ @Deprecated
public static void propagateOptions(
final FSBuilder<?, ?> builder,
final Configuration conf,
final String prefix,
final boolean mandatory) {
-
- final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
- final Map<String, String> propsWithPrefix = conf.getPropsWithPrefix(p);
- for (Map.Entry<String, String> entry : propsWithPrefix.entrySet()) {
- // change the schema off each entry
- String key = entry.getKey();
- String val = entry.getValue();
- if (mandatory) {
- builder.must(key, val);
- } else {
- builder.opt(key, val);
- }
- }
+ FutureIO.propagateOptions(builder, conf, prefix, mandatory);
}
/**
* Evaluate a CallableRaisingIOE in the current thread,
* converting IOEs to RTEs and propagating.
+ * See {@link FutureIO#eval(CallableRaisingIOE)}.
+ *
* @param callable callable to invoke
* @param <T> Return type.
* @return the evaluated result.
@@ -194,17 +176,6 @@ public final class FutureIOSupport {
*/
public static <T> CompletableFuture<T> eval(
CallableRaisingIOE<T> callable) {
- CompletableFuture<T> result = new CompletableFuture<>();
- try {
- result.complete(callable.apply());
- } catch (UnsupportedOperationException | IllegalArgumentException tx) {
- // fail fast here
- throw tx;
- } catch (Throwable tx) {
- // fail lazily here to ensure callers expect all File IO operations to
- // surface later
- result.completeExceptionally(tx);
- }
- return result;
+ return FutureIO.eval(callable);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java
index 77b4ff52696..a19c5faff4d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java
@@ -38,6 +38,9 @@ public class OpenFileParameters {
*/
private Set<String> mandatoryKeys;
+ /** The optional keys. */
+ private Set<String> optionalKeys;
+
/**
* Options set during the build sequence.
*/
@@ -61,6 +64,11 @@ public class OpenFileParameters {
return this;
}
+ public OpenFileParameters withOptionalKeys(final Set<String> keys) {
+ this.optionalKeys = requireNonNull(keys);
+ return this;
+ }
+
public OpenFileParameters withOptions(final Configuration opts) {
this.options = requireNonNull(opts);
return this;
@@ -80,6 +88,10 @@ public class OpenFileParameters {
return mandatoryKeys;
}
+ public Set<String> getOptionalKeys() {
+ return optionalKeys;
+ }
+
public Configuration getOptions() {
return options;
}
@@ -91,4 +103,5 @@ public class OpenFileParameters {
public FileStatus getStatus() {
return status;
}
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
index 2fcdee915ed..3f828897b1d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WrappedIOException.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.concurrent.ExecutionException;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -28,13 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
- * A wrapper for an IOException which
- * {@link FutureIOSupport#raiseInnerCause(ExecutionException)} knows to
- * always extract the exception.
+ * A wrapper for an IOException.
*
* The constructor signature guarantees the cause will be an IOException,
* and as it checks for a null-argument, non-null.
- * @deprecated use the {@code UncheckedIOException}.
+ * @deprecated use the {@code UncheckedIOException} directly.
*/
@Deprecated
@InterfaceAudience.Private
@@ -52,8 +49,4 @@ public class WrappedIOException extends UncheckedIOException {
super(Preconditions.checkNotNull(cause));
}
- @Override
- public synchronized IOException getCause() {
- return (IOException) super.getCause();
- }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
index f6f4247489f..678225f81e0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
@@ -55,6 +55,9 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* Provides: argument processing to ensure the destination is valid
@@ -348,7 +351,11 @@ abstract class CommandWithDestination extends FsCommand {
src.fs.setVerifyChecksum(verifyChecksum);
InputStream in = null;
try {
- in = src.fs.open(src.path);
+ in = awaitFuture(src.fs.openFile(src.path)
+ .withFileStatus(src.stat)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .build());
copyStreamToTarget(in, target);
preserveAttributes(src, target, preserveRawXattrs);
} finally {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index b03d7de8a1c..0643a2e983d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -98,7 +98,8 @@ class CopyCommands {
try {
for (PathData src : srcs) {
if (src.stat.getLen() != 0) {
- try (FSDataInputStream in = src.fs.open(src.path)) {
+ // Always do sequential reads.
+ try (FSDataInputStream in = src.openForSequentialIO()) {
IOUtils.copyBytes(in, out, getConf(), false);
writeDelimiter(out);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
index 670fa152f72..d3ca013a3f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Display.java
@@ -105,7 +105,8 @@ class Display extends FsCommand {
}
protected InputStream getInputStream(PathData item) throws IOException {
- return item.fs.open(item.path);
+ // Always do sequential reads;
+ return item.openForSequentialIO();
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
index 2280225b5ae..7242f261801 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Head.java
@@ -28,6 +28,8 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+
/**
* Show the first 1KB of the file.
*/
@@ -68,11 +70,9 @@ class Head extends FsCommand {
}
private void dumpToOffset(PathData item) throws IOException {
- FSDataInputStream in = item.fs.open(item.path);
- try {
+ try (FSDataInputStream in = item.openFile(
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
IOUtils.copyBytes(in, System.out, endingOffset, false);
- } finally {
- in.close();
}
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index 1ff8d8f0494..2071a16799a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -39,6 +40,10 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.RemoteIterator;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
/**
@@ -601,4 +606,34 @@ public class PathData implements Comparable<PathData> {
public int hashCode() {
return path.hashCode();
}
+
+
+ /**
+ * Open a file for sequential IO.
+ *
+ * This uses FileSystem.openFile() to request sequential IO;
+ * the file status is also passed in.
+ * Filesystems may use it to optimize their IO.
+ * @return an input stream
+ * @throws IOException failure
+ */
+ protected FSDataInputStream openForSequentialIO()
+ throws IOException {
+ return openFile(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+ }
+
+ /**
+ * Open a file.
+ * @param policy fadvise policy.
+ * @return an input stream
+ * @throws IOException failure
+ */
+ protected FSDataInputStream openFile(final String policy) throws IOException {
+ return awaitFuture(fs.openFile(path)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ policy)
+ .opt(FS_OPTION_OPENFILE_LENGTH,
+ stat.getLen()) // file length hint for object stores
+ .build());
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
index 22dd32bce85..22b135f064c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Tail.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+
/**
* Get a listing of all files in that match the file patterns.
*/
@@ -107,16 +109,15 @@ class Tail extends FsCommand {
if (offset < 0) {
offset = Math.max(fileSize + offset, 0);
}
-
- FSDataInputStream in = item.fs.open(item.path);
- try {
+ // Always do sequential reads.
+ try (FSDataInputStream in = item.openFile(
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
in.seek(offset);
// use conf so the system configured io block size is used
IOUtils.copyBytes(in, System.out, getConf(), false);
offset = in.getPos();
- } finally {
- in.close();
}
return offset;
}
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 166007f5c9a..c458269c351 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -118,6 +118,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_OPEN = "op_open";
+ /** Call to openFile() {@value}. */
+ public static final String OP_OPENFILE = "op_openfile";
+
/** {@value}. */
public static final String OP_REMOVE_ACL = "op_remove_acl";
@@ -323,6 +326,12 @@ public final class StoreStatisticNames {
public static final String ACTION_EXECUTOR_ACQUIRED =
"action_executor_acquired";
+ /**
+ * A file was opened: {@value}.
+ */
+ public static final String ACTION_FILE_OPENED
+ = "action_file_opened";
+
/**
* An HTTP HEAD request was made: {@value}.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 7e9137294c1..ca755f08419 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -76,7 +76,7 @@ public final class StreamStatisticNames {
public static final String STREAM_READ_CLOSED = "stream_read_closed";
/**
- * Total count of times an attempt to close an input stream was made
+ * Total count of times an attempt to close an input stream was made.
* Value: {@value}.
*/
public static final String STREAM_READ_CLOSE_OPERATIONS
@@ -118,6 +118,23 @@ public final class StreamStatisticNames {
public static final String STREAM_READ_OPERATIONS_INCOMPLETE
= "stream_read_operations_incomplete";
+ /**
+ * count/duration of aborting a remote stream during stream IO.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_REMOTE_STREAM_ABORTED
+ = "stream_read_remote_stream_aborted";
+
+ /**
+ * count/duration of closing a remote stream,
+ * possibly including draining the stream to recycle
+ * the HTTP connection.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_REMOTE_STREAM_DRAINED
+ = "stream_read_remote_stream_drain";
+
/**
* Count of version mismatches encountered while reading an input stream.
* Value: {@value}.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
index 37ca37a187d..e06ccb54eb4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
@@ -521,23 +521,39 @@ public final class IOStatisticsBinding {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
- try {
- // exec the input function and return its value
- return input.apply();
- } catch (IOException | RuntimeException e) {
- // input function failed: note it
- tracker.failed();
- // and rethrow
- throw e;
- } finally {
- // update the tracker.
- // this is called after the catch() call would have
- // set the failed flag.
- tracker.close();
- }
+ return invokeTrackingDuration(tracker, input);
};
}
+ /**
+ * Given an IOException raising callable/lambda expression,
+ * execute it, updating the tracker on success/failure.
+ * @param tracker duration tracker.
+ * @param input input callable.
+ * @param <B> return type.
+ * @return the result of the invocation
+ * @throws IOException on failure.
+ */
+ public static <B> B invokeTrackingDuration(
+ final DurationTracker tracker,
+ final CallableRaisingIOE<B> input)
+ throws IOException {
+ try {
+ // exec the input function and return its value
+ return input.apply();
+ } catch (IOException | RuntimeException e) {
+ // input function failed: note it
+ tracker.failed();
+ // and rethrow
+ throw e;
+ } finally {
+ // update the tracker.
+ // this is called after the catch() call would have
+ // set the failed flag.
+ tracker.close();
+ }
+ }
+
/**
* Given an IOException raising Consumer,
* return a new one which wraps the inner and tracks
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 0581fb3f577..0699259bae9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -60,6 +60,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMP
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* SequenceFiles are flat files consisting of binary key/value
@@ -1959,7 +1964,14 @@ public class SequenceFile {
*/
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length) throws IOException {
- return fs.open(file, bufferSize);
+ FutureDataInputStreamBuilder builder = fs.openFile(file)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+ .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
+ if (length >= 0) {
+ builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
+ }
+ return awaitFuture(builder.build());
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
index 111ce8f6201..bb61306b2b8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
@@ -266,7 +268,9 @@ public class JsonSerialization<T> {
if (status != null && status.getLen() == 0) {
throw new EOFException("No data in " + path);
}
- FutureDataInputStreamBuilder builder = fs.openFile(path);
+ FutureDataInputStreamBuilder builder = fs.openFile(path)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE);
if (status != null) {
builder.withFileStatus(status);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
index e2cdc0fd414..32e299b4d45 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.DurationInfo;
-import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause;
/**
* A bridge from Callable to Supplier; catching exceptions
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
index 3f7218baa75..c3fda19d8d7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -29,6 +31,8 @@ import java.util.concurrent.TimeoutException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSBuilder;
/**
* Future IO Helper methods.
@@ -86,6 +90,8 @@ public final class FutureIO {
* extracted and rethrown.
*
* @param future future to evaluate
+ * @param timeout timeout to wait
+ * @param unit time unit.
* @param <T> type of the result.
* @return the result, if all went well.
* @throws InterruptedIOException future was interrupted
@@ -185,4 +191,88 @@ public final class FutureIO {
}
}
+ /**
+ * Propagate options to any builder, converting everything with the
+ * prefix to an option where, if there were 2+ dot-separated elements,
+ * it is converted to a schema.
+ * See {@link #propagateOptions(FSBuilder, Configuration, String, boolean)}.
+ * @param builder builder to modify
+ * @param conf configuration to read
+ * @param optionalPrefix prefix for optional settings
+ * @param mandatoryPrefix prefix for mandatory settings
+ * @param <T> type of result
+ * @param <U> type of builder
+ * @return the builder passed in.
+ */
+ public static <T, U extends FSBuilder<T, U>>
+ FSBuilder<T, U> propagateOptions(
+ final FSBuilder<T, U> builder,
+ final Configuration conf,
+ final String optionalPrefix,
+ final String mandatoryPrefix) {
+ propagateOptions(builder, conf,
+ optionalPrefix, false);
+ propagateOptions(builder, conf,
+ mandatoryPrefix, true);
+ return builder;
+ }
+
+ /**
+ * Propagate options to any builder, converting everything with the
+ * prefix to an option where, if there were 2+ dot-separated elements,
+ * it is converted to a schema.
+ *
+ * @param builder builder to modify
+ * @param conf configuration to read
+ * @param prefix prefix to scan/strip
+ * @param mandatory are the options to be mandatory or optional?
+ */
+ public static void propagateOptions(
+ final FSBuilder<?, ?> builder,
+ final Configuration conf,
+ final String prefix,
+ final boolean mandatory) {
+
+ final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
+ final Map<String, String> propsWithPrefix = conf.getPropsWithPrefix(p);
+ for (Map.Entry<String, String> entry : propsWithPrefix.entrySet()) {
+ // change the schema off each entry
+ String key = entry.getKey();
+ String val = entry.getValue();
+ if (mandatory) {
+ builder.must(key, val);
+ } else {
+ builder.opt(key, val);
+ }
+ }
+ }
+
+ /**
+ * Evaluate a CallableRaisingIOE in the current thread,
+ * converting IOEs to RTEs and propagating.
+ * @param callable callable to invoke
+ * @param <T> Return type.
+ * @return the evaluated result.
+ * @throws UnsupportedOperationException fail fast if unsupported
+ * @throws IllegalArgumentException invalid argument
+ */
+ public static <T> CompletableFuture<T> eval(
+ CallableRaisingIOE<T> callable) {
+ CompletableFuture<T> result = new CompletableFuture<>();
+ try {
+ result.complete(callable.apply());
+ } catch (UnsupportedOperationException | IllegalArgumentException tx) {
+ // fail fast here
+ throw tx;
+ } catch (Throwable tx) {
+ // fail lazily here to ensure callers expect all File IO operations to
+ // surface later
+ result.completeExceptionally(tx);
+ }
+ return result;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 4517bd8ff4a..004220c4bed 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -814,97 +814,11 @@ exists in the metadata, but no copies of any its blocks can be located;
### `FSDataInputStreamBuilder openFile(Path path)`
-Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html)
-to construct a operation to open the file at `path` for reading.
+See [openFile()](openfile.html).
-When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
-the builder parameters are verified and
-`openFileWithOptions(Path, OpenFileParameters)` invoked.
-
-This (protected) operation returns a `CompletableFuture`
-which, when its `get()` method is called, either returns an input
-stream of the contents of opened file, or raises an exception.
-
-The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)`
-ultimately invokes `open(Path, int)`.
-
-Thus the chain `openFile(path).build().get()` has the same preconditions
-and postconditions as `open(Path p, int bufferSize)`
-
-However, there is one difference which implementations are free to
-take advantage of:
-
-The returned stream MAY implement a lazy open where file non-existence or
-access permission failures may not surface until the first `read()` of the
-actual data.
-
-The `openFile()` operation may check the state of the filesystem during its
-invocation, but as the state of the filesystem may change betwen this call and
-the actual `build()` and `get()` operations, this file-specific
-preconditions (file exists, file is readable, etc) MUST NOT be checked here.
-
-FileSystem implementations which do not implement `open(Path, int)`
-MAY postpone raising an `UnsupportedOperationException` until either the
-`FSDataInputStreamBuilder.build()` or the subsequent `get()` call,
-else they MAY fail fast in the `openFile()` call.
-
-### Implementors notes
-
-The base implementation of `openFileWithOptions()` actually executes
-the `open(path)` operation synchronously, yet still returns the result
-or any failures in the `CompletableFuture<>`, so as to ensure that users
-code expecting this.
-
-Any filesystem where the time to open a file may be significant SHOULD
-execute it asynchronously by submitting the operation in some executor/thread
-pool. This is particularly recommended for object stores and other filesystems
-likely to be accessed over long-haul connections.
-
-Arbitrary filesystem-specific options MAY be supported; these MUST
-be prefixed with either the filesystem schema, e.g. `hdfs.`
-or in the "fs.SCHEMA" format as normal configuration settings `fs.hdfs`). The
-latter style allows the same configuration option to be used for both
-filesystem configuration and file-specific configuration.
-
-It SHOULD be possible to always open a file without specifying any options,
-so as to present a consistent model to users. However, an implementation MAY
-opt to require one or more mandatory options to be set.
-
-The returned stream may perform "lazy" evaluation of file access. This is
-relevant for object stores where the probes for existence are expensive, and,
-even with an asynchronous open, may be considered needless.
-
### `FSDataInputStreamBuilder openFile(PathHandle)`
-Creates a `FSDataInputStreamBuilder` to build an operation to open a file.
-Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html)
-to construct a operation to open the file identified by the given `PathHandle` for reading.
-
-When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance,
-the builder parameters are verified and
-`openFileWithOptions(PathHandle, OpenFileParameters)` invoked.
-
-This (protected) operation returns a `CompletableFuture`
-which, when its `get()` method is called, either returns an input
-stream of the contents of opened file, or raises an exception.
-
-The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` method
-returns a future which invokes `open(Path, int)`.
-
-Thus the chain `openFile(pathhandle).build().get()` has the same preconditions
-and postconditions as `open(Pathhandle, int)`
-
-As with `FSDataInputStreamBuilder openFile(PathHandle)`, the `openFile()`
-call must not be where path-specific preconditions are checked -that
-is postponed to the `build()` and `get()` calls.
-
-FileSystem implementations which do not implement `open(PathHandle handle, int bufferSize)`
-MAY postpone raising an `UnsupportedOperationException` until either the
-`FSDataInputStreamBuilder.build()` or the subsequent `get()` call,
-else they MAY fail fast in the `openFile()` call.
-
-The base implementation raises this exception in the `build()` operation;
-other implementations SHOULD copy this.
+See [openFile()](openfile.html).
### `PathHandle getPathHandle(FileStatus stat, HandleOpt... options)`
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
index eadba174fc1..db630e05c22 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
@@ -13,10 +13,10 @@
-->
-
+
-# class `org.apache.hadoop.fs.FSDataInputStreamBuilder`
+# class `org.apache.hadoop.fs.FutureDataInputStreamBuilder`
@@ -27,7 +27,7 @@ file for reading.
## Invariants
-The `FSDataInputStreamBuilder` interface does not require parameters or
+The `FutureDataInputStreamBuilder` interface does not require parameters
or the state of `FileSystem` until [`build()`](#build) is
invoked and/or during the asynchronous open operation itself.
@@ -39,11 +39,11 @@ path validation.
## Implementation-agnostic parameters.
-### `FSDataInputStreamBuilder bufferSize(int bufSize)`
+### `FutureDataInputStreamBuilder bufferSize(int bufSize)`
Set the size of the buffer to be used.
-### `FSDataInputStreamBuilder withFileStatus(FileStatus status)`
+### `FutureDataInputStreamBuilder withFileStatus(FileStatus status)`
A `FileStatus` instance which refers to the file being opened.
@@ -53,7 +53,7 @@ So potentially saving on remote calls especially to object stores.
Requirements:
* `status != null`
-* `status.getPath()` == the resolved path of the file being opened.
+* `status.getPath().getName()` == the name of the file being opened.
The path validation MUST take place if the store uses the `FileStatus` when
it opens files, and MAY be performed otherwise. The validation
@@ -65,27 +65,85 @@ If a filesystem implementation extends the `FileStatus` returned in its
implementation MAY use this information when opening the file.
This is relevant with those stores which return version/etag information,
-including the S3A and ABFS connectors -they MAY use this to guarantee that
-the file they opened is exactly the one returned in the listing.
+-they MAY use this to guarantee that the file they opened
+is exactly the one returned in the listing.
+
+
+The final `status.getPath().getName()` element of the supplied status MUST equal
+the name value of the path supplied to the `openFile(path)` call.
+
+Filesystems MUST NOT validate the rest of the path.
+This is needed to support viewfs and other mount-point wrapper filesystems
+where schemas and paths are different. These often create their own FileStatus results.
+
+Preconditions
+
+```python
+status == null or status.getPath().getName() == path.getName()
+```
+
+Filesystems MUST NOT require the class of `status` to equal
+that of any specific subclass their implementation returns in filestatus/list
+operations. This is to support wrapper filesystems and serialization/deserialization
+of the status.
+
### Set optional or mandatory parameters
- FSDataInputStreamBuilder opt(String key, ...)
- FSDataInputStreamBuilder must(String key, ...)
+ FutureDataInputStreamBuilder opt(String key, ...)
+ FutureDataInputStreamBuilder must(String key, ...)
Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
client can specify FS-specific parameters without inspecting the concrete type
of `FileSystem`.
+Example:
+
```java
out = fs.openFile(path)
- .opt("fs.s3a.experimental.input.fadvise", "random")
- .must("fs.s3a.readahead.range", 256 * 1024)
+ .must("fs.option.openfile.read.policy", "random")
+ .opt("fs.http.connection.timeout", 30_000L)
.withFileStatus(statusFromListing)
.build()
.get();
```
+Here the read policy of `random` has been specified,
+with the requirement that the filesystem implementation must understand the option.
+An HTTP-specific option has been supplied which may be interpreted by any store;
+if the filesystem opening the file does not recognize the option, it can safely be
+ignored.
+
+### When to use `opt()` versus `must()`
+
+The difference between `opt()` versus `must()` is how the FileSystem opening
+the file must react to an option which it does not recognize.
+
+```python
+
+def must(name, value):
+ if not name in known_keys:
+ raise IllegalArgumentException
+ if not name in supported_keys:
+ raise UnsupportedException
+
+
+def opt(name, value):
+ if not name in known_keys:
+ # ignore option
+
+```
+
+For any known key, the validation of the `value` argument MUST be the same
+irrespective of how the (key, value) pair was declared.
+
+1. For a filesystem-specific option, it is the choice of the implementation
+ how to validate the entry.
+1. For standard options, the specification of what is a valid `value` is
+ defined in this filesystem specification, validated through contract
+ tests.
+
#### Implementation Notes
Checking for supported options must be performed in the `build()` operation.
@@ -93,9 +151,9 @@ Checking for supported options must be performed in the `build()` operation.
1. If a mandatory parameter declared via `must(key, value)`) is not recognized,
`IllegalArgumentException` MUST be thrown.
-1. If a mandatory parameter declared via `must(key, value)`) relies on
+1. If a mandatory parameter declared via `must(key, value)` relies on
a feature which is recognized but not supported in the specific
-Filesystem/FileContext instance `UnsupportedException` MUST be thrown.
+`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
The behavior of resolving the conflicts between the parameters set by
builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
@@ -110,13 +168,18 @@ custom subclasses.
This is critical to ensure safe use of the feature: directory listing/
status serialization/deserialization can result result in the `withFileStatus()`
-argumennt not being the custom subclass returned by the Filesystem instance's
+argument not being the custom subclass returned by the Filesystem instance's
own `getFileStatus()`, `listFiles()`, `listLocatedStatus()` calls, etc.
In such a situation the implementations must:
-1. Validate the path (always).
-1. Use the status/convert to the custom type, *or* simply discard it.
+1. Verify that `status.getPath().getName()` matches the current `path.getName()`
+ value. The rest of the path MUST NOT be validated.
+1. Use any status fields as desired -for example the file length.
+
+Even if no values of the status are used, the presence of the argument
+can be interpreted as the caller declaring that they believe the file
+to be present and of the given size.
## Builder interface
@@ -128,26 +191,499 @@ completed, returns an input stream which can read data from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively:
+* file existence/status checks MAY be performed asynchronously within the returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read operations, such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is successful.
+only guaranteed to have been met after the `get()` call on the returned future
+has succeeded and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed, returning
-a future to be evaluated.
+Thus, even when the file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
```
-The preconditions for opening the file are checked during the asynchronous
-evaluation, and so will surface when the future is completed:
+The inability to access/read a file MUST raise an `IOException` or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
```java
-FSDataInputStream in = future.get();
+ future.get().read();
```
+
+Access permission checks have the same visibility requirements: permission failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent operations.
+
+Note: some operations on the input stream, such as `seek()`, may not attempt any IO
+at all. Such operations MAY NOT raise exceptions when interacting with
+nonexistent/unreadable files.
+
+## Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementations
+MUST recognise and MAY support by changing the behavior of
+their input streams as appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident that
+the application will only be executed against releases of Hadoop which know of
+the options -applications SHOULD set the options via `opt()` calls rather than `must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored;
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added whose semantics are such that setting it but not
+supporting it would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
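+
+For example, an application reading a large file in big chunks might ask for a
+larger buffer on a single stream. This is a sketch only; `fs` and `path` stand for
+an existing `FileSystem` instance and the `Path` of the file to read.
+
+```java
+FSDataInputStream in = FutureIO.awaitFuture(
+    fs.openFile(path)
+        .opt("fs.option.openfile.buffer.size", 1024 * 1024)
+        .build());
+```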
+
+### Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations.
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing for sequential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random access. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
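+
+As a sketch (the `hbase-hfile` policy name is purely illustrative, and `fs`/`path`
+are assumed to already exist), a caller can list a custom policy first and fall
+back to a standard one:
+
+```java
+FutureDataInputStreamBuilder builder = fs.openFile(path)
+    .opt("fs.option.openfile.read.policy", "hbase-hfile, random");
+```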
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their IO
+performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether reads
+are being performed efficiently or not.
+
+_Further reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made, the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies arose in the absence of the ability to
+declare the seek policy in the `open()` API, which required it to be declared, if
+configurable, in the cluster/application configuration. However, the switch from
+sequential to random seek policies may be expensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option, if
+they know their read plan, they SHOULD declare which policy is most appropriate.
+
+#### Read Policy `default`
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implementations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedRead` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system client is free to enable
+whatever strategies maximise performance for this. In particular, larger ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in `openFile()` operation.
+* Prefetch data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened stream SHOULD declare this
+read policy.
+
+### Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+ this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position across/beyond
+ this length but below the actual length of the file. Implementations MAY
+ raise `EOFExceptions` in such cases, or they MAY return data.
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.option.openfile.length`,
+ the file status values take precedence.
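+
+As a sketch, a client which already knows the file length (for example from a
+manifest) can declare it and so skip the length/existence probe; `knownLength`,
+`fs` and `path` are illustrative names:
+
+```java
+FSDataInputStream in = FutureIO.awaitFuture(
+    fs.openFile(path)
+        .opt("fs.option.openfile.length", Long.toString(knownLength))
+        .build());
+```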
+
+### Options: `fs.option.openfile.split.start` and `fs.option.openfile.split.end`
+
+Declare the start and end of the split when a file has been split for processing
+in pieces.
+
+1. If a value is negative, the option SHALL be considered unset.
+1. Filesystems MAY assume that the length of the file is greater than or equal
+ to the value of `fs.option.openfile.split.end`.
+1. Filesystems MAY raise an exception if the client application reads past the
+ value set in `fs.option.openfile.split.end`.
+1. The pair of options MAY be used to optimise the read plan, such as setting
+ the content range for GET requests, or using the split end as an implicit
+ declaration of the guaranteed minimum length of the file.
+1. If both options are set, and the split start is declared as greater than the
+ split end, then the split start SHOULD just be reset to zero, rather than
+ rejecting the operation.
+
+The split end value can provide a hint as to the end of the input stream. The
+split start can be used to optimize any initial read offset for filesystem
+clients.
+
+*Note for implementors:* applications will read past the end of a split when they
+need to read to the end of a record/line which begins before the end of the
+split.
+
+Therefore clients MUST be allowed to `seek()`/`read()` past the length
+set in `fs.option.openfile.split.end` if the file is actually longer
+than that value.
+
+## S3A-specific options
+
+The S3A Connector supports custom options for readahead and seek policy.
+
+| Name | Type | Meaning |
+|--------------------------------------|----------|-------------------------------------------------------------|
+| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
+| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream |
+| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superseded by `fs.option.openfile.read.policy` |
+
+If the option set contains a SQL statement in the `fs.s3a.select.sql` option,
+then the file is opened as an S3 Select query.
+Consult the S3A documentation for more details.
+
+## ABFS-specific options
+
+The ABFS Connector supports custom input stream options.
+
+| Name | Type | Meaning |
+|-----------------------------------|-----------|----------------------------------------------------|
+| `fs.azure.buffered.pread.disable` | `boolean` | disable caching on the positioned read operations. |
+
+
+Disables caching on data read through the [PositionedReadable](fsdatainputstream.html#PositionedReadable)
+APIs.
+
+Consult the ABFS Documentation for more details.
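+
+A sketch of passing this option as optional, so that filesystems other than ABFS
+simply ignore it (`fs` and `path` are assumed to exist):
+
+```java
+FSDataInputStream in = FutureIO.awaitFuture(
+    fs.openFile(path)
+        .opt("fs.azure.buffered.pread.disable", "true")
+        .build());
+```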
+
+## Examples
+
+#### Declaring seek policy and split limits when opening a file.
+
+Here is an example from a proof of
+concept `org.apache.parquet.hadoop.util.HadoopInputFile`
+reader which uses a (nullable) file status and a split start/end.
+
+The `FileStatus` value is always passed in -but if it is null, then the split
+end is used to declare the length of the file.
+
+```java
+protected SeekableInputStream newStream(Path path, FileStatus stat,
+ long splitStart, long splitEnd)
+ throws IOException {
+
+ FutureDataInputStreamBuilder builder = fs.openFile(path)
+ .opt("fs.option.openfile.read.policy", "vector, random")
+ .withFileStatus(stat);
+
+ builder.opt("fs.option.openfile.split.start", splitStart);
+ builder.opt("fs.option.openfile.split.end", splitEnd);
+ CompletableFuture<FSDataInputStream> streamF = builder.build();
+ return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
+}
+```
+
+As a result, whether driven directly by a file listing, or when opening a file
+from a query plan of `(path, splitStart, splitEnd)`, there is no need to probe
+the remote store for the length of the file. When working with remote object
+stores, this can save tens to hundreds of milliseconds, even if such a probe is
+done asynchronously.
+
+If both the file length and the split end are set, then the file length MUST be
+considered "more" authoritative, that is, it really SHOULD define the file
+length. If the split end is set, the caller MAY not read past it.
+
+The `CompressedSplitLineReader` can read past the end of a split if it is
+partway through processing a compressed record. That is: it assumes an
+incomplete record read means that the file length is greater than the split
+length, and that it MUST read the entirety of the partially read record. Other
+readers may behave similarly.
+
+Therefore
+
+1. File length as supplied in a `FileStatus` or in `fs.option.openfile.length`
+ SHALL set the strict upper limit on the length of a file
+2. The split end as set in `fs.option.openfile.split.end` MUST be viewed as a
+ hint, rather than the strict end of the file.
+
+### Opening a file with both standard and non-standard options
+
+Standard and non-standard options MAY be combined in the same `openFile()`
+operation.
+
+```java
+Future<FSDataInputStream> f = openFile(path)
+ .must("fs.option.openfile.read.policy", "random, adaptive")
+ .opt("fs.s3a.readahead.range", 1024 * 1024)
+ .build();
+
+FSDataInputStream is = f.get();
+```
+
+The option set in `must()` MUST be understood, or at least recognized and
+ignored by all filesystems. In this example, the S3A-specific option MAY be
+ignored by all other filesystem clients.
+
+### Opening a file with older releases
+
+Not all Hadoop releases recognize the `fs.option.openfile.read.policy` option.
+
+The option can be safely used in application code if it is added via the `opt()`
+builder argument, as it will be treated as an unknown optional key which can
+then be discarded.
+
+```java
+Future<FSDataInputStream> f = openFile(path)
+ .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
+ .build();
+
+FSDataInputStream is = f.get();
+```
+
+*Note 1* if the option name is set by a reference to a constant in
+`org.apache.hadoop.fs.Options.OpenFileOptions`, then the program will not link
+against versions of Hadoop without the specific option. Therefore for resilient
+linking against older releases -use a copy of the value.
+
+*Note 2* as option validation is performed in the FileSystem connector,
+a third-party connector designed to work with multiple hadoop versions
+MAY NOT support the option.
+
+### Passing options in to MapReduce
+
+Hadoop MapReduce will automatically read MR job options with the prefixes
+`mapreduce.job.input.file.option.` and `mapreduce.job.input.file.must.`,
+and apply these values as `.opt()` and `must()` respectively, after
+removing the mapreduce-specific prefixes.
+
+This makes passing options in to MR jobs straightforward. For example, to
+declare that a job should read its data using random IO:
+
+```java
+JobConf jobConf = (JobConf) job.getConfiguration();
+jobConf.set(
+ "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
+ "random");
+```
+
+### MapReduce input format propagating options
+
+An example of a record reader passing in options to the file it opens.
+
+```java
+ public void initialize(InputSplit genericSplit,
+ TaskAttemptContext context) throws IOException {
+ FileSplit split = (FileSplit)genericSplit;
+ Configuration job = context.getConfiguration();
+ start = split.getStart();
+ end = start + split.getLength();
+ Path file = split.getPath();
+
+ // open the file and seek to the start of the split
+ FutureDataInputStreamBuilder builder =
+ file.getFileSystem(job).openFile(file);
+ // the start and end of the split may be used to build
+ // an input strategy.
+ builder.opt("fs.option.openfile.split.start", start);
+ builder.opt("fs.option.openfile.split.end", end);
+ FutureIO.propagateOptions(builder, job,
+ "mapreduce.job.input.file.option",
+ "mapreduce.job.input.file.must");
+
+ fileIn = FutureIO.awaitFuture(builder.build());
+ fileIn.seek(start);
+ /* Rest of the operation on the opened stream */
+ }
+```
+
+### `FileContext.openFile`
+
+From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
+Because the file length has already been probed for, the length is passed down.
+
+```java
+ public AvroFSInput(FileContext fc, Path p) throws IOException {
+ FileStatus status = fc.getFileStatus(p);
+ this.len = status.getLen();
+ this.stream = awaitFuture(fc.openFile(p)
+ .opt("fs.option.openfile.read.policy",
+ "sequential")
+ .opt("fs.option.openfile.length",
+ Long.toString(status.getLen()))
+ .build());
+ }
+```
+
+In this example, the length is passed down as a string (via `Long.toString()`)
+rather than directly as a long. This is to ensure that the input format will
+link against versions of Hadoop which do not have the
+`opt(String, long)` and `must(String, long)` builder parameters. Similarly, the
+values are passed as optional, so that if unrecognized the application will
+still succeed.
+
+### Example: reading a whole file
+
+This is from `org.apache.hadoop.util.JsonSerialization`.
+
+Its `load(FileSystem, Path, FileStatus)` method
+* declares the whole file is to be read end to end.
+* passes down the file status
+
+```java
+public T load(FileSystem fs,
+ Path path,
+ FileStatus status)
+ throws IOException {
+
+ try (FSDataInputStream dataInputStream =
+ awaitFuture(fs.openFile(path)
+ .opt("fs.option.openfile.read.policy", "whole-file")
+ .withFileStatus(status)
+ .build())) {
+ return fromJsonStream(dataInputStream);
+ } catch (JsonProcessingException e) {
+ throw new PathIOException(path.toString(),
+ "Failed to read JSON file " + e, e);
+ }
+}
+```
+
+*Note:* in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
+required a non-null parameter; this has since been relaxed.
+For maximum compatibility across versions, only invoke the method
+when the file status is known to be non-null.
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index a4aa136033a..e18f4c3bf4a 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -41,3 +41,4 @@ HDFS as these are commonly expected by Hadoop client applications.
2. [Extending the specification and its tests](extending.html)
1. [Uploading a file using Multiple Parts](multipartuploader.html)
1. [IOStatistics](iostatistics.html)
+1. [openFile()](openfile.html).
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
new file mode 100644
index 00000000000..afb3245c510
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md
@@ -0,0 +1,122 @@
+
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchronous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
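+
+A minimal sketch of the call pattern, assuming `fs` and `path` are an existing
+`FileSystem` instance and the `Path` of the file to read:
+
+```java
+FSDataInputStream in = FutureIO.awaitFuture(
+    fs.openFile(path)
+        .opt("fs.option.openfile.read.policy", "sequential")
+        .build());
+```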
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+ the filename of the path, not the full path.
+ This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+### `FutureDataInputStreamBuilder openFile(Path path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods return a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of the opened file, or raises an exception.
+
+The base implementation of `FileSystem.openFileWithOptions(Path, OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change between this call and
+the actual `build()` and `get()` operations, the file-specific
+preconditions (file exists, file is readable, etc.) MUST NOT be checked here.
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+else they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for details
+on how to use the builder, and for standard options which may be passed in.
+
+### `FutureDataInputStreamBuilder openFile(PathHandle)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file identified by the given `PathHandle` for reading.
+
+If implemented by a filesystem, the semantics of [`openFile(Path)`](#openfile_path_) apply.
+Thus the chain `openFile(pathhandle).build().get()` has the same preconditions and postconditions
+as `open(Pathhandle, int)`
+
+FileSystem implementations which do not implement `open(PathHandle handle, int bufferSize)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call, else they MAY fail fast in
+the `openFile(PathHandle)` call.
+
+The base implementation raises this exception in the `build()` operation; other implementations
+SHOULD copy this.
+
+### Implementors notes
+
+The base implementation of `openFileWithOptions()` actually executes
+the `open(path)` operation synchronously, yet still returns the result
+or any failures in the `CompletableFuture<>`, so as to provide a consistent
+lifecycle across all filesystems.
+
+Any filesystem client where the time to open a file may be significant SHOULD
+execute it asynchronously by submitting the operation in some executor/thread
+pool. This is particularly recommended for object stores and other filesystems
+likely to be accessed over long-haul connections.
+
+Arbitrary filesystem-specific options MAY be supported; these MUST
+be prefixed with either the filesystem schema, e.g. `hdfs.`
+or in the `fs.SCHEMA` format of normal configuration settings (e.g. `fs.hdfs`). The
+latter style allows the same configuration option to be used for both
+filesystem configuration and file-specific configuration.
+
+It SHOULD be possible to always open a file without specifying any options,
+so as to present a consistent model to users. However, an implementation MAY
+opt to require one or more mandatory options to be set.
+
+The returned stream may perform "lazy" evaluation of file access. This is
+relevant for object stores where the probes for existence are expensive, and,
+even with an asynchronous open, may be considered needless.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 3e754e4578d..c395afdb377 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -50,11 +50,11 @@ import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
-import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* Tests of multipart uploads.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
index a43053180fb..25bfe082b01 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
@@ -30,14 +30,18 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.IOUtils;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
import org.junit.Test;
@@ -232,7 +236,7 @@ public abstract class AbstractContractOpenTest
getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
.opt("fs.test.something", true);
intercept(FileNotFoundException.class,
- () -> FutureIOSupport.awaitFuture(builder.build()));
+ () -> awaitFuture(builder.build()));
}
@Test
@@ -242,7 +246,7 @@ public abstract class AbstractContractOpenTest
getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
.opt("fs.test.something", true);
intercept(FileNotFoundException.class,
- () -> FutureIOSupport.awaitFuture(builder.build(),
+ () -> awaitFuture(builder.build(),
10, TimeUnit.DAYS));
}
@@ -250,7 +254,7 @@ public abstract class AbstractContractOpenTest
public void testOpenFileExceptionallyTranslating() throws Throwable {
describe("openFile missing file chains into exceptionally()");
CompletableFuture<FSDataInputStream> f = getFileSystem()
- .openFile(path("testOpenFileUnknownOption")).build();
+ .openFile(path("testOpenFileExceptionallyTranslating")).build();
interceptFuture(RuntimeException.class,
"exceptionally",
f.exceptionally(ex -> {
@@ -262,11 +266,12 @@ public abstract class AbstractContractOpenTest
public void testChainedFailureAwaitFuture() throws Throwable {
describe("await Future handles chained failures");
CompletableFuture<FSDataInputStream> f = getFileSystem()
- .openFile(path("testOpenFileUnknownOption"))
+ .openFile(path("testChainedFailureAwaitFuture"))
+ .withFileStatus(null)
.build();
intercept(RuntimeException.class,
"exceptionally",
- () -> FutureIOSupport.awaitFuture(
+ () -> awaitFuture(
f.exceptionally(ex -> {
throw new RuntimeException("exceptionally", ex);
})));
@@ -280,13 +285,34 @@ public abstract class AbstractContractOpenTest
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
+ FileStatus st = fs.getFileStatus(path);
CompletableFuture<Long> readAllBytes = fs.openFile(path)
- .withFileStatus(fs.getFileStatus(path))
+ .withFileStatus(st)
.build()
.thenApply(ContractTestUtils::readStream);
assertEquals("Wrong number of bytes read value",
len,
(long) readAllBytes.get());
+ // now reattempt with a new FileStatus and a different path
+ // other than the final name element
+ // implementations MUST use path in openFile() call
+ FileStatus st2 = new FileStatus(
+ len, false,
+ st.getReplication(),
+ st.getBlockSize(),
+ st.getModificationTime(),
+ st.getAccessTime(),
+ st.getPermission(),
+ st.getOwner(),
+ st.getGroup(),
+ new Path("gopher:///localhost:/" + path.getName()));
+ assertEquals("Wrong number of bytes read value",
+ len,
+ (long) fs.openFile(path)
+ .withFileStatus(st2)
+ .build()
+ .thenApply(ContractTestUtils::readStream)
+ .get());
}
@Test
@@ -298,17 +324,47 @@ public abstract class AbstractContractOpenTest
dataset(4, 0x40, 0x80));
CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
AtomicBoolean accepted = new AtomicBoolean(false);
- future.thenAcceptAsync(i -> accepted.set(true)).get();
+ future.thenApply(stream -> {
+ accepted.set(true);
+ return stream;
+ }).get().close();
assertTrue("async accept operation not invoked",
accepted.get());
}
+ /**
+ * Open a file with a null status, and the length
+ * passed in as an opt() option (along with sequential IO).
+ * The file is opened, the data read, and it must match
+ * the source data.
+ * opt() is used so that integration testing with external
+ * filesystem connectors will downgrade if the option is not
+ * recognized.
+ */
@Test
- public void testOpenFileNullStatus() throws Throwable {
- describe("use openFile() with a null status");
+ public void testOpenFileNullStatusButFileLength() throws Throwable {
+ describe("use openFile() with a null status and expect the status to be"
+ + " ignored. block size, fadvise and length are passed in as"
+ + " opt() options");
Path path = path("testOpenFileNullStatus");
- intercept(NullPointerException.class,
- () -> getFileSystem().openFile(path).withFileStatus(null));
+ FileSystem fs = getFileSystem();
+ int len = 4;
+ byte[] result = new byte[len];
+ byte[] dataset = dataset(len, 0x40, 0x80);
+ createFile(fs, path, true,
+ dataset);
+ CompletableFuture<FSDataInputStream> future = fs.openFile(path)
+ .withFileStatus(null)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ "unknown, sequential, random")
+ .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
+ .opt(FS_OPTION_OPENFILE_LENGTH, len)
+ .build();
+
+ try (FSDataInputStream in = future.get()) {
+ in.readFully(result);
+ }
+ compareByteArrays(dataset, result, len);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index e13a49ca10e..eb56d957d9a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1642,17 +1642,22 @@ public class ContractTestUtils extends Assert {
/**
* Read a whole stream; downgrades an IOE to a runtime exception.
+ * Closes the stream afterwards.
* @param in input
* @return the number of bytes read.
* @throws AssertionError on any IOException
*/
public static long readStream(InputStream in) {
- long count = 0;
+ try {
+ long count = 0;
- while (read(in) >= 0) {
- count++;
+ while (read(in) >= 0) {
+ count++;
+ }
+ return count;
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, in);
}
- return count;
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
index 22f6c33d2e2..755599f0c39 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
@@ -36,6 +36,8 @@ import org.assertj.core.api.ObjectAssert;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -347,6 +349,24 @@ public final class IOStatisticAssertions {
verifyStatisticsNotNull(stats).maximums());
}
+ /**
+ * Assert that a duration is within a given minimum/maximum range.
+ * @param stats statistics source
+ * @param key statistic key without any suffix
+ * @param min minimum statistic must be equal to or greater than this.
+ * @param max maximum statistic must be equal to or less than this.
+ */
+ public static void assertDurationRange(
+ final IOStatistics stats,
+ final String key,
+ final long min,
+ final long max) {
+ assertThatStatisticMinimum(stats, key + SUFFIX_MIN)
+ .isGreaterThanOrEqualTo(min);
+ assertThatStatisticMaximum(stats, key + SUFFIX_MAX)
+ .isLessThanOrEqualTo(max);
+ }
+
/**
* Start an assertion chain on
* a required mean statistic.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java
index 8258b62c1f7..cfde1583e2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestDurationTracking.java
@@ -30,7 +30,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
@@ -276,7 +275,7 @@ public class TestDurationTracking extends AbstractHadoopTestBase {
*/
@Test
public void testDurationThroughEval() throws Throwable {
- CompletableFuture