diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index ecea795ee5f..caf8aad32e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -65,4 +66,7 @@ public boolean skipRollingRestartWait() {
   public void sleepBeforeHedgedGet() {}
 
   public void delayWhenRenewLeaseTimeout() {}
+
+  public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 6377bc461de..5ae51709593 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -232,7 +232,7 @@ private long getOffsetInBlockGroup(long pos) {
 
   boolean createBlockReader(LocatedBlock block, long offsetInBlock,
       LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-      int chunkIndex) throws IOException {
+      int chunkIndex, long readTo) throws IOException {
     BlockReader reader = null;
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
     DFSInputStream.DNAddrPair dnInfo =
@@ -250,9 +250,14 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
         if (dnInfo == null) {
           break;
         }
+        if (readTo < 0 || readTo > block.getBlockSize()) {
+          readTo = block.getBlockSize();
+        }
         reader = getBlockReader(block, offsetInBlock,
-            block.getBlockSize() - offsetInBlock,
+            readTo - offsetInBlock,
             dnInfo.addr, dnInfo.storageType, dnInfo.info);
+        DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
+            readTo - offsetInBlock);
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException &&
             retry.shouldRefetchEncryptionKey()) {
@@ -485,11 +490,16 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
+    long readTo = -1;
+    for (AlignedStripe stripe : stripes) {
+      readTo = Math.max(readTo, stripe.getOffsetInBlock() + stripe.getSpanInBlock());
+    }
     try {
       for (AlignedStripe stripe : stripes) {
         // Parse group to get chosen DN location
         StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
             preaderInfos, corruptedBlocks, decoder, this);
+        preader.setReadTo(readTo);
         try {
           preader.readStripe();
         } finally {
@@ -554,4 +564,5 @@ public synchronized void unbuffer() {
       parityBuf = null;
     }
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 932ddb491cc..3fc87c7952a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -119,6 +119,7 @@ void skip() {
   protected final int cellSize;
   protected final RawErasureDecoder decoder;
   protected final DFSStripedInputStream dfsStripedInputStream;
+  private long readTo = -1;
 
   protected ECChunk[] decodeInputs;
 
@@ -302,7 +303,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
     if (readerInfos[chunkIndex] == null) {
       if (!dfsStripedInputStream.createBlockReader(block,
           alignedStripe.getOffsetInBlock(), targetBlocks,
-          readerInfos, chunkIndex)) {
+          readerInfos, chunkIndex, readTo)) {
         chunk.state = StripingChunk.MISSING;
         return false;
       }
@@ -478,4 +479,9 @@ void clearFutures() {
   boolean useDirectBuffer() {
     return decoder.preferDirectBuffer();
   }
+
+  public void setReadTo(long readTo) {
+    this.readTo = readTo;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index aedea3c8acd..12cfd49a0bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -49,7 +49,9 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -664,4 +666,70 @@ public void testUnbuffer() throws Exception {
     assertNull(in.parityBuf);
     in.close();
   }
+
+  @Test
+  public void testBlockReader() throws Exception {
+    ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); // RS-6-3-1024k
+    int fileSize = 19 * cellSize + 100;
+    long stripeSize = (long) dataBlocks * cellSize;
+    byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
+    DFSTestUtil.writeFile(fs, filePath, new String(bytes));
+
+    try (DFSStripedInputStream in =
+             (DFSStripedInputStream) fs.getClient().open(filePath.toString())) {
+      // Verify pread:
+      verifyPreadRanges(in, 0, 2 * cellSize,
+          2 * cellSize, Arrays.asList("0_0_1048576", "1_0_1048576"));
+      verifyPreadRanges(in, 0, 5 * cellSize + 9527,
+          5 * cellSize + 9527, Arrays.asList("0_0_1048576", "1_0_1048576",
+              "2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
+      verifyPreadRanges(in, 100, 5 * cellSize + 9527,
+          5 * cellSize + 9527, Arrays.asList("0_100_1048476", "1_0_1048576",
+              "2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
+      verifyPreadRanges(in, stripeSize * 3, 2 * cellSize,
+          cellSize + 100, Arrays.asList("0_1048576_1048576", "1_1048576_100"));
+
+      // Verify sread:
+      verifySreadRanges(in, 0, Arrays.asList("0_0_2097152", "1_0_2097152",
+          "2_0_2097152", "3_0_2097152", "4_0_2097152", "5_0_2097152"));
+      verifySreadRanges(in, stripeSize * 2, Arrays.asList("0_0_2097152", "1_0_1048676",
+          "2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
+    }
+  }
+
+  private void verifyPreadRanges(DFSStripedInputStream in, long position,
+      int length, int lengthExpected,
+      List<String> rangesExpected) throws Exception {
+    List<String> ranges = new ArrayList<>(); // range format: chunkIndex_offset_len
+    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+      @Override
+      public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
+          long offset, long length) {
+        ranges.add(String.format("%s_%s_%s", chunkIndex, offset, length));
+      }
+    });
+    assertEquals(lengthExpected, in.read(position, new byte[length], 0, length));
+    Collections.sort(ranges);
+    Collections.sort(rangesExpected);
+    assertEquals(rangesExpected, ranges);
+  }
+
+  private void verifySreadRanges(DFSStripedInputStream in, long position,
+      List<String> rangesExpected) throws Exception {
+    List<String> ranges = new ArrayList<>(); // range format: chunkIndex_offset_len
+    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+      @Override
+      public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
+          long offset, long length) {
+        ranges.add(String.format("%s_%s_%s", chunkIndex, offset, length));
+      }
+    });
+    in.seek(position);
+    int length = in.read(new byte[1024]);
+    assertEquals(1024, length);
+    Collections.sort(ranges);
+    Collections.sort(rangesExpected);
+    assertEquals(rangesExpected, ranges);
+  }
+
 }
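For context, a minimal self-contained sketch of the bound this patch introduces. The pread path computes readTo as the furthest byte any AlignedStripe touches in each block, and createBlockReader clamps it to the block size before sizing the reader, so a chunk reader no longer requests data all the way to the end of the block when the read stops earlier. The class and method names below (ReadToSketch, computeReadTo, readerLength) are illustrative stand-ins, not part of the patch:

// Illustrative sketch only: mirrors the readTo computation in
// fetchBlockByteRange() and the clamp in createBlockReader().
final class ReadToSketch {

  // Furthest byte any aligned stripe touches in the block; -1 when no
  // bound is supplied (the stateful-read path).
  static long computeReadTo(long[] stripeEnds) {
    long readTo = -1;
    for (long end : stripeEnds) {
      readTo = Math.max(readTo, end);
    }
    return readTo;
  }

  // Length the block reader is opened with.
  static long readerLength(long readTo, long offsetInBlock, long blockSize) {
    if (readTo < 0 || readTo > blockSize) {
      readTo = blockSize; // no bound: fall back to reading to block end
    }
    return readTo - offsetInBlock;
  }

  public static void main(String[] args) {
    long cell = 1024 * 1024;
    // A pread needing only 100 bytes of the second cell row in a
    // two-row (2 MiB) block: the reader is sized to 100 bytes rather
    // than the 1 MiB (blockSize - offset) requested before this change.
    long readTo = computeReadTo(new long[] {cell + 100});
    System.out.println(readerLength(readTo, cell, 2 * cell)); // prints 100
  }
}

The main() case corresponds to the "1_1048576_100" expectation in testBlockReader above; with readTo unset (-1), readerLength() degenerates to the old blockSize - offsetInBlock behavior, which is why the stateful-read ranges still span whole blocks.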