HADOOP-18460. checkIfVectoredIOStopped before populating the buffers (#4986)
Contributed by Mukund Thakur
This commit is contained in:
parent
540a660429
commit
be70bbb4be
|
@ -910,21 +910,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
||||||
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
|
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
|
||||||
IntFunction<ByteBuffer> allocate) {
|
IntFunction<ByteBuffer> allocate) {
|
||||||
LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
|
LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
|
||||||
// This reference is must be kept till all buffers are populated as this is a
|
// This reference must be kept till all buffers are populated as this is a
|
||||||
// finalizable object which closes the internal stream when gc triggers.
|
// finalizable object which closes the internal stream when gc triggers.
|
||||||
S3Object objectRange = null;
|
S3Object objectRange = null;
|
||||||
S3ObjectInputStream objectContent = null;
|
S3ObjectInputStream objectContent = null;
|
||||||
try {
|
try {
|
||||||
checkIfVectoredIOStopped();
|
objectRange = getS3ObjectAndValidateNotNull("readCombinedFileRange",
|
||||||
final String operationName = "readCombinedFileRange";
|
|
||||||
objectRange = getS3Object(operationName,
|
|
||||||
combinedFileRange.getOffset(),
|
combinedFileRange.getOffset(),
|
||||||
combinedFileRange.getLength());
|
combinedFileRange.getLength());
|
||||||
objectContent = objectRange.getObjectContent();
|
objectContent = objectRange.getObjectContent();
|
||||||
if (objectContent == null) {
|
|
||||||
throw new PathIOException(uri,
|
|
||||||
"Null IO stream received during " + operationName);
|
|
||||||
}
|
|
||||||
populateChildBuffers(combinedFileRange, objectContent, allocate);
|
populateChildBuffers(combinedFileRange, objectContent, allocate);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
|
LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
|
||||||
|
@ -1019,19 +1013,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
||||||
*/
|
*/
|
||||||
private void readSingleRange(FileRange range, ByteBuffer buffer) {
|
private void readSingleRange(FileRange range, ByteBuffer buffer) {
|
||||||
LOG.debug("Start reading range {} from path {} ", range, pathStr);
|
LOG.debug("Start reading range {} from path {} ", range, pathStr);
|
||||||
|
// This reference must be kept till all buffers are populated as this is a
|
||||||
|
// finalizable object which closes the internal stream when gc triggers.
|
||||||
S3Object objectRange = null;
|
S3Object objectRange = null;
|
||||||
S3ObjectInputStream objectContent = null;
|
S3ObjectInputStream objectContent = null;
|
||||||
try {
|
try {
|
||||||
checkIfVectoredIOStopped();
|
|
||||||
long position = range.getOffset();
|
long position = range.getOffset();
|
||||||
int length = range.getLength();
|
int length = range.getLength();
|
||||||
final String operationName = "readRange";
|
objectRange = getS3ObjectAndValidateNotNull("readSingleRange", position, length);
|
||||||
objectRange = getS3Object(operationName, position, length);
|
|
||||||
objectContent = objectRange.getObjectContent();
|
objectContent = objectRange.getObjectContent();
|
||||||
if (objectContent == null) {
|
|
||||||
throw new PathIOException(uri,
|
|
||||||
"Null IO stream received during " + operationName);
|
|
||||||
}
|
|
||||||
populateBuffer(length, buffer, objectContent);
|
populateBuffer(length, buffer, objectContent);
|
||||||
range.getData().complete(buffer);
|
range.getData().complete(buffer);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
|
@ -1043,6 +1033,29 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
|
||||||
LOG.debug("Finished reading range {} from path {} ", range, pathStr);
|
LOG.debug("Finished reading range {} from path {} ", range, pathStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the s3 object for S3 server for a specified range.
|
||||||
|
* Also checks if the vectored io operation has been stopped before and after
|
||||||
|
* the http get request such that we don't waste time populating the buffers.
|
||||||
|
* @param operationName name of the operation for which get object on S3 is called.
|
||||||
|
* @param position position of the object to be read from S3.
|
||||||
|
* @param length length from position of the object to be read from S3.
|
||||||
|
* @return result s3 object.
|
||||||
|
* @throws IOException exception if any.
|
||||||
|
*/
|
||||||
|
private S3Object getS3ObjectAndValidateNotNull(final String operationName,
|
||||||
|
final long position,
|
||||||
|
final int length) throws IOException {
|
||||||
|
checkIfVectoredIOStopped();
|
||||||
|
S3Object objectRange = getS3Object(operationName, position, length);
|
||||||
|
if (objectRange.getObjectContent() == null) {
|
||||||
|
throw new PathIOException(uri,
|
||||||
|
"Null IO stream received during " + operationName);
|
||||||
|
}
|
||||||
|
checkIfVectoredIOStopped();
|
||||||
|
return objectRange;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Populates the buffer with data from objectContent
|
* Populates the buffer with data from objectContent
|
||||||
* till length. Handles both direct and heap byte buffers.
|
* till length. Handles both direct and heap byte buffers.
|
||||||
|
|
Loading…
Reference in New Issue