HDFS-14585 Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch2.9
Reapply but include JIRA # in the commit message this time. Revert "Revert "Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch-2.9"" This reverts commit 1dd54163fcb40b60efc7ca153d443129e424df7d.
This commit is contained in:
@ -304,7 +304,7 @@ public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
if (NativeCrc32.isAvailable()) {
if (NativeCrc32.isAvailable() && data.isDirect()) {
NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
fileName, basePos);
} else {
@ -1180,15 +1180,14 @@ private static String getBestNodeDNAddrPairErrorString(
protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
ByteBuffer buf, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
// Latest block, if refreshed internally
block = addressPair.block;
try {
actualGetFromOneDataNode(addressPair, start, end, buf, offset,
actualGetFromOneDataNode(addressPair, start, end, buf,
} catch (IOException e) {
@ -1209,53 +1208,32 @@ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
public ByteBuffer call() throws Exception {
byte[] buf = bb.array();
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, start, end, buf, offset,
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlockMap);
return bb;
* Used when reading contiguous blocks
private void actualGetFromOneDataNode(final DNAddrPair datanode,
final long start, final long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
new int[] { length }, corruptedBlockMap);
* Read data from one DataNode.
* @param datanode the datanode from which to read data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
* @param offsets the data may be read into multiple segments of the buf
* (when reading a striped block). this array indicates the
* offset of each buf segment.
* @param lengths the length of each buf segment
* @param buf the given byte buffer into which the data is read
* @param corruptedBlockMap map recording list of datanodes with corrupted
* block replica
void actualGetFromOneDataNode(final DNAddrPair datanode,
final long startInBlk, final long endInBlk, byte[] buf,
int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
final long startInBlk, final long endInBlk, ByteBuffer buf,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1);
checkReadPortions(offsets, lengths, len);
LocatedBlock block = datanode.block;
while (true) {
BlockReader reader = null;
@ -1263,15 +1241,26 @@ void actualGetFromOneDataNode(final DNAddrPair datanode,
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
for (int i = 0; i < offsets.length; i++) {
int nread = reader.readAll(buf, offsets[i], lengths[i]);
updateReadStatistics(readStatistics, nread, reader);
reader.getNetworkDistance(), nread);
if (nread != lengths[i]) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + lengths[i] + ", got " + nread);
// Behave exactly as the readAll() call
ByteBuffer tmp = buf.duplicate();
tmp.limit(tmp.position() + len);
tmp = tmp.slice();
int nread = 0;
int ret;
while (true) {
ret = reader.read(tmp);
if (ret <= 0) {
nread += ret;
buf.position(buf.position() + nread);
updateReadStatistics(readStatistics, nread, reader);
dfsClient.updateFileSystemReadStats(reader.getNetworkDistance(), nread);
if (nread != len) {
throw new IOException("truncated return from reader.read(): "
+ "excpected " + len + ", got " + nread);
@ -1354,7 +1343,7 @@ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
* time. We then wait on which ever read returns first.
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
long end, ByteBuffer buf,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
@ -1389,8 +1378,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) {
ByteBuffer result = future.get();
System.arraycopy(result.array(), result.position(), buf, offset,
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@ -1438,8 +1427,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
// cancel the rest.
System.arraycopy(result.array(), result.position(), buf, offset,
} catch (InterruptedException ie) {
// Ignore and retry
@ -1542,7 +1531,8 @@ public int read(long position, byte[] buffer, int offset, int length)
try (TraceScope scope = dfsClient.
src, position, length)) {
int retLen = pread(position, buffer, offset, length);
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
int retLen = pread(position, bb);
if (retLen < length) {
dfsClient.addRetLenToReaderScope(scope, retLen);
@ -1550,7 +1540,7 @@ public int read(long position, byte[] buffer, int offset, int length)
private int pread(long position, byte[] buffer, int offset, int length)
private int pread(long position, ByteBuffer buffer)
throws IOException {
// sanity checks
@ -1562,6 +1552,7 @@ private int pread(long position, byte[] buffer, int offset, int length)
if ((position < 0) || (position >= filelen)) {
return -1;
int length = buffer.remaining();
int realLen = length;
if ((position + length) > filelen) {
realLen = (int)(filelen - position);
@ -1575,13 +1566,14 @@ private int pread(long position, byte[] buffer, int offset, int length)
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
long targetEnd = targetStart + bytesToRead - 1;
try {
if (dfsClient.isHedgedReadsEnabled()) {
hedgedFetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
hedgedFetchBlockByteRange(blk, targetStart, targetEnd, buffer,
} else {
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlockMap);
fetchBlockByteRange(blk, targetStart, targetEnd, buffer,
} finally {
// Check and report if any block replicas are corrupted.
@ -1592,7 +1584,6 @@ private int pread(long position, byte[] buffer, int offset, int length)
remaining -= bytesToRead;
position += bytesToRead;
offset += bytesToRead;
assert remaining == 0 : "Wrong number of bytes read.";
return realLen;
Reference in New Issue
Block a user