diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 39d41f5ffd2..be5b1799b35 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -910,21 +910,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
                                                   IntFunction<ByteBuffer> allocate) {
     LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
-    // This reference is must be kept till all buffers are populated as this is a
+    // This reference must be kept till all buffers are populated as this is a
     // finalizable object which closes the internal stream when gc triggers.
     S3Object objectRange = null;
     S3ObjectInputStream objectContent = null;
     try {
-      checkIfVectoredIOStopped();
-      final String operationName = "readCombinedFileRange";
-      objectRange = getS3Object(operationName,
+      objectRange = getS3ObjectAndValidateNotNull("readCombinedFileRange",
              combinedFileRange.getOffset(), combinedFileRange.getLength());
       objectContent = objectRange.getObjectContent();
-      if (objectContent == null) {
-        throw new PathIOException(uri,
-                "Null IO stream received during " + operationName);
-      }
       populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
       LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
@@ -1019,19 +1013,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    */
   private void readSingleRange(FileRange range, ByteBuffer buffer) {
     LOG.debug("Start reading range {} from path {} ", range, pathStr);
+    // This reference must be kept till all buffers are populated as this is a
+    // finalizable object which closes the internal stream when gc triggers.
     S3Object objectRange = null;
     S3ObjectInputStream objectContent = null;
     try {
-      checkIfVectoredIOStopped();
       long position = range.getOffset();
       int length = range.getLength();
-      final String operationName = "readRange";
-      objectRange = getS3Object(operationName, position, length);
+      objectRange = getS3ObjectAndValidateNotNull("readSingleRange", position, length);
       objectContent = objectRange.getObjectContent();
-      if (objectContent == null) {
-        throw new PathIOException(uri,
-                "Null IO stream received during " + operationName);
-      }
       populateBuffer(length, buffer, objectContent);
       range.getData().complete(buffer);
     } catch (Exception ex) {
@@ -1043,6 +1033,29 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
     LOG.debug("Finished reading range {} from path {} ", range, pathStr);
   }
 
+  /**
+   * Get the S3 object from the S3 server for a specified range.
+   * Also checks if the vectored IO operation has been stopped before and after
+   * the HTTP GET request so that we don't waste time populating the buffers.
+   * @param operationName name of the operation for which the object is fetched from S3.
+   * @param position position of the object to be read from S3.
+   * @param length length from position of the object to be read from S3.
+   * @return the resulting S3 object.
+   * @throws IOException IO failure, including a null content stream.
+   */
+  private S3Object getS3ObjectAndValidateNotNull(final String operationName,
+                                                 final long position,
+                                                 final int length) throws IOException {
+    checkIfVectoredIOStopped();
+    S3Object objectRange = getS3Object(operationName, position, length);
+    if (objectRange.getObjectContent() == null) {
+      throw new PathIOException(uri,
+              "Null IO stream received during " + operationName);
+    }
+    checkIfVectoredIOStopped();
+    return objectRange;
+  }
+
   /**
    * Populates the buffer with data from objectContent
    * till length. Handles both direct and heap byte buffers.