From 1dd54163fcb40b60efc7ca153d443129e424df7d Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 1 Jul 2019 11:07:43 -0700 Subject: [PATCH] Revert "Backport HDFS-8901 Use ByteBuffer in DFSInputStream#read to branch-2.9" This reverts commit bccb0fd846b7cc69e0d83140193eaa7fc0ac081d. --- .../org/apache/hadoop/util/DataChecksum.java | 2 +- .../apache/hadoop/hdfs/DFSInputStream.java | 87 ++++++++++--------- 2 files changed, 49 insertions(+), 40 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index 80e3b1e50f5..1681c92eef1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -304,7 +304,7 @@ public class DataChecksum implements Checksum { } return; } - if (NativeCrc32.isAvailable() && data.isDirect()) { + if (NativeCrc32.isAvailable()) { NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data, fileName, basePos); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index e7b68492d51..db42a8f74a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1180,14 +1180,15 @@ public class DFSInputStream extends FSInputStream } protected void fetchBlockByteRange(LocatedBlock block, long start, long end, - ByteBuffer buf, Map> corruptedBlockMap) + byte[] buf, int offset, + Map> corruptedBlockMap) throws IOException { while (true) { DNAddrPair addressPair = chooseDataNode(block, null); // Latest block, if refreshed internally block = addressPair.block; try { - actualGetFromOneDataNode(addressPair, start, end, buf, + actualGetFromOneDataNode(addressPair, start, end, buf, offset, corruptedBlockMap); return; } catch (IOException e) { @@ -1208,32 +1209,53 @@ public class DFSInputStream extends FSInputStream @Override public ByteBuffer call() throws Exception { DFSClientFaultInjector.get().sleepBeforeHedgedGet(); + byte[] buf = bb.array(); + int offset = bb.position(); try (TraceScope ignored = dfsClient.getTracer(). newScope("hedgedRead" + hedgedReadId, parentSpanId)) { - actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlockMap); + actualGetFromOneDataNode(datanode, start, end, buf, offset, + corruptedBlockMap); return bb; } } }; } + /** + * Used when reading contiguous blocks + */ + private void actualGetFromOneDataNode(final DNAddrPair datanode, + final long start, final long end, byte[] buf, int offset, + Map> corruptedBlockMap) + throws IOException { + final int length = (int) (end - start + 1); + actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset }, + new int[] { length }, corruptedBlockMap); + } + /** * Read data from one DataNode. * @param datanode the datanode from which to read data * @param startInBlk the startInBlk offset of the block * @param endInBlk the endInBlk offset of the block - * @param buf the given byte buffer into which the data is read + * @param buf the given byte array into which the data is read + * @param offsets the data may be read into multiple segments of the buf + * (when reading a striped block). this array indicates the + * offset of each buf segment. + * @param lengths the length of each buf segment * @param corruptedBlockMap map recording list of datanodes with corrupted * block replica */ void actualGetFromOneDataNode(final DNAddrPair datanode, - final long startInBlk, final long endInBlk, ByteBuffer buf, - Map> corruptedBlockMap) + final long startInBlk, final long endInBlk, byte[] buf, + int[] offsets, int[] lengths, Map> corruptedBlockMap) throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once final int len = (int) (endInBlk - startInBlk + 1); + checkReadPortions(offsets, lengths, len); + LocatedBlock block = datanode.block; while (true) { BlockReader reader = null; @@ -1241,26 +1263,15 @@ public class DFSInputStream extends FSInputStream DFSClientFaultInjector.get().fetchFromDatanodeException(); reader = getBlockReader(block, startInBlk, len, datanode.addr, datanode.storageType, datanode.info); - - // Behave exactly as the readAll() call - ByteBuffer tmp = buf.duplicate(); - tmp.limit(tmp.position() + len); - tmp = tmp.slice(); - int nread = 0; - int ret; - while (true) { - ret = reader.read(tmp); - if (ret <= 0) { - break; + for (int i = 0; i < offsets.length; i++) { + int nread = reader.readAll(buf, offsets[i], lengths[i]); + updateReadStatistics(readStatistics, nread, reader); + dfsClient.updateFileSystemReadStats( + reader.getNetworkDistance(), nread); + if (nread != lengths[i]) { + throw new IOException("truncated return from reader.read(): " + + "excpected " + lengths[i] + ", got " + nread); } - nread += ret; - } - buf.position(buf.position() + nread); - updateReadStatistics(readStatistics, nread, reader); - dfsClient.updateFileSystemReadStats(reader.getNetworkDistance(), nread); - if (nread != len) { - throw new IOException("truncated return from reader.read(): " - + "excpected " + len + ", got " + nread); } DFSClientFaultInjector.get().readFromDatanodeDelay(); return; @@ -1343,7 +1354,7 @@ public class DFSInputStream extends FSInputStream * time. We then wait on which ever read returns first. */ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, - long end, ByteBuffer buf, + long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { final DfsClientConf conf = dfsClient.getConf(); @@ -1378,8 +1389,8 @@ public class DFSInputStream extends FSInputStream conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); if (future != null) { ByteBuffer result = future.get(); - result.flip(); - buf.put(result); + System.arraycopy(result.array(), result.position(), buf, offset, + len); return; } DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged " @@ -1427,8 +1438,8 @@ public class DFSInputStream extends FSInputStream // cancel the rest. cancelAll(futures); dfsClient.getHedgedReadMetrics().incHedgedReadWins(); - result.flip(); - buf.put(result); + System.arraycopy(result.array(), result.position(), buf, offset, + len); return; } catch (InterruptedException ie) { // Ignore and retry @@ -1531,8 +1542,7 @@ public class DFSInputStream extends FSInputStream try (TraceScope scope = dfsClient. newReaderTraceScope("DFSInputStream#byteArrayPread", src, position, length)) { - ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length); - int retLen = pread(position, bb); + int retLen = pread(position, buffer, offset, length); if (retLen < length) { dfsClient.addRetLenToReaderScope(scope, retLen); } @@ -1540,7 +1550,7 @@ public class DFSInputStream extends FSInputStream } } - private int pread(long position, ByteBuffer buffer) + private int pread(long position, byte[] buffer, int offset, int length) throws IOException { // sanity checks dfsClient.checkOpen(); @@ -1552,7 +1562,6 @@ public class DFSInputStream extends FSInputStream if ((position < 0) || (position >= filelen)) { return -1; } - int length = buffer.remaining(); int realLen = length; if ((position + length) > filelen) { realLen = (int)(filelen - position); @@ -1566,14 +1575,13 @@ public class DFSInputStream extends FSInputStream for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); - long targetEnd = targetStart + bytesToRead - 1; try { if (dfsClient.isHedgedReadsEnabled()) { - hedgedFetchBlockByteRange(blk, targetStart, targetEnd, buffer, - corruptedBlockMap); + hedgedFetchBlockByteRange(blk, targetStart, + targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); } else { - fetchBlockByteRange(blk, targetStart, targetEnd, buffer, - corruptedBlockMap); + fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, + buffer, offset, corruptedBlockMap); } } finally { // Check and report if any block replicas are corrupted. @@ -1584,6 +1592,7 @@ public class DFSInputStream extends FSInputStream remaining -= bytesToRead; position += bytesToRead; + offset += bytesToRead; } assert remaining == 0 : "Wrong number of bytes read."; return realLen;