From c9d6605a5987db7744761162f49a6baa1b6c00f0 Mon Sep 17 00:00:00 2001
From: Mukund Thakur
Date: Fri, 9 Sep 2022 21:46:08 +0530
Subject: [PATCH] HADOOP-18439. Fix VectoredIO for LocalFileSystem when
 checksum is enabled. (#4862)

Part of HADOOP-18103.
While merging ranges in ChecksumFileSystem, they are rounded up to the
checksum chunk size (bytesPerSum), which can push a merged range past
EOF; such ranges must be clamped back to the file length, else the
actual reads fail with EOFException.

Contributed By: Mukund Thakur
---
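[Illustration for reviewers; this note sits in the area ignored by git am and
is not part of the change. A minimal, self-contained sketch of the bug and
the fix: with a bytesPerSum of 512 and a 1071-byte file, range merging rounds
a near-EOF request up to a chunk boundary past EOF, and the new clamp in
ChecksumFSInputChecker#readVectored trims it back. The class and variable
names below are illustrative only.]

// EofClampSketch.java -- standalone demo of the rounding and the clamp.
public class EofClampSketch {
  public static void main(String[] args) {
    final long bytesPerSum = 512;  // checksum chunk size (assumed)
    final long fileLength = 1071;  // file ends mid-chunk

    // A caller asks for bytes [1000, 1071); merging widens the range to
    // chunk boundaries, so the end is rounded up to 1536 -- past EOF.
    long mergedOffset = (1000 / bytesPerSum) * bytesPerSum;                  // 512
    long mergedEnd = ((1071 + bytesPerSum - 1) / bytesPerSum) * bytesPerSum; // 1536
    long mergedLength = mergedEnd - mergedOffset;                            // 1024

    // The fix: clamp any merged range crossing EOF back to the file length,
    // exactly as the new loop over dataRanges in the patch does.
    if (mergedOffset + mergedLength > fileLength) {
      mergedLength = fileLength - mergedOffset;                              // 559
    }
    System.out.println("read [" + mergedOffset + ", "
        + (mergedOffset + mergedLength) + ") of a " + fileLength + "-byte file");
  }
}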
 .../apache/hadoop/fs/ChecksumFileSystem.java       | 74 +++++++++++++++---
 .../AbstractContractVectoredReadTest.java          |  5 ++
 .../TestLocalFSContractVectoredRead.java           | 77 ++++++++++++++++---
 .../apache/hadoop/fs/s3a/S3AInputStream.java       |  6 +-
 4 files changed, 139 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index b23df1713c0..8ec2a1c67b2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -174,6 +174,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
     private static final int HEADER_LENGTH = 8;
 
     private int bytesPerSum = 1;
+    private long fileLen = -1L;
 
     public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
       throws IOException {
@@ -320,6 +321,18 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
       return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE;
     }
 
+    /**
+     * Calculate the length of the file, if not already cached.
+     * @return file length.
+     * @throws IOException any IOE.
+     */
+    private long getFileLength() throws IOException {
+      if (fileLen == -1L) {
+        fileLen = fs.getFileStatus(file).getLen();
+      }
+      return fileLen;
+    }
+
     /**
      * Find the checksum ranges that correspond to the given data ranges.
      * @param dataRanges the input data ranges, which are assumed to be sorted
@@ -371,13 +384,28 @@
       IntBuffer sums = sumsBytes.asIntBuffer();
       sums.position(offset / FSInputChecker.CHECKSUM_SIZE);
       ByteBuffer current = data.duplicate();
-      int numChunks = data.remaining() / bytesPerSum;
+      int numFullChunks = data.remaining() / bytesPerSum;
+      boolean partialChunk = ((data.remaining() % bytesPerSum) != 0);
+      int totalChunks = numFullChunks;
+      if (partialChunk) {
+        totalChunks++;
+      }
       CRC32 crc = new CRC32();
       // check each chunk to ensure they match
-      for(int c = 0; c < numChunks; ++c) {
-        // set the buffer position and the limit
-        current.limit((c + 1) * bytesPerSum);
+      for(int c = 0; c < totalChunks; ++c) {
+        // set the buffer position to the start of every chunk.
         current.position(c * bytesPerSum);
+
+        if (c == numFullChunks) {
+          // The last chunk may hold less than a full chunk of data,
+          // so set the limit accordingly.
+          int lastIncompleteChunk = data.remaining() % bytesPerSum;
+          current.limit((c * bytesPerSum) + lastIncompleteChunk);
+        } else {
+          // set the buffer limit to the end of every chunk.
+          current.limit((c + 1) * bytesPerSum);
+        }
+
+        // compute the crc
         crc.reset();
         crc.update(current);
@@ -396,11 +424,34 @@
       return data;
     }
 
+    /**
+     * Validate the range parameters.
+     * In the checksum FS the file length has already been calculated,
+     * so fail fast here.
+     * @param ranges requested ranges.
+     * @param fileLength length of the file.
+     * @throws EOFException if a range extends beyond the end of the file.
+     */
+    private void validateRangeRequest(List<? extends FileRange> ranges,
+        final long fileLength) throws EOFException {
+      for (FileRange range : ranges) {
+        VectoredReadUtils.validateRangeRequest(range);
+        if (range.getOffset() + range.getLength() > fileLength) {
+          final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
+              range.getOffset(), range.getLength(), file);
+          LOG.warn(errMsg);
+          throw new EOFException(errMsg);
+        }
+      }
+    }
+
     @Override
     public void readVectored(List<? extends FileRange> ranges,
                              IntFunction<ByteBuffer> allocate) throws IOException {
+      final long length = getFileLength();
+      validateRangeRequest(ranges, length);
+
       // If the stream doesn't have checksums, just delegate.
-      VectoredReadUtils.validateVectoredReadRanges(ranges);
       if (sums == null) {
         datas.readVectored(ranges, allocate);
         return;
@@ -410,15 +461,18 @@
       List<CombinedFileRange> dataRanges =
           VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
               minSeek, maxReadSizeForVectorReads());
+      // While merging the ranges above, they are rounded up to the value of bytesPerSum,
+      // which can push some ranges past EOF; those need to be clamped, else the
+      // actual reads fail with EOFException.
+      for (CombinedFileRange range : dataRanges) {
+        if (range.getOffset() + range.getLength() > length) {
+          range.setLength((int) (length - range.getOffset()));
+        }
+      }
       List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
           bytesPerSum, minSeek, maxSize);
       sums.readVectored(checksumRanges, allocate);
       datas.readVectored(dataRanges, allocate);
-      // Data read is correct. I have verified content of dataRanges.
-      // There is some bug below here as test (testVectoredReadMultipleRanges)
-      // is failing, should be
-      // somewhere while slicing the merged data into smaller user ranges.
-      // Spend some time figuring out but it is a complex code.
       for(CombinedFileRange checksumRange: checksumRanges) {
         for(FileRange dataRange: checksumRange.getUnderlying()) {
           // when we have both the ranges, validate the checksum
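[Illustration for reviewers, not part of the patch: how the new totalChunks
loop above walks full checksum chunks plus one trailing partial chunk. This
sketch uses only java.nio and java.util.zip; the buffer and chunk sizes are
made-up values.]

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class PartialChunkCrcSketch {
  public static void main(String[] args) {
    final int bytesPerSum = 512;                       // assumed chunk size
    ByteBuffer data = ByteBuffer.wrap(new byte[1071]); // 2 full chunks + 47 bytes

    int numFullChunks = data.remaining() / bytesPerSum;       // 2
    int lastIncompleteChunk = data.remaining() % bytesPerSum; // 47
    int totalChunks = numFullChunks + (lastIncompleteChunk != 0 ? 1 : 0);

    CRC32 crc = new CRC32();
    for (int c = 0; c < totalChunks; ++c) {
      ByteBuffer current = data.duplicate();
      current.position(c * bytesPerSum);
      // The trailing chunk may hold fewer than bytesPerSum bytes, so its
      // limit stops at the real end of the data, not at a chunk boundary.
      int end = (c == numFullChunks)
          ? c * bytesPerSum + lastIncompleteChunk
          : (c + 1) * bytesPerSum;
      current.limit(end);
      crc.reset();
      crc.update(current);
      System.out.println("chunk " + c + ": " + (end - c * bytesPerSum)
          + " bytes, crc=" + crc.getValue());
    }
  }
}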
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index c76f1839b77..86b645b9ec9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -272,6 +272,11 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     }
   }
 
+  /**
+   * Test to validate EOF ranges. The default implementation fails with EOFException
+   * while reading the ranges. Some implementations, such as S3 and the checksum FS,
+   * fail fast as they have already calculated the file length.
+   */
   @Test
   public void testEOFRanges() throws Exception {
     FileSystem fs = getFileSystem();
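[Illustration for reviewers, not part of the patch: the "fail fast" contract
the javadoc above describes. A stream that already knows the file length can
reject a bad range up front with a half-open-interval check; this helper is
hypothetical and only mirrors the logic in ChecksumFSInputChecker and
S3AInputStream.]

import java.io.EOFException;

public final class FailFastRangeCheck {

  /** Reject a range unless [offset, offset + length) lies within the file. */
  static void checkWithinEof(long offset, int length, long fileLength)
      throws EOFException {
    if (offset + length > fileLength) {
      throw new EOFException(String.format(
          "Requested range [%d, %d) is beyond EOF", offset, offset + length));
    }
  }

  public static void main(String[] args) throws EOFException {
    final long fileLength = 1024;
    checkWithinEof(0, 100, fileLength);      // within the file: accepted
    try {
      checkWithinEof(1024, 100, fileLength); // starts at EOF: rejected up front
    } catch (EOFException e) {
      System.out.println("failed fast: " + e.getMessage());
    }
  }
}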
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index 5d6ca3f8f0c..5ee88801531 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.contract.localfs;
 
+import java.io.EOFException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
@@ -52,9 +54,33 @@ public class TestLocalFSContractVectoredRead extends AbstractContractVectoredRea
 
   @Test
   public void testChecksumValidationDuringVectoredRead() throws Exception {
-    Path testPath = path("big_range_checksum");
+    Path testPath = path("big_range_checksum_file");
+    List<FileRange> someRandomRanges = new ArrayList<>();
+    someRandomRanges.add(FileRange.createFileRange(10, 1024));
+    someRandomRanges.add(FileRange.createFileRange(1025, 1024));
+    validateCheckReadException(testPath, DATASET_LEN, someRandomRanges);
+  }
+
+  /**
+   * Test for a file size smaller than the checksum chunk size
+   * ({@code ChecksumFileSystem#bytesPerChecksum}).
+   */
+  @Test
+  public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception {
+    Path testPath = path("small_range_checksum_file");
+    final int length = 471;
+    List<FileRange> smallFileRanges = new ArrayList<>();
+    smallFileRanges.add(FileRange.createFileRange(10, 50));
+    smallFileRanges.add(FileRange.createFileRange(100, 20));
+    validateCheckReadException(testPath, length, smallFileRanges);
+  }
+
+  private void validateCheckReadException(Path testPath,
+      int length,
+      List<FileRange> ranges) throws Exception {
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
-    final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+    final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
     try (FSDataOutputStream out = localFs.create(testPath, true)){
       out.write(datasetCorrect);
     }
@@ -63,24 +89,55 @@ public class TestLocalFSContractVectoredRea
         .describedAs("Checksum file should be present")
         .isTrue();
     CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
-    List<FileRange> someRandomRanges = new ArrayList<>();
-    someRandomRanges.add(FileRange.createFileRange(10, 1024));
-    someRandomRanges.add(FileRange.createFileRange(1025, 1024));
     try (FSDataInputStream in = fis.get()){
-      in.readVectored(someRandomRanges, getAllocate());
-      validateVectoredReadResult(someRandomRanges, datasetCorrect);
+      in.readVectored(ranges, getAllocate());
+      validateVectoredReadResult(ranges, datasetCorrect);
     }
-    final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
+    final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
     try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
       out.write(datasetCorrupted);
     }
     CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
     try (FSDataInputStream in = fisN.get()){
-      in.readVectored(someRandomRanges, getAllocate());
+      in.readVectored(ranges, getAllocate());
       // Expect checksum exception when data is updated directly through
       // raw local fs instance.
       intercept(ChecksumException.class,
-          () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted));
+          () -> validateVectoredReadResult(ranges, datasetCorrupted));
     }
   }
+
+  @Test
+  public void testChecksumVectoredReadBoundaries() throws Exception {
+    Path testPath = path("boundary_range_checksum_file");
+    final int length = 1071;
+    LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
+    final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
+    try (FSDataOutputStream out = localFs.create(testPath, true)){
+      out.write(datasetCorrect);
+    }
+    Path checksumPath = localFs.getChecksumFile(testPath);
+    Assertions.assertThat(localFs.exists(checksumPath))
+        .describedAs("Checksum file should be present at %s", checksumPath)
+        .isTrue();
+    CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
+    List<FileRange> smallRange = new ArrayList<>();
+    smallRange.add(FileRange.createFileRange(1000, 71));
+    try (FSDataInputStream in = fis.get()){
+      in.readVectored(smallRange, getAllocate());
+      validateVectoredReadResult(smallRange, datasetCorrect);
+    }
+  }
+
+  /**
+   * Overridden for the checksum FS, as its vectored read API fails fast
+   * when a requested range extends beyond EOF.
+   */
+  @Override
+  public void testEOFRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
+    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
+  }
 }
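[Usage sketch for reviewers, not part of the patch: the client-side calls the
tests above exercise. Assumes a Hadoop 3.3.5+ client with the vectored read
API on the classpath; the path and demo data are hypothetical.]

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadDemo {
  public static void main(String[] args) throws Exception {
    // LocalFileSystem is checksummed, so reads go through ChecksumFSInputChecker.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/vectored_demo.bin"); // hypothetical location
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(new byte[1024]);
    }
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(10, 50),
        FileRange.createFileRange(100, 20));
    try (FSDataInputStream in = fs.openFile(path).build().get()) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer buf = range.getData().get(); // waits for this range's read
        System.out.println("range @" + range.getOffset() + " -> "
            + buf.remaining() + " bytes");
      }
    }
  }
}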
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7ef1e216cff..b6ac8669a67 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -1005,10 +1005,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   private void validateRangeRequest(FileRange range) throws EOFException {
     VectoredReadUtils.validateRangeRequest(range);
     if(range.getOffset() + range.getLength() > contentLength) {
-      LOG.warn("Requested range [{}, {}) is beyond EOF for path {}",
+      final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
           range.getOffset(), range.getLength(), pathStr);
-      throw new EOFException("Requested range [" + range.getOffset() +", "
-          + range.getLength() + ") is beyond EOF for path " + pathStr);
+      LOG.warn(errMsg);
+      throw new EOFException(errMsg);
     }
   }