org.apache.hadoop.fs.FutureDataInputStreamBuilder
An interface offering the Builder pattern for creating Java Future references to FSDataInputStream and its subclasses. It is used to initiate a (potentially asynchronous) operation to open an existing file for reading.
The FutureDataInputStreamBuilder interface does not require that parameters or the state of FileSystem be validated until build() is invoked; validation MAY instead take place during the asynchronous open operation itself.
Some aspects of the state of the filesystem MAY be checked in the initial openFile() call, provided they are known to be invariants which will not change between openFile() and the build().get() sequence. Path validation is one example.
FutureDataInputStreamBuilder bufferSize(int bufSize)
Set the size of the buffer to be used.
FutureDataInputStreamBuilder withFileStatus(FileStatus status)
A FileStatus
instance which refers to the file being opened.
This MAY be used by implementations to short-circuit checks for the file, so potentially saving on remote calls, especially to object stores.
Requirements:
* status != null
* status.getPath().getName() == the name of the file being opened.
The path validation MUST take place if the store uses the FileStatus when it opens files, and MAY be performed otherwise. The validation SHOULD be postponed until the build() operation.
This operation should be considered a hint to the filesystem.
If a filesystem implementation extends the FileStatus class in the status objects it returns, the implementation MAY use this extra information when opening the file.
This is relevant with those stores which return version/etag information: they MAY use this to guarantee that the file they opened is exactly the one returned in the listing.
The final status.getPath().getName()
element of the supplied status MUST equal the name value of the path supplied to the openFile(path)
call.
Filesystems MUST NOT validate the rest of the path. This is needed to support viewfs and other mount-point wrapper filesystems where schemes and paths are different. These often create their own FileStatus results.
Preconditions
status == null or status.getPath().getName() == path.getName()
Filesystems MUST NOT require the class of status
to equal that of any specific subclass their implementation returns in filestatus/list operations. This is to support wrapper filesystems and serialization/deserialization of the status.
FutureDataInputStreamBuilder opt(String key, ...)
FutureDataInputStreamBuilder must(String key, ...)
Set optional or mandatory parameters to the builder. Using opt() or must(), clients can specify FS-specific parameters without inspecting the concrete type of FileSystem.
Example:
    in = fs.openFile(path)
        .must("fs.option.openfile.read.policy", "random")
        .opt("fs.http.connection.timeout", 30_000L)
        .withFileStatus(statusFromListing)
        .build()
        .get();
Here the read policy of random has been specified, with the requirement that the filesystem implementation must understand the option. An http-specific option has been supplied which may be interpreted by any store; if the filesystem opening the file does not recognize the option, it can safely be ignored.
opt() versus must()
The difference between opt() and must() is how the FileSystem opening the file must react to an option which it does not recognize.
    def must(name, value):
      if not name in known_keys:
        raise IllegalArgumentException
      if not name in supported_keys:
        raise UnsupportedException

    def opt(name, value):
      if not name in known_keys:
        # ignore option
        pass
For any known key, the validation of the value argument MUST be the same irrespective of how the (key, value) pair was declared.
For standard options, what counts as a valid value is defined in this filesystem specification and validated through contract tests.
Checking for supported options must be performed in the build() operation.
* If a mandatory parameter declared via must(key, value) is not recognized, IllegalArgumentException MUST be thrown.
* If a mandatory parameter declared via must(key, value) relies on a feature which is recognized but not supported in the specific FileSystem/FileContext instance, UnsupportedException MUST be thrown.
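The rules above can be sketched in plain Java. This is a minimal illustration rather than the Hadoop implementation: the class `OptionCheckSketch`, its key sets and the key `fs.example.pushdown` are hypothetical, and `java.lang.UnsupportedOperationException` stands in for the `UnsupportedException` named in the pseudocode.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a builder; not the Hadoop class. */
final class OptionCheckSketch {
  // Illustrative key sets (assumptions); a real connector defines its own.
  static final Set<String> KNOWN_KEYS = Set.of(
      "fs.option.openfile.read.policy", "fs.example.pushdown");
  static final Set<String> SUPPORTED_KEYS = Set.of(
      "fs.option.openfile.read.policy");

  private final Map<String, String> options = new LinkedHashMap<>();
  private final Map<String, Boolean> mandatory = new LinkedHashMap<>();

  OptionCheckSketch opt(String key, String value) {
    // A later call for the same key replaces both value and mandatory flag.
    options.put(key, value);
    mandatory.put(key, false);
    return this;
  }

  OptionCheckSketch must(String key, String value) {
    options.put(key, value);
    mandatory.put(key, true);
    return this;
  }

  /** Nothing is checked until build(); returns the options actually used. */
  Map<String, String> build() {
    Map<String, String> accepted = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : options.entrySet()) {
      String key = e.getKey();
      boolean isMust = mandatory.get(key);
      if (!KNOWN_KEYS.contains(key)) {
        if (isMust) {
          throw new IllegalArgumentException("Unknown mandatory option: " + key);
        }
        continue; // unknown optional keys are ignored
      }
      if (!SUPPORTED_KEYS.contains(key)) {
        if (isMust) {
          throw new UnsupportedOperationException("Unsupported option: " + key);
        }
        continue; // known but unsupported optional keys are ignored
      }
      accepted.put(key, e.getValue());
    }
    return accepted;
  }

  public static void main(String[] args) {
    Map<String, String> accepted = new OptionCheckSketch()
        .opt("fs.example.unknown", "ignored")
        .must("fs.option.openfile.read.policy", "random")
        .build();
    System.out.println(accepted);
  }
}
```

The sketch defers every check to `build()`, so the same builder calls behave identically whichever order they are made in; only the final optional/mandatory state of each key matters.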
Conflicts between parameters set by builder methods (e.g. bufferSize()) and those set through opt()/must() are resolved as follows:
The last option specified defines the value and its optional/mandatory state.
If the FileStatus option passed in withFileStatus() is used, implementations MUST accept all subclasses of FileStatus, including LocatedFileStatus, rather than just any FS-specific subclass implemented by the implementation (e.g. S3AFileStatus). They MAY simply ignore those which are not the custom subclasses.
This is critical to ensure safe use of the feature: directory listing and status serialization/deserialization can result in the withFileStatus() argument not being the custom subclass returned by the Filesystem instance’s own getFileStatus(), listFiles(), listLocatedStatus() calls, etc.
In such a situation the implementations MUST verify that status.getPath().getName() matches the current path.getName() value. The rest of the path MUST NOT be validated.
Even if no values of the status are used, the presence of the argument can be interpreted as the caller declaring that they believe the file to be present and of the given size.
CompletableFuture<FSDataInputStream> build()
Return a CompletableFuture<FSDataInputStream> which, when successfully completed, returns an input stream which can read data from the filesystem.
The build() operation MAY validate the file’s existence and its kind, so rejecting attempts to read from a directory or non-existent file. Alternatively:
* file existence/status checks MAY be performed asynchronously within the returned CompletableFuture<>.
* file existence/status checks MAY be postponed until the first byte is read in any of the read operations, such as read() or PositionedRead.
That is, the preconditions exists(FS, path) and isFile(FS, path) are only guaranteed to have been met after get() has been called on the returned future and an attempt has been made to read the stream.
Thus, even when the file does not exist, or is a directory rather than a file, the following call MUST succeed, returning a CompletableFuture to be evaluated.
    Path p = new Path("file:///tmp/file-which-does-not-exist");
    CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
        .openFile(p)
        .build();
The inability to access/read a file MUST raise an IOException
or subclass in either the future’s get()
call, or, for late binding operations, when an operation to read data is invoked.
Therefore the following sequence SHALL fail when invoked on the future
returned by the previous example.
future.get().read();
Access permission checks have the same visibility requirements: permission failures MUST be delayed until the get()
call and MAY be delayed into subsequent operations.
Note: some operations on the input stream, such as seek(), may not attempt any IO at all. Such operations MAY NOT raise exceptions when interacting with nonexistent/unreadable files.
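In Java, a failure raised inside a CompletableFuture surfaces from get() wrapped in an ExecutionException, so callers usually unwrap it to recover the underlying IOException. The following stdlib-only sketch shows one way to do that; `LateFailureSketch`, `openMissingFile()` and `await()` are hypothetical names used for illustration, and the future carries a String rather than a real stream.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class LateFailureSketch {

  /** Hypothetical stand-in for an asynchronous open which fails. */
  static CompletableFuture<String> openMissingFile(String path) {
    CompletableFuture<String> future = new CompletableFuture<>();
    // Creating the future succeeds; the failure is only visible on get().
    future.completeExceptionally(new FileNotFoundException(path));
    return future;
  }

  /** Wait for the future, rethrowing any IOException it failed with. */
  static <T> T await(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for open", e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      if (cause instanceof UncheckedIOException) {
        throw ((UncheckedIOException) cause).getCause();
      }
      throw new IOException(cause);
    }
  }

  public static void main(String[] args) {
    try {
      await(openMissingFile("/tmp/file-which-does-not-exist"));
    } catch (IOException e) {
      System.out.println("open failed: " + e);
    }
  }
}
```

With a late-binding store the same exception could instead appear on the first read() call, so code which must fail fast cannot rely on get() alone.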
openFile() options since hadoop branch-3.3
These are options which FileSystem and FileContext implementations MUST recognise and MAY support by changing the behavior of their input streams as appropriate.
Hadoop 3.3.0 added the openFile() API; these standard options were defined in a later release. Therefore, although they are “well known”, unless confident that the application will only be executed against releases of Hadoop which know of the options, applications SHOULD set the options via opt() calls rather than must().
When opening a file through the openFile()
builder API, callers MAY use both .opt(key, value)
and .must(key, value)
calls to set standard and filesystem-specific options.
If set as an opt()
parameter, unsupported “standard” options MUST be ignored, as MUST unrecognized standard options.
If set as a must() parameter, unsupported “standard” options MUST be ignored. Unrecognized standard options MUST be rejected.
The standard openFile() options are defined in org.apache.hadoop.fs.Options.OpenFileOptions; they SHALL all start with fs.option.openfile. as a prefix.
Note that while all FileSystem
/FileContext
instances SHALL support these options to the extent that must()
declarations SHALL NOT fail, the implementations MAY support them to the extent of interpreting the values. This means that it is not a requirement for the stores to actually read the read policy or file length values and use them when opening files.
Unless otherwise stated, they SHOULD be viewed as hints.
Note: if a standard option is ever added for which being set but not supported would be an error, then implementations SHALL reject it. For example, the S3A filesystem client supports the ability to push down SQL commands. If something like that were ever standardized, then use of the option, whether through opt() or must(), MUST be rejected by filesystems which don’t support the feature.
fs.option.openfile.buffer.size
Read buffer size in bytes.
This overrides the default value set in the configuration with the option io.file.buffer.size
.
It is supported by all filesystem clients which allow for stream-specific buffer sizes to be set via FileSystem.open(path, buffersize)
.
fs.option.openfile.read.policy
Declare the read policy of the input stream. This is a hint as to what the expected read pattern of an input stream will be. This MAY control readahead, buffering and other optimizations.
Sequential reads may be optimized by prefetching data and/or reading data in larger blocks. Some applications (e.g. DistCp) perform sequential IO even over columnar data.
In contrast, random IO reads data in different parts of the file using a sequence of seek()/read()
or via the PositionedReadable
or ByteBufferPositionedReadable
APIs.
Random IO performance may be best if little/no prefetching takes place, along with other possible optimizations.
Queries over columnar formats such as Apache ORC and Apache Parquet perform such random IO; other data formats may be best read with sequential or whole-file policies.
What is key is that optimizing reads for sequential access may impair random performance, and vice versa.
The read policy is a hint: even if declared as a must() option, the filesystem MAY ignore it.
Policy | Meaning |
---|---|
adaptive | Any adaptive policy implemented by the store. |
default | The default policy for this store. Generally “adaptive”. |
random | Optimize for random access. |
sequential | Optimize for sequential access. |
vector | The Vectored IO API is intended to be used. |
whole-file | The whole file will be read. |
Choosing the wrong read policy for an input source may be inefficient.
A list of read policies MAY be supplied; the first one recognized/supported by the filesystem SHALL be the one used. This allows for custom policies to be supported, for example an hbase-hfile
policy optimized for HBase HFiles.
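The "first recognized policy wins" rule can be sketched as a small selection function. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the comma separator, whitespace trimming and case-insensitive matching are choices made for the sketch (the comma-separated form follows the examples in this document).

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

final class ReadPolicySketch {

  /**
   * Pick the first policy in the supplied list which the store supports.
   * An empty result means the caller should fall back to its default policy.
   */
  static Optional<String> selectPolicy(String optionValue, Set<String> supported) {
    return Arrays.stream(optionValue.split(","))
        .map(s -> s.trim().toLowerCase(Locale.ROOT)) // assumption: case-insensitive
        .filter(s -> !s.isEmpty())
        .filter(supported::contains)
        .findFirst();
  }

  public static void main(String[] args) {
    // Hypothetical store which does not know the custom or vector policies.
    Set<String> supported = Set.of("adaptive", "random", "sequential");
    System.out.println(selectPolicy("hbase-hfile, vector, random", supported));
  }
}
```

Because unrecognized entries are skipped rather than rejected, an application can list a custom policy first and a standard one after it without breaking against stores which only know the standard names.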
The S3A and ABFS input streams both implement the IOStatisticsSource API, and can be queried for their IO Performance.
Tip: log the toString()
value of input streams at DEBUG
. The S3A and ABFS Input Streams log read statistics, which can provide insight about whether reads are being performed efficiently or not.
adaptive
Try to adapt the seek policy to the read pattern of the application.
The normal policy of the S3A client and the sole policy supported by the wasb: client are both adaptive: they assume sequential IO, but once a backwards seek/positioned read call is made the stream switches to random IO.
Other filesystem implementations may wish to adopt similar strategies, and/or extend the algorithms to detect forward seeks and/or switch from random to sequential IO if that is considered more efficient.
Adaptive read policies compensate for the absence of any way to declare the seek policy in the open() API, which forces it to be declared, if configurable, in the cluster/application configuration. However, the switch from sequential to random seek policies may be expensive.
When applications explicitly set the fs.option.openfile.read.policy
option, if they know their read plan, they SHOULD declare which policy is most appropriate.
default
The default policy for the filesystem instance. Implementation/installation-specific.
sequential
Expect sequential reads from the first byte read to the end of the file/until the stream is closed.
random
Expect seek()/read()
sequences, or use of PositionedReadable
or ByteBufferPositionedReadable
APIs.
vector
This declares that the caller intends to use the Vectored read API of HADOOP-11867 Add a high-performance vectored read API.
This is a hint: it is not a requirement when using the API. It does inform the implementations that the stream should be configured for optimal vectored IO performance, if such a feature has been implemented.
It is not exclusive: the same stream may still be used for classic InputStream
and PositionedRead
API calls. Implementations SHOULD use the random
read policy with these operations.
whole-file
This declares that the whole file is to be read end-to-end; the file system client is free to enable whatever strategies maximise performance for this. In particular, larger ranged reads/GETs can deliver high bandwidth by reducing socket/TLS setup costs and providing a connection long-lived enough for TCP flow control to determine the optimal download rate.
Strategies can include:
* Initiating an HTTP GET of the entire file in the openFile() operation.
Applications which know that the entire file is to be read from an opened stream SHOULD declare this read policy.
fs.option.openfile.length
Declare the length of a file.
This can be used by clients to skip querying a remote store for the size of/existence of a file when opening it, similar to declaring a file status through the withFileStatus()
option.
If supported by a filesystem connector, this option MUST be interpreted as declaring the minimum length of the file:
* read(), seek() and positioned read calls MAY use a position across/beyond this length but below the actual length of the file. Implementations MAY raise EOFExceptions in such cases, or they MAY return data.
Implementor’s Notes
If this option is used by the FileSystem implementation:
* A value of fs.option.openfile.length < 0 MUST be rejected.
* If a file status is supplied along with a value in fs.option.openfile.length, the file status values take precedence.
fs.option.openfile.split.start and fs.option.openfile.split.end
Declare the start and end of the split when a file has been split for processing in pieces.
The split end value can provide a hint as to the end of the input stream. The split start can be used to optimize any initial read offset for filesystem clients.
Note for implementors: applications will read past the end of a split when they need to read to the end of a record/line which begins before the end of the split.
Therefore clients MUST be allowed to seek()
/read()
past the length set in fs.option.openfile.split.end
if the file is actually longer than that value.
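The length rules of this and the preceding section can be sketched as a small resolution function. It is a stdlib-only illustration, not connector code: `DeclaredLengthSketch` and `declaredLength()` are hypothetical names, and `null` is used here to mean "not supplied". The split end is deliberately not consulted, because it is only a hint and callers may read beyond it.

```java
import java.util.OptionalLong;

final class DeclaredLengthSketch {

  /**
   * Resolve the length a client may assume without probing the store.
   *
   * @param statusLength length from a supplied file status, or null if none
   * @param optionLength value of fs.option.openfile.length, or null if unset
   */
  static OptionalLong declaredLength(Long statusLength, Long optionLength) {
    if (optionLength != null && optionLength < 0) {
      // Negative declared lengths are rejected.
      throw new IllegalArgumentException("Negative file length: " + optionLength);
    }
    if (statusLength != null) {
      // The file status takes precedence over the length option.
      return OptionalLong.of(statusLength);
    }
    if (optionLength != null) {
      return OptionalLong.of(optionLength);
    }
    // Nothing declared: the store must be probed, or the check postponed.
    return OptionalLong.empty();
  }

  public static void main(String[] args) {
    System.out.println(declaredLength(4096L, 1024L));
    System.out.println(declaredLength(null, 1024L));
    System.out.println(declaredLength(null, null));
  }
}
```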
The S3A Connector supports custom options for readahead and seek policy.
Name | Type | Meaning |
---|---|---|
fs.s3a.readahead.range | long | readahead range in bytes |
fs.s3a.input.async.drain.threshold | long | threshold to switch to asynchronous draining of the stream |
fs.s3a.experimental.input.fadvise | String | seek policy. Superseded by fs.option.openfile.read.policy |
If the option set contains a SQL statement in the fs.s3a.select.sql option, then the file is opened as an S3 Select query. Consult the S3A documentation for more details.
The ABFS Connector supports custom input stream options.
Name | Type | Meaning |
---|---|---|
fs.azure.buffered.pread.disable | boolean | disable caching on the positioned read operations. |
Disables caching on data read through the PositionedReadable APIs.
Consult the ABFS Documentation for more details.
Here is an example from a proof of concept org.apache.parquet.hadoop.util.HadoopInputFile
reader which uses a (nullable) file status and a split start/end.
The FileStatus value is always passed in, but if it is null, then the split end is used to declare the length of the file.
    protected SeekableInputStream newStream(Path path, FileStatus stat,
        long splitStart, long splitEnd)
        throws IOException {
      FutureDataInputStreamBuilder builder = fs.openFile(path)
          .opt("fs.option.openfile.read.policy", "vector, random")
          .withFileStatus(stat);
      builder.opt("fs.option.openfile.split.start", splitStart);
      builder.opt("fs.option.openfile.split.end", splitEnd);
      CompletableFuture<FSDataInputStream> streamF = builder.build();
      return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
    }
As a result, whether driven directly by a file listing, or when opening a file from a query plan of (path, splitStart, splitEnd)
, there is no need to probe the remote store for the length of the file. When working with remote object stores, this can save tens to hundreds of milliseconds, even if such a probe is done asynchronously.
If both the file length and the split end are set, then the file length MUST be considered the more authoritative: it SHOULD be the value which defines the file length. If the split end is set, the caller MAY not read past it, but it is not prohibited from doing so.
The CompressedSplitLineReader
can read past the end of a split if it is partway through processing a compressed record. That is: it assumes an incomplete record read means that the file length is greater than the split length, and that it MUST read the entirety of the partially read record. Other readers may behave similarly.
Therefore:
* The file length as supplied in a FileStatus or in fs.option.openfile.length SHALL set the strict upper limit on the length of a file.
* The split end as set in fs.option.openfile.split.end MUST be viewed as a hint, rather than the strict end of the file.
Standard and non-standard options MAY be combined in the same openFile() operation.
    Future<FSDataInputStream> f = openFile(path)
        .must("fs.option.openfile.read.policy", "random, adaptive")
        .opt("fs.s3a.readahead.range", 1024 * 1024)
        .build();
    FSDataInputStream is = f.get();
The option set in must() MUST be understood, or at least recognized and ignored, by all filesystems. In this example, the S3A-specific option MAY be ignored by all other filesystem clients.
Not all hadoop releases recognize the fs.option.openfile.read.policy
option.
The option can be safely used in application code if it is added via the opt()
builder argument, as it will be treated as an unknown optional key which can then be discarded.
    Future<FSDataInputStream> f = openFile(path)
        .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
        .build();
    FSDataInputStream is = f.get();
Note 1: if the option name is set by a reference to a constant in org.apache.hadoop.fs.Options.OpenFileOptions, then the program will not link against versions of Hadoop without the specific option. Therefore, for resilient linking against older releases, use a copy of the value.
Note 2: as option validation is performed in the FileSystem connector, a third-party connector designed to work with multiple Hadoop versions MAY NOT support the option.
Hadoop MapReduce will automatically read MR Job Options with the prefixes mapreduce.job.input.file.option. and mapreduce.job.input.file.must., and apply these values as .opt() and .must() respectively, after removing the mapreduce-specific prefixes.
This makes passing options in to MR jobs straightforward. For example, to declare that a job should read its data using random IO:
    JobConf jobConf = (JobConf) job.getConfiguration();
    jobConf.set(
        "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
        "random");
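The prefix handling described above can be sketched with plain maps. This is an illustration of the idea only, not the MapReduce implementation: `PrefixPropagationSketch` and `strip()` are hypothetical names, and a `Map<String, String>` stands in for the job configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PrefixPropagationSketch {
  static final String OPT_PREFIX = "mapreduce.job.input.file.option.";
  static final String MUST_PREFIX = "mapreduce.job.input.file.must.";

  /** Collect the entries whose key starts with the prefix, with the prefix removed. */
  static Map<String, String> strip(Map<String, String> conf, String prefix) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      String key = e.getKey();
      if (key.startsWith(prefix) && key.length() > prefix.length()) {
        result.put(key.substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put(OPT_PREFIX + "fs.option.openfile.read.policy", "random");
    conf.put("mapreduce.job.name", "example");
    // Entries from the first map would be passed to opt(), the second to must().
    System.out.println("opt:  " + strip(conf, OPT_PREFIX));
    System.out.println("must: " + strip(conf, MUST_PREFIX));
  }
}
```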
An example of a record reader passing in options to the file it opens.
    public void initialize(InputSplit genericSplit,
        TaskAttemptContext context) throws IOException {
      FileSplit split = (FileSplit) genericSplit;
      Configuration job = context.getConfiguration();
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();

      // open the file and seek to the start of the split
      FutureDataInputStreamBuilder builder =
          file.getFileSystem(job).openFile(file);
      // the start and end of the split may be used to build
      // an input strategy.
      builder.opt("fs.option.openfile.split.start", start);
      builder.opt("fs.option.openfile.split.end", end);
      FutureIO.propagateOptions(builder, job,
          "mapreduce.job.input.file.option",
          "mapreduce.job.input.file.must");

      fileIn = FutureIO.awaitFuture(builder.build());
      fileIn.seek(start);
      /* Rest of the operation on the opened stream */
    }
FileContext.openFile
From org.apache.hadoop.fs.AvroFSInput: a file is opened with sequential input. Because the file length has already been probed for, the length is passed down.
    public AvroFSInput(FileContext fc, Path p) throws IOException {
      FileStatus status = fc.getFileStatus(p);
      this.len = status.getLen();
      this.stream = awaitFuture(fc.openFile(p)
          .opt("fs.option.openfile.read.policy",
              "sequential")
          .opt("fs.option.openfile.length",
              Long.toString(status.getLen()))
          .build());
      fc.open(p);
    }
In this example, the length is passed down as a string (via Long.toString()) rather than directly as a long. This is to ensure that the input format will link against versions of Hadoop which do not have the opt(String, long) and must(String, long) builder methods. Similarly, the values are passed as optional, so that if unrecognized the application will still succeed.
This is from org.apache.hadoop.util.JsonSerialization
.
Its load(FileSystem, Path, FileStatus) method:
* declares the whole file is to be read end to end.
* passes down the file status.
    public T load(FileSystem fs,
        Path path,
        FileStatus status)
        throws IOException {
      try (FSDataInputStream dataInputStream =
          awaitFuture(fs.openFile(path)
              .opt("fs.option.openfile.read.policy", "whole-file")
              .withFileStatus(status)
              .build())) {
        return fromJsonStream(dataInputStream);
      } catch (JsonProcessingException e) {
        throw new PathIOException(path.toString(),
            "Failed to read JSON file " + e, e);
      }
    }
Note: in Hadoop 3.3.2 and earlier, the withFileStatus(status) call required a non-null parameter; this has since been relaxed. For maximum compatibility across versions, only invoke the method when the file status is known to be non-null.