diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index 379b992fba1..c76f1839b77 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -24,11 +24,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.IntFunction; import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,13 +42,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; -import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; @RunWith(Parameterized.class) public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase { @@ -281,16 +281,11 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac in.readVectored(fileRanges, allocate); for (FileRange res : fileRanges) { CompletableFuture data = res.getData(); - try { - ByteBuffer buffer = data.get(); - // Shouldn't reach here. - Assert.fail("EOFException must be thrown while reading EOF"); - } catch (ExecutionException ex) { - // ignore as expected. - } catch (Exception ex) { - LOG.error("Exception while running vectored read ", ex); - Assert.fail("Exception while running vectored read " + ex); - } + interceptFuture(EOFException.class, + "", + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); } } } @@ -410,7 +405,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac fs.openFile(path(VECTORED_READ_FILE_NAME)) .build(); try (FSDataInputStream in = builder.get()) { - LambdaTestUtils.intercept(clazz, + intercept(clazz, () -> in.readVectored(fileRanges, allocate)); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index dfe9fdf2d8d..2dc88eeb85e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -308,6 +308,23 @@ public enum Statistic { StreamStatisticNames.STREAM_READ_OPERATIONS, "Count of read() operations in an input stream", TYPE_COUNTER), + STREAM_READ_VECTORED_OPERATIONS( + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + "Count of readVectored() operations in an input stream.", + TYPE_COUNTER), + STREAM_READ_VECTORED_READ_BYTES_DISCARDED( + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + "Count of bytes discarded during readVectored() operation." + + " in an input stream", + TYPE_COUNTER), + STREAM_READ_VECTORED_INCOMING_RANGES( + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + "Count of incoming file ranges during readVectored() operation.", + TYPE_COUNTER), + STREAM_READ_VECTORED_COMBINED_RANGES( + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + "Count of combined file ranges during readVectored() operation.", + TYPE_COUNTER), STREAM_READ_REMOTE_STREAM_ABORTED( StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED, "Duration of aborting a remote stream during stream IO", diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 84a90ba441a..4c357e288c8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -176,146 +176,172 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe * */ @Test public void testNormalReadVsVectoredReadStatsCollection() throws Exception { - FileSystem fs = getTestFileSystemWithReadAheadDisabled(); - List fileRanges = new ArrayList<>(); - fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); - fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); - fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); - fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); - fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); - FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); - CompletableFuture builder = - fs.openFile(path(VECTORED_READ_FILE_NAME)) - .withFileStatus(fileStatus) - .build(); - try (FSDataInputStream in = builder.get()) { - in.readVectored(fileRanges, getAllocate()); - validateVectoredReadResult(fileRanges, DATASET); - returnBuffersToPoolPostRead(fileRanges, getPool()); + try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); - // audit the io statistics for this stream - IOStatistics st = in.getIOStatistics(); - LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st)); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, getPool()); - // the vectored io operation must be tracked - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, - 1); + // audit the io statistics for this stream + IOStatistics st = in.getIOStatistics(); + LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st)); - // the vectored io operation is being called with 5 input ranges. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, - 5); + // the vectored io operation must be tracked + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 1); - // 5 input ranges got combined in 3 as some of them are close. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, - 3); + // the vectored io operation is being called with 5 input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + 5); - // number of bytes discarded will be based on the above input ranges. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, - 5944); + // 5 input ranges got combined in 3 as some of them are close. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + 3); - verifyStatisticCounterValue(st, - StoreStatisticNames.ACTION_HTTP_GET_REQUEST, - 3); + // number of bytes discarded will be based on the above input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 5944); - // read bytes should match the sum of requested length for each input ranges. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_BYTES, - 1424); + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 3); - } + // read bytes should match the sum of requested length for each input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 1424); - CompletableFuture builder1 = - fs.openFile(path(VECTORED_READ_FILE_NAME)) - .withFileStatus(fileStatus) - .build(); - - try (FSDataInputStream in = builder1.get()) { - for (FileRange range : fileRanges) { - byte[] temp = new byte[range.getLength()]; - in.readFully((int) range.getOffset(), temp, 0, range.getLength()); } - // audit the statistics for this stream - IOStatistics st = in.getIOStatistics(); - LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st)); + CompletableFuture builder1 = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); - verifyStatisticCounterValue(st, + try (FSDataInputStream in = builder1.get()) { + for (FileRange range : fileRanges) { + byte[] temp = new byte[range.getLength()]; + in.readFully((int) range.getOffset(), temp, 0, range.getLength()); + } + + // audit the statistics for this stream + IOStatistics st = in.getIOStatistics(); + LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st)); + + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 0); + + // all other counter values consistent. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 0); + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 5); + + // read bytes should match the sum of requested length for each input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 1424); + } + // validate stats are getting merged at fs instance level. + IOStatistics fsStats = fs.getIOStatistics(); + // only 1 vectored io call is made in this fs instance. + verifyStatisticCounterValue(fsStats, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, - 0); - - // all other counter values consistent. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, - 0); - verifyStatisticCounterValue(st, + 1); + // 8 get requests were made in this fs instance. + verifyStatisticCounterValue(fsStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, - 5); + 8); - // read bytes should match the sum of requested length for each input ranges. - verifyStatisticCounterValue(st, + verifyStatisticCounterValue(fsStats, StreamStatisticNames.STREAM_READ_BYTES, - 1424); + 2848); } } @Test public void testMultiVectoredReadStatsCollection() throws Exception { - FileSystem fs = getTestFileSystemWithReadAheadDisabled(); - List ranges1 = getConsecutiveRanges(); - List ranges2 = getConsecutiveRanges(); - FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); - CompletableFuture builder = - fs.openFile(path(VECTORED_READ_FILE_NAME)) - .withFileStatus(fileStatus) - .build(); - try (FSDataInputStream in = builder.get()) { - in.readVectored(ranges1, getAllocate()); - in.readVectored(ranges2, getAllocate()); - validateVectoredReadResult(ranges1, DATASET); - validateVectoredReadResult(ranges2, DATASET); - returnBuffersToPoolPostRead(ranges1, getPool()); - returnBuffersToPoolPostRead(ranges2, getPool()); + try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) { + List ranges1 = getConsecutiveRanges(); + List ranges2 = getConsecutiveRanges(); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(ranges1, getAllocate()); + in.readVectored(ranges2, getAllocate()); + validateVectoredReadResult(ranges1, DATASET); + validateVectoredReadResult(ranges2, DATASET); + returnBuffersToPoolPostRead(ranges1, getPool()); + returnBuffersToPoolPostRead(ranges2, getPool()); - // audit the io statistics for this stream - IOStatistics st = in.getIOStatistics(); + // audit the io statistics for this stream + IOStatistics st = in.getIOStatistics(); - // 2 vectored io calls are made above. - verifyStatisticCounterValue(st, + // 2 vectored io calls are made above. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 2); + + // 2 vectored io operation is being called with 2 input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + 4); + + // 2 ranges are getting merged in 1 during both vectored io operation. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + 2); + + // number of bytes discarded will be 0 as the ranges are consecutive. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 0); + // only 2 http get request will be made because ranges in both range list will be merged + // to 1 because they are consecutive. + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 2); + // read bytes should match the sum of requested length for each input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 2000); + } + IOStatistics fsStats = fs.getIOStatistics(); + // 2 vectored io calls are made in this fs instance. + verifyStatisticCounterValue(fsStats, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, 2); - - // 2 vectored io operation is being called with 2 input ranges. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, - 4); - - // 2 ranges are getting merged in 1 during both vectored io operation. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, - 2); - - // number of bytes discarded will be 0 as the ranges are consecutive. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, - 0); - // only 2 http get request will be made because ranges in both range list will be merged - // to 1 because they are consecutive. - verifyStatisticCounterValue(st, + // 2 get requests were made in this fs instance. + verifyStatisticCounterValue(fsStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2); - // read bytes should match the sum of requested length for each input ranges. - verifyStatisticCounterValue(st, - StreamStatisticNames.STREAM_READ_BYTES, - 2000); } } - private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException { + private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException { Configuration conf = getFileSystem().getConf(); // also resetting the min seek and max size values is important // as this same test suite has test which overrides these params.