diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
index 501186a2549..88854b87c81 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
@@ -164,6 +164,7 @@ public class S3File implements Closeable {
     Validate.checkLessOrEqual(offset, "offset", size(), "size()");
     Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
 
+    streamStatistics.streamOpened();
     final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
         .withRange(offset, offset + size - 1);
     this.changeTracker.maybeApplyConstraint(request);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
index 0fa6e33200b..00d5fbc367d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
@@ -254,6 +254,10 @@ public abstract class S3InputStream
   public int read() throws IOException {
     this.throwIfClosed();
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -296,6 +300,10 @@ public abstract class S3InputStream
       return 0;
     }
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -427,18 +435,8 @@ public abstract class S3InputStream
   }
 
   protected void throwIfInvalidSeek(long pos) throws EOFException {
-    long fileSize = this.s3File.size();
     if (pos < 0) {
       throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
-    } else {
-      if (fileSize == 0 && pos == 0) {
-        // Do nothing. Valid combination.
-        return;
-      }
-
-      if (pos >= fileSize) {
-        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
-      }
     }
   }
 
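Taken together, the S3InputStream changes move EOF handling from seek() to read(): a seek beyond the end of the object is now accepted, and the following read reports end-of-stream instead of the seek throwing. A minimal sketch of the resulting contract (the `in` and `fileSize` names are illustrative, not part of the patch):

    in.seek(fileSize + 1);          // now valid: no longer throws CANNOT_SEEK_PAST_EOF
    assertEquals(-1, in.read());    // EOF is surfaced by read() returning -1
    in.seek(-1);                    // still invalid: throws EOFException (NEGATIVE_SEEK)
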
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
index c874a8c37b8..0f5834da4cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
@@ -103,8 +103,7 @@ public class S3PrefetchingInputStream
    */
   @Override
   public synchronized long getPos() throws IOException {
-    this.throwIfClosed();
-    return this.inputStream.getPos();
+    return this.isClosed() ? 0 : this.inputStream.getPos();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
index 9a818d037e4..aab66dad860 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
 import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -78,11 +79,16 @@ public class ITestS3ARequesterPays extends AbstractS3ATestBase {
       inputStream.seek(0);
       inputStream.readByte();
 
-      // Verify > 1 call was made, so we're sure it is correctly configured for each request
-      IOStatisticAssertions
-          .assertThatStatisticCounter(inputStream.getIOStatistics(),
-              StreamStatisticNames.STREAM_READ_OPENED)
-          .isGreaterThan(1);
+      if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
+        // For S3PrefetchingInputStream, verify a call was made
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
+      } else {
+        // For S3AInputStream, verify > 1 call was made,
+        // so we're sure it is correctly configured for each request
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
+      }
 
       // Check list calls work without error
       fs.listFiles(requesterPaysPath.getParent(), false);
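The requester-pays test now branches on the prefetch setting: with prefetching enabled, the small test file is fetched by a single ranged GET (counted by the streamOpened() call added in S3File), so STREAM_READ_OPENED is exactly 1 rather than greater than 1. A hedged sketch of forcing the non-prefetching path in such a test (`uri` is an assumed filesystem URI, not from the patch):

    // Hypothetical: disable prefetching so the classic S3AInputStream path
    // (one GET per reopen) is exercised instead of the prefetching stream.
    Configuration conf = new Configuration();
    conf.setBoolean(PREFETCH_ENABLED_KEY, false);
    FileSystem fs = FileSystem.newInstance(uri, conf);
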
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
index 3d7ee0882ef..3a2d1b1b09a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      skipIfCannotUnbuffer(inputStream);
       assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
       int bytesToRead = 8;
       readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     Object streamStatsStr;
     try {
       inputStream = fs.open(dest);
+      skipIfCannotUnbuffer(inputStream);
       streamStatsStr = demandStringifyIOStatisticsSource(inputStream);
 
       LOG.info("initial stream statistics {}", streamStatsStr);
@@ -192,6 +196,12 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
  }
 
+  private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
+    if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
+      skip("input stream does not support unbuffer");
+    }
+  }
+
   /**
    * Read the specified number of bytes from the given
    * {@link FSDataInputStream} and assert that
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
index 010bc1c30b6..e3c6c002bff 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
@@ -169,11 +169,6 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
         EOFException.class,
         FSExceptionMessages.NEGATIVE_SEEK,
         () -> inputStream.seek(-1));
-
-    ExceptionAsserts.assertThrows(
-        EOFException.class,
-        FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
-        () -> inputStream.seek(fileSize + 1));
   }
 
   @Test
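The unbuffer tests now skip rather than fail when the stream does not advertise unbuffer support, which the capability probe implies is the case for the prefetching stream. The same guard works as a defensive pattern for any caller (`in` is an illustrative FSDataInputStream, not from the patch):

    // Only call unbuffer() when the stream advertises the capability;
    // hasCapability(StreamCapabilities.UNBUFFER) is the documented probe.
    if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
      in.unbuffer();  // releases the underlying S3 object stream until the next read
    }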