Revert "Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch-2.9"

This reverts commit bccb0fd846.
This commit is contained in:
stack 2019-07-01 11:07:43 -07:00
parent bccb0fd846
commit 1dd54163fc
2 changed files with 49 additions and 40 deletions

View File

@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
} }
return; return;
} }
if (NativeCrc32.isAvailable() && data.isDirect()) { if (NativeCrc32.isAvailable()) {
NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data, NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
fileName, basePos); fileName, basePos);
} else { } else {

View File

@ -1180,14 +1180,15 @@ public class DFSInputStream extends FSInputStream
} }
protected void fetchBlockByteRange(LocatedBlock block, long start, long end, protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
ByteBuffer buf, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
while (true) { while (true) {
DNAddrPair addressPair = chooseDataNode(block, null); DNAddrPair addressPair = chooseDataNode(block, null);
// Latest block, if refreshed internally // Latest block, if refreshed internally
block = addressPair.block; block = addressPair.block;
try { try {
actualGetFromOneDataNode(addressPair, start, end, buf, actualGetFromOneDataNode(addressPair, start, end, buf, offset,
corruptedBlockMap); corruptedBlockMap);
return; return;
} catch (IOException e) { } catch (IOException e) {
@ -1208,32 +1209,53 @@ public class DFSInputStream extends FSInputStream
@Override @Override
public ByteBuffer call() throws Exception { public ByteBuffer call() throws Exception {
DFSClientFaultInjector.get().sleepBeforeHedgedGet(); DFSClientFaultInjector.get().sleepBeforeHedgedGet();
byte[] buf = bb.array();
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) { newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlockMap); actualGetFromOneDataNode(datanode, start, end, buf, offset,
corruptedBlockMap);
return bb; return bb;
} }
} }
}; };
} }
/**
* Used when reading contiguous blocks
*/
private void actualGetFromOneDataNode(final DNAddrPair datanode,
final long start, final long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
new int[] { length }, corruptedBlockMap);
}
/** /**
* Read data from one DataNode. * Read data from one DataNode.
* @param datanode the datanode from which to read data * @param datanode the datanode from which to read data
* @param startInBlk the startInBlk offset of the block * @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block * @param endInBlk the endInBlk offset of the block
* @param buf the given byte buffer into which the data is read * @param buf the given byte array into which the data is read
* @param offsets the data may be read into multiple segments of the buf
* (when reading a striped block). this array indicates the
* offset of each buf segment.
* @param lengths the length of each buf segment
* @param corruptedBlockMap map recording list of datanodes with corrupted * @param corruptedBlockMap map recording list of datanodes with corrupted
* block replica * block replica
*/ */
void actualGetFromOneDataNode(final DNAddrPair datanode, void actualGetFromOneDataNode(final DNAddrPair datanode,
final long startInBlk, final long endInBlk, ByteBuffer buf, final long startInBlk, final long endInBlk, byte[] buf,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode(); DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1); final int len = (int) (endInBlk - startInBlk + 1);
checkReadPortions(offsets, lengths, len);
LocatedBlock block = datanode.block; LocatedBlock block = datanode.block;
while (true) { while (true) {
BlockReader reader = null; BlockReader reader = null;
@ -1241,26 +1263,15 @@ public class DFSInputStream extends FSInputStream
DFSClientFaultInjector.get().fetchFromDatanodeException(); DFSClientFaultInjector.get().fetchFromDatanodeException();
reader = getBlockReader(block, startInBlk, len, datanode.addr, reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info); datanode.storageType, datanode.info);
for (int i = 0; i < offsets.length; i++) {
// Behave exactly as the readAll() call int nread = reader.readAll(buf, offsets[i], lengths[i]);
ByteBuffer tmp = buf.duplicate(); updateReadStatistics(readStatistics, nread, reader);
tmp.limit(tmp.position() + len); dfsClient.updateFileSystemReadStats(
tmp = tmp.slice(); reader.getNetworkDistance(), nread);
int nread = 0; if (nread != lengths[i]) {
int ret; throw new IOException("truncated return from reader.read(): " +
while (true) { "excpected " + lengths[i] + ", got " + nread);
ret = reader.read(tmp);
if (ret <= 0) {
break;
} }
nread += ret;
}
buf.position(buf.position() + nread);
updateReadStatistics(readStatistics, nread, reader);
dfsClient.updateFileSystemReadStats(reader.getNetworkDistance(), nread);
if (nread != len) {
throw new IOException("truncated return from reader.read(): "
+ "excpected " + len + ", got " + nread);
} }
DFSClientFaultInjector.get().readFromDatanodeDelay(); DFSClientFaultInjector.get().readFromDatanodeDelay();
return; return;
@ -1343,7 +1354,7 @@ public class DFSInputStream extends FSInputStream
* time. We then wait on which ever read returns first. * time. We then wait on which ever read returns first.
*/ */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start, private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, ByteBuffer buf, long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
final DfsClientConf conf = dfsClient.getConf(); final DfsClientConf conf = dfsClient.getConf();
@ -1378,8 +1389,8 @@ public class DFSInputStream extends FSInputStream
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) { if (future != null) {
ByteBuffer result = future.get(); ByteBuffer result = future.get();
result.flip(); System.arraycopy(result.array(), result.position(), buf, offset,
buf.put(result); len);
return; return;
} }
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged " DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@ -1427,8 +1438,8 @@ public class DFSInputStream extends FSInputStream
// cancel the rest. // cancel the rest.
cancelAll(futures); cancelAll(futures);
dfsClient.getHedgedReadMetrics().incHedgedReadWins(); dfsClient.getHedgedReadMetrics().incHedgedReadWins();
result.flip(); System.arraycopy(result.array(), result.position(), buf, offset,
buf.put(result); len);
return; return;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// Ignore and retry // Ignore and retry
@ -1531,8 +1542,7 @@ public class DFSInputStream extends FSInputStream
try (TraceScope scope = dfsClient. try (TraceScope scope = dfsClient.
newReaderTraceScope("DFSInputStream#byteArrayPread", newReaderTraceScope("DFSInputStream#byteArrayPread",
src, position, length)) { src, position, length)) {
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length); int retLen = pread(position, buffer, offset, length);
int retLen = pread(position, bb);
if (retLen < length) { if (retLen < length) {
dfsClient.addRetLenToReaderScope(scope, retLen); dfsClient.addRetLenToReaderScope(scope, retLen);
} }
@ -1540,7 +1550,7 @@ public class DFSInputStream extends FSInputStream
} }
} }
private int pread(long position, ByteBuffer buffer) private int pread(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
// sanity checks // sanity checks
dfsClient.checkOpen(); dfsClient.checkOpen();
@ -1552,7 +1562,6 @@ public class DFSInputStream extends FSInputStream
if ((position < 0) || (position >= filelen)) { if ((position < 0) || (position >= filelen)) {
return -1; return -1;
} }
int length = buffer.remaining();
int realLen = length; int realLen = length;
if ((position + length) > filelen) { if ((position + length) > filelen) {
realLen = (int)(filelen - position); realLen = (int)(filelen - position);
@ -1566,14 +1575,13 @@ public class DFSInputStream extends FSInputStream
for (LocatedBlock blk : blockRange) { for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset(); long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
long targetEnd = targetStart + bytesToRead - 1;
try { try {
if (dfsClient.isHedgedReadsEnabled()) { if (dfsClient.isHedgedReadsEnabled()) {
hedgedFetchBlockByteRange(blk, targetStart, targetEnd, buffer, hedgedFetchBlockByteRange(blk, targetStart,
corruptedBlockMap); targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else { } else {
fetchBlockByteRange(blk, targetStart, targetEnd, buffer, fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
corruptedBlockMap); buffer, offset, corruptedBlockMap);
} }
} finally { } finally {
// Check and report if any block replicas are corrupted. // Check and report if any block replicas are corrupted.
@ -1584,6 +1592,7 @@ public class DFSInputStream extends FSInputStream
remaining -= bytesToRead; remaining -= bytesToRead;
position += bytesToRead; position += bytesToRead;
offset += bytesToRead;
} }
assert remaining == 0 : "Wrong number of bytes read."; assert remaining == 0 : "Wrong number of bytes read.";
return realLen; return realLen;