Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch-2.9

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
sunlisheng 2019-06-20 17:00:51 +08:00 committed by stack
parent 0272480e9f
commit bccb0fd846
2 changed files with 40 additions and 49 deletions

View File

@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
}
return;
}
if (NativeCrc32.isAvailable()) {
if (NativeCrc32.isAvailable() && data.isDirect()) {
NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
fileName, basePos);
} else {

View File

@ -1180,15 +1180,14 @@ public class DFSInputStream extends FSInputStream
}
protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
ByteBuffer buf, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
// Latest block, if refreshed internally
block = addressPair.block;
try {
actualGetFromOneDataNode(addressPair, start, end, buf, offset,
actualGetFromOneDataNode(addressPair, start, end, buf,
corruptedBlockMap);
return;
} catch (IOException e) {
@ -1209,53 +1208,32 @@ public class DFSInputStream extends FSInputStream
@Override
public ByteBuffer call() throws Exception {
DFSClientFaultInjector.get().sleepBeforeHedgedGet();
byte[] buf = bb.array();
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, start, end, buf, offset,
corruptedBlockMap);
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlockMap);
return bb;
}
}
};
}
/**
* Used when reading contiguous blocks
*/
private void actualGetFromOneDataNode(final DNAddrPair datanode,
final long start, final long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
new int[] { length }, corruptedBlockMap);
}
/**
* Read data from one DataNode.
* @param datanode the datanode from which to read data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
* @param offsets the data may be read into multiple segments of the buf
* (when reading a striped block). this array indicates the
* offset of each buf segment.
* @param lengths the length of each buf segment
* @param buf the given byte buffer into which the data is read
* @param corruptedBlockMap map recording list of datanodes with corrupted
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode,
final long startInBlk, final long endInBlk, byte[] buf,
int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
final long startInBlk, final long endInBlk, ByteBuffer buf,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1);
checkReadPortions(offsets, lengths, len);
LocatedBlock block = datanode.block;
while (true) {
BlockReader reader = null;
@ -1263,15 +1241,26 @@ public class DFSInputStream extends FSInputStream
DFSClientFaultInjector.get().fetchFromDatanodeException();
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
for (int i = 0; i < offsets.length; i++) {
int nread = reader.readAll(buf, offsets[i], lengths[i]);
updateReadStatistics(readStatistics, nread, reader);
dfsClient.updateFileSystemReadStats(
reader.getNetworkDistance(), nread);
if (nread != lengths[i]) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + lengths[i] + ", got " + nread);
// Behave exactly as the readAll() call
ByteBuffer tmp = buf.duplicate();
tmp.limit(tmp.position() + len);
tmp = tmp.slice();
int nread = 0;
int ret;
while (true) {
ret = reader.read(tmp);
if (ret <= 0) {
break;
}
nread += ret;
}
buf.position(buf.position() + nread);
updateReadStatistics(readStatistics, nread, reader);
dfsClient.updateFileSystemReadStats(reader.getNetworkDistance(), nread);
if (nread != len) {
throw new IOException("truncated return from reader.read(): "
+ "excpected " + len + ", got " + nread);
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@ -1354,7 +1343,7 @@ public class DFSInputStream extends FSInputStream
* time. We then wait on which ever read returns first.
*/
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
long end, ByteBuffer buf,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
@ -1389,8 +1378,8 @@ public class DFSInputStream extends FSInputStream
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) {
ByteBuffer result = future.get();
System.arraycopy(result.array(), result.position(), buf, offset,
len);
result.flip();
buf.put(result);
return;
}
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@ -1438,8 +1427,8 @@ public class DFSInputStream extends FSInputStream
// cancel the rest.
cancelAll(futures);
dfsClient.getHedgedReadMetrics().incHedgedReadWins();
System.arraycopy(result.array(), result.position(), buf, offset,
len);
result.flip();
buf.put(result);
return;
} catch (InterruptedException ie) {
// Ignore and retry
@ -1542,7 +1531,8 @@ public class DFSInputStream extends FSInputStream
try (TraceScope scope = dfsClient.
newReaderTraceScope("DFSInputStream#byteArrayPread",
src, position, length)) {
int retLen = pread(position, buffer, offset, length);
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
int retLen = pread(position, bb);
if (retLen < length) {
dfsClient.addRetLenToReaderScope(scope, retLen);
}
@ -1550,7 +1540,7 @@ public class DFSInputStream extends FSInputStream
}
}
private int pread(long position, byte[] buffer, int offset, int length)
private int pread(long position, ByteBuffer buffer)
throws IOException {
// sanity checks
dfsClient.checkOpen();
@ -1562,6 +1552,7 @@ public class DFSInputStream extends FSInputStream
if ((position < 0) || (position >= filelen)) {
return -1;
}
int length = buffer.remaining();
int realLen = length;
if ((position + length) > filelen) {
realLen = (int)(filelen - position);
@ -1575,13 +1566,14 @@ public class DFSInputStream extends FSInputStream
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
long targetEnd = targetStart + bytesToRead - 1;
try {
if (dfsClient.isHedgedReadsEnabled()) {
hedgedFetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
hedgedFetchBlockByteRange(blk, targetStart, targetEnd, buffer,
corruptedBlockMap);
} else {
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlockMap);
fetchBlockByteRange(blk, targetStart, targetEnd, buffer,
corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
@ -1592,7 +1584,6 @@ public class DFSInputStream extends FSInputStream
remaining -= bytesToRead;
position += bytesToRead;
offset += bytesToRead;
}
assert remaining == 0 : "Wrong number of bytes read.";
return realLen;