diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index be5b1799b35..4b50ab2c04b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -1164,6 +1164,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ @InterfaceAudience.Private @InterfaceStability.Unstable + @VisibleForTesting public S3AInputStreamStatistics getS3AStreamStatistics() { return streamStatistics; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java index 76ef942ed65..f778f40b74c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; @@ -56,6 +57,21 @@ public class S3APrefetchingInputStream */ private S3ARemoteInputStream inputStream; + /** + * To be only used by synchronized getPos(). + */ + private long lastReadCurrentPos = 0; + + /** + * To be only used by getIOStatistics(). + */ + private IOStatistics ioStatistics = null; + + /** + * To be only used by getS3AStreamStatistics(). + */ + private S3AInputStreamStatistics inputStreamStatistics = null; + /** * Initializes a new instance of the {@code S3APrefetchingInputStream} class. * @@ -115,14 +131,20 @@ public class S3APrefetchingInputStream } /** - * Gets the current position. + * Gets the current position. If the underlying S3 input stream is closed, + * it returns last read current position from the underlying steam. If the + * current position was never read and the underlying input stream is closed, + * this would return 0. * * @return the current position. * @throws IOException if there is an IO error during this operation. */ @Override public synchronized long getPos() throws IOException { - return isClosed() ? 0 : inputStream.getPos(); + if (!isClosed()) { + lastReadCurrentPos = inputStream.getPos(); + } + return lastReadCurrentPos; } /** @@ -215,11 +237,12 @@ public class S3APrefetchingInputStream */ @InterfaceAudience.Private @InterfaceStability.Unstable + @VisibleForTesting public S3AInputStreamStatistics getS3AStreamStatistics() { - if (isClosed()) { - return null; + if (!isClosed()) { + inputStreamStatistics = inputStream.getS3AStreamStatistics(); } - return inputStream.getS3AStreamStatistics(); + return inputStreamStatistics; } /** @@ -229,10 +252,10 @@ public class S3APrefetchingInputStream */ @Override public IOStatistics getIOStatistics() { - if (isClosed()) { - return null; + if (!isClosed()) { + ioStatistics = inputStream.getIOStatistics(); } - return inputStream.getIOStatistics(); + return ioStatistics; } protected boolean isClosed() { @@ -249,7 +272,6 @@ public class S3APrefetchingInputStream @Override public boolean seekToNewSource(long targetPos) throws IOException { - throwIfClosed(); return false; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 121d925d281..4fd50a3eb81 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; @@ -236,4 +238,56 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest { } } + @Test + public void testStatusProbesAfterClosingStream() throws Throwable { + describe("When the underlying input stream is closed, the prefetch input stream" + + " should still support some status probes"); + + byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26); + Path smallFile = methodPath(); + ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true); + + FSDataInputStream in = getFileSystem().open(smallFile); + + byte[] buffer = new byte[SMALL_FILE_SIZE]; + in.read(buffer, 0, S_1K * 4); + in.seek(S_1K * 12); + in.read(buffer, 0, S_1K * 4); + + long pos = in.getPos(); + IOStatistics ioStats = in.getIOStatistics(); + S3AInputStreamStatistics inputStreamStatistics = + ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics(); + + assertNotNull("Prefetching input IO stats should not be null", ioStats); + assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics); + assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0, + pos); + + in.close(); + + // status probes after closing the input stream + long newPos = in.getPos(); + IOStatistics newIoStats = in.getIOStatistics(); + S3AInputStreamStatistics newInputStreamStatistics = + ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics(); + + assertNotNull("Prefetching input IO stats should not be null", newIoStats); + assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics); + assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0, + newPos); + + // compare status probes after closing of the stream with status probes done before + // closing the stream + assertEquals("Position retrieved through stream before and after closing should match", pos, + newPos); + assertEquals("IO stats retrieved through stream before and after closing should match", ioStats, + newIoStats); + assertEquals("Stream stats retrieved through stream before and after closing should match", + inputStreamStatistics, newInputStreamStatistics); + + assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10)); + + } + }