HADOOP-18460. checkIfVectoredIOStopped before populating the buffers (#4986)

Contributed by Mukund Thakur
This commit is contained in:
Mukund Thakur 2022-10-10 15:47:45 +05:30 committed by Steve Loughran
parent 80525615e5
commit 77cb778a44
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
1 changed files with 28 additions and 15 deletions

View File

@ -910,21 +910,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
IntFunction<ByteBuffer> allocate) {
LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
// This reference is must be kept till all buffers are populated as this is a
// This reference must be kept till all buffers are populated as this is a
// finalizable object which closes the internal stream when gc triggers.
S3Object objectRange = null;
S3ObjectInputStream objectContent = null;
try {
checkIfVectoredIOStopped();
final String operationName = "readCombinedFileRange";
objectRange = getS3Object(operationName,
objectRange = getS3ObjectAndValidateNotNull("readCombinedFileRange",
combinedFileRange.getOffset(),
combinedFileRange.getLength());
objectContent = objectRange.getObjectContent();
if (objectContent == null) {
throw new PathIOException(uri,
"Null IO stream received during " + operationName);
}
populateChildBuffers(combinedFileRange, objectContent, allocate);
} catch (Exception ex) {
LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
@ -1019,19 +1013,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/
private void readSingleRange(FileRange range, ByteBuffer buffer) {
LOG.debug("Start reading range {} from path {} ", range, pathStr);
// This reference must be kept till all buffers are populated as this is a
// finalizable object which closes the internal stream when gc triggers.
S3Object objectRange = null;
S3ObjectInputStream objectContent = null;
try {
checkIfVectoredIOStopped();
long position = range.getOffset();
int length = range.getLength();
final String operationName = "readRange";
objectRange = getS3Object(operationName, position, length);
objectRange = getS3ObjectAndValidateNotNull("readSingleRange", position, length);
objectContent = objectRange.getObjectContent();
if (objectContent == null) {
throw new PathIOException(uri,
"Null IO stream received during " + operationName);
}
populateBuffer(length, buffer, objectContent);
range.getData().complete(buffer);
} catch (Exception ex) {
@ -1043,6 +1033,29 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
LOG.debug("Finished reading range {} from path {} ", range, pathStr);
}
/**
* Get the s3 object for S3 server for a specified range.
* Also checks if the vectored io operation has been stopped before and after
* the http get request such that we don't waste time populating the buffers.
* @param operationName name of the operation for which get object on S3 is called.
* @param position position of the object to be read from S3.
* @param length length from position of the object to be read from S3.
* @return result s3 object.
* @throws IOException exception if any.
*/
private S3Object getS3ObjectAndValidateNotNull(final String operationName,
final long position,
final int length) throws IOException {
checkIfVectoredIOStopped();
S3Object objectRange = getS3Object(operationName, position, length);
if (objectRange.getObjectContent() == null) {
throw new PathIOException(uri,
"Null IO stream received during " + operationName);
}
checkIfVectoredIOStopped();
return objectRange;
}
/**
* Populates the buffer with data from objectContent
* till length. Handles both direct and heap byte buffers.