HDFS-16520. Improve EC pread: avoid potential reading whole block (#4104)
Reviewed-by: Hui Fei <ferhui@apache.org>
Reviewed-by: Takanobu Asanuma <tasanuma@apache.org>
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 29401c8203
)
This commit is contained in:
parent
4d935eaed7
commit
d711d200fe
|
@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used for injecting faults in DFSClient and DFSOutputStream tests.
|
* Used for injecting faults in DFSClient and DFSOutputStream tests.
|
||||||
|
@ -65,4 +66,7 @@ public class DFSClientFaultInjector {
|
||||||
public void sleepBeforeHedgedGet() {}
|
public void sleepBeforeHedgedGet() {}
|
||||||
|
|
||||||
public void delayWhenRenewLeaseTimeout() {}
|
public void delayWhenRenewLeaseTimeout() {}
|
||||||
|
|
||||||
|
public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,7 +234,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
|
|
||||||
boolean createBlockReader(LocatedBlock block, long offsetInBlock,
|
boolean createBlockReader(LocatedBlock block, long offsetInBlock,
|
||||||
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
|
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
|
||||||
int chunkIndex) throws IOException {
|
int chunkIndex, long readTo) throws IOException {
|
||||||
BlockReader reader = null;
|
BlockReader reader = null;
|
||||||
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
||||||
DFSInputStream.DNAddrPair dnInfo =
|
DFSInputStream.DNAddrPair dnInfo =
|
||||||
|
@ -252,9 +252,14 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
if (dnInfo == null) {
|
if (dnInfo == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (readTo < 0 || readTo > block.getBlockSize()) {
|
||||||
|
readTo = block.getBlockSize();
|
||||||
|
}
|
||||||
reader = getBlockReader(block, offsetInBlock,
|
reader = getBlockReader(block, offsetInBlock,
|
||||||
block.getBlockSize() - offsetInBlock,
|
readTo - offsetInBlock,
|
||||||
dnInfo.addr, dnInfo.storageType, dnInfo.info);
|
dnInfo.addr, dnInfo.storageType, dnInfo.info);
|
||||||
|
DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
|
||||||
|
readTo - offsetInBlock);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (e instanceof InvalidEncryptionKeyException &&
|
if (e instanceof InvalidEncryptionKeyException &&
|
||||||
retry.shouldRefetchEncryptionKey()) {
|
retry.shouldRefetchEncryptionKey()) {
|
||||||
|
@ -489,11 +494,16 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
||||||
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||||
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
|
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
|
||||||
|
long readTo = -1;
|
||||||
|
for (AlignedStripe stripe : stripes) {
|
||||||
|
readTo = Math.max(readTo, stripe.getOffsetInBlock() + stripe.getSpanInBlock());
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
for (AlignedStripe stripe : stripes) {
|
for (AlignedStripe stripe : stripes) {
|
||||||
// Parse group to get chosen DN location
|
// Parse group to get chosen DN location
|
||||||
StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
|
StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
|
||||||
preaderInfos, corruptedBlocks, decoder, this);
|
preaderInfos, corruptedBlocks, decoder, this);
|
||||||
|
preader.setReadTo(readTo);
|
||||||
try {
|
try {
|
||||||
preader.readStripe();
|
preader.readStripe();
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -558,4 +568,5 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
parityBuf = null;
|
parityBuf = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,6 +119,7 @@ abstract class StripeReader {
|
||||||
protected final int cellSize;
|
protected final int cellSize;
|
||||||
protected final RawErasureDecoder decoder;
|
protected final RawErasureDecoder decoder;
|
||||||
protected final DFSStripedInputStream dfsStripedInputStream;
|
protected final DFSStripedInputStream dfsStripedInputStream;
|
||||||
|
private long readTo = -1;
|
||||||
|
|
||||||
protected ECChunk[] decodeInputs;
|
protected ECChunk[] decodeInputs;
|
||||||
|
|
||||||
|
@ -302,7 +303,7 @@ abstract class StripeReader {
|
||||||
if (readerInfos[chunkIndex] == null) {
|
if (readerInfos[chunkIndex] == null) {
|
||||||
if (!dfsStripedInputStream.createBlockReader(block,
|
if (!dfsStripedInputStream.createBlockReader(block,
|
||||||
alignedStripe.getOffsetInBlock(), targetBlocks,
|
alignedStripe.getOffsetInBlock(), targetBlocks,
|
||||||
readerInfos, chunkIndex)) {
|
readerInfos, chunkIndex, readTo)) {
|
||||||
chunk.state = StripingChunk.MISSING;
|
chunk.state = StripingChunk.MISSING;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -480,4 +481,9 @@ abstract class StripeReader {
|
||||||
boolean useDirectBuffer() {
|
boolean useDirectBuffer() {
|
||||||
return decoder.preferDirectBuffer();
|
return decoder.preferDirectBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setReadTo(long readTo) {
|
||||||
|
this.readTo = readTo;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,9 @@ import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||||
|
@ -664,4 +666,70 @@ public class TestDFSStripedInputStream {
|
||||||
assertNull(in.parityBuf);
|
assertNull(in.parityBuf);
|
||||||
in.close();
|
in.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReader() throws Exception {
|
||||||
|
ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); // RS-6-3-1024k
|
||||||
|
int fileSize = 19 * cellSize + 100;
|
||||||
|
long stripeSize = (long) dataBlocks * cellSize;
|
||||||
|
byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
|
||||||
|
DFSTestUtil.writeFile(fs, filePath, new String(bytes));
|
||||||
|
|
||||||
|
try (DFSStripedInputStream in =
|
||||||
|
(DFSStripedInputStream) fs.getClient().open(filePath.toString())) {
|
||||||
|
// Verify pread:
|
||||||
|
verifyPreadRanges(in, 0, 2 * cellSize,
|
||||||
|
2 * cellSize, Arrays.asList("0_0_1048576", "1_0_1048576"));
|
||||||
|
verifyPreadRanges(in, 0, 5 * cellSize + 9527,
|
||||||
|
5 * cellSize + 9527, Arrays.asList("0_0_1048576", "1_0_1048576",
|
||||||
|
"2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
|
||||||
|
verifyPreadRanges(in, 100, 5 * cellSize + 9527,
|
||||||
|
5 * cellSize + 9527, Arrays.asList("0_100_1048476", "1_0_1048576",
|
||||||
|
"2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
|
||||||
|
verifyPreadRanges(in, stripeSize * 3, 2 * cellSize,
|
||||||
|
cellSize + 100, Arrays.asList("0_1048576_1048576", "1_1048576_100"));
|
||||||
|
|
||||||
|
// Verify sread:
|
||||||
|
verifySreadRanges(in, 0, Arrays.asList("0_0_2097152", "1_0_2097152",
|
||||||
|
"2_0_2097152", "3_0_2097152", "4_0_2097152", "5_0_2097152"));
|
||||||
|
verifySreadRanges(in, stripeSize * 2, Arrays.asList("0_0_2097152", "1_0_1048676",
|
||||||
|
"2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyPreadRanges(DFSStripedInputStream in, long position,
|
||||||
|
int length, int lengthExpected,
|
||||||
|
List<String> rangesExpected) throws Exception {
|
||||||
|
List<String> ranges = new ArrayList<>(); // range format: chunkIndex_offset_len
|
||||||
|
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
|
||||||
|
long offset, long length) {
|
||||||
|
ranges.add(String.format("%s_%s_%s", chunkIndex, offset, length));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals(lengthExpected, in.read(position, new byte[length], 0, length));
|
||||||
|
Collections.sort(ranges);
|
||||||
|
Collections.sort(rangesExpected);
|
||||||
|
assertEquals(rangesExpected, ranges);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifySreadRanges(DFSStripedInputStream in, long position,
|
||||||
|
List<String> rangesExpected) throws Exception {
|
||||||
|
List<String> ranges = new ArrayList<>(); // range format: chunkIndex_offset_len
|
||||||
|
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
|
||||||
|
long offset, long length) {
|
||||||
|
ranges.add(String.format("%s_%s_%s", chunkIndex, offset, length));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
in.seek(position);
|
||||||
|
int length = in.read(new byte[1024]);
|
||||||
|
assertEquals(1024, length);
|
||||||
|
Collections.sort(ranges);
|
||||||
|
Collections.sort(rangesExpected);
|
||||||
|
assertEquals(rangesExpected, ranges);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue