From 793447f79924c97c2b562d5e41fa85adf19673fe Mon Sep 17 00:00:00 2001
From: Kai Zheng
Date: Wed, 24 Aug 2016 21:57:23 +0800
Subject: [PATCH] HDFS-8905. Refactor DFSInputStream#ReaderStrategy.
 Contributed by Kai Zheng and Sammi Chen

---
 .../apache/hadoop/hdfs/DFSInputStream.java    | 235 ++----------------
 .../hadoop/hdfs/DFSStripedInputStream.java    |  23 +-
 .../apache/hadoop/hdfs/ReadStatistics.java    | 106 ++++++++
 .../apache/hadoop/hdfs/ReaderStrategy.java    | 215 ++++++++++++++++
 .../hdfs/client/HdfsDataInputStream.java      |   3 +-
 .../hadoop/hdfs/util/IOUtilsClient.java       |  16 ++
 .../hadoop/hdfs/TestExternalBlockReader.java  |   4 +-
 7 files changed, 374 insertions(+), 228 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6132f83efec..7a10ba4d0c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -145,94 +146,6 @@ public class DFSInputStream extends FSInputStream
     return extendedReadBuffers;
   }
 
-  public static class ReadStatistics {
-    public ReadStatistics() {
-      clear();
-    }
-
-    public ReadStatistics(ReadStatistics rhs) {
-      this.totalBytesRead = rhs.getTotalBytesRead();
-      this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
-      this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
-      this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
-    }
-
-    /**
-     * @return The total bytes read. This will always be at least as
-     * high as the other numbers, since it includes all of them.
-     */
-    public long getTotalBytesRead() {
-      return totalBytesRead;
-    }
-
-    /**
-     * @return The total local bytes read. This will always be at least
-     * as high as totalShortCircuitBytesRead, since all short-circuit
-     * reads are also local.
-     */
-    public long getTotalLocalBytesRead() {
-      return totalLocalBytesRead;
-    }
-
-    /**
-     * @return The total short-circuit local bytes read.
-     */
-    public long getTotalShortCircuitBytesRead() {
-      return totalShortCircuitBytesRead;
-    }
-
-    /**
-     * @return The total number of zero-copy bytes read.
-     */
-    public long getTotalZeroCopyBytesRead() {
-      return totalZeroCopyBytesRead;
-    }
-
-    /**
-     * @return The total number of bytes read which were not local.
-     */
-    public long getRemoteBytesRead() {
-      return totalBytesRead - totalLocalBytesRead;
-    }
-
-    void addRemoteBytes(long amt) {
-      this.totalBytesRead += amt;
-    }
-
-    void addLocalBytes(long amt) {
-      this.totalBytesRead += amt;
-      this.totalLocalBytesRead += amt;
-    }
-
-    void addShortCircuitBytes(long amt) {
-      this.totalBytesRead += amt;
-      this.totalLocalBytesRead += amt;
-      this.totalShortCircuitBytesRead += amt;
-    }
-
-    void addZeroCopyBytes(long amt) {
-      this.totalBytesRead += amt;
-      this.totalLocalBytesRead += amt;
-      this.totalShortCircuitBytesRead += amt;
-      this.totalZeroCopyBytesRead += amt;
-    }
-
-    void clear() {
-      this.totalBytesRead = 0;
-      this.totalLocalBytesRead = 0;
-      this.totalShortCircuitBytesRead = 0;
-      this.totalZeroCopyBytesRead = 0;
-    }
-
-    private long totalBytesRead;
-
-    private long totalLocalBytesRead;
-
-    private long totalShortCircuitBytesRead;
-
-    private long totalZeroCopyBytesRead;
-  }
-
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -767,116 +680,11 @@ public class DFSInputStream extends FSInputStream
     return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
   }
 
-  /**
-   * Wraps different possible read implementations so that readBuffer can be
-   * strategy-agnostic.
-   */
-  interface ReaderStrategy {
-    int doRead(BlockReader blockReader, int off, int len)
-        throws IOException;
-
-    /**
-     * Copy data from the src ByteBuffer into the read buffer.
-     * @param src The src buffer where the data is copied from
-     * @param offset Useful only when the ReadStrategy is based on a byte array.
-     *               Indicate the offset of the byte array for copy.
-     * @param length Useful only when the ReadStrategy is based on a byte array.
-     *               Indicate the length of the data to copy.
-     */
-    int copyFrom(ByteBuffer src, int offset, int length);
-  }
-
-  protected void updateReadStatistics(ReadStatistics readStatistics,
-        int nRead, BlockReader blockReader) {
-    if (nRead <= 0) return;
-    synchronized(infoLock) {
-      if (blockReader.isShortCircuit()) {
-        readStatistics.addShortCircuitBytes(nRead);
-      } else if (blockReader.getNetworkDistance() == 0) {
-        readStatistics.addLocalBytes(nRead);
-      } else {
-        readStatistics.addRemoteBytes(nRead);
-      }
-    }
-  }
-
-  /**
-   * Used to read bytes into a byte[]
-   */
-  private class ByteArrayStrategy implements ReaderStrategy {
-    final byte[] buf;
-
-    public ByteArrayStrategy(byte[] buf) {
-      this.buf = buf;
-    }
-
-    @Override
-    public int doRead(BlockReader blockReader, int off, int len)
-        throws IOException {
-      int nRead = blockReader.read(buf, off, len);
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      return nRead;
-    }
-
-    @Override
-    public int copyFrom(ByteBuffer src, int offset, int length) {
-      ByteBuffer writeSlice = src.duplicate();
-      writeSlice.get(buf, offset, length);
-      return length;
-    }
-  }
-
-  /**
-   * Used to read bytes into a user-supplied ByteBuffer
-   */
-  protected class ByteBufferStrategy implements ReaderStrategy {
-    final ByteBuffer buf;
-    ByteBufferStrategy(ByteBuffer buf) {
-      this.buf = buf;
-    }
-
-    @Override
-    public int doRead(BlockReader blockReader, int off, int len)
-        throws IOException {
-      int oldpos = buf.position();
-      int oldlimit = buf.limit();
-      boolean success = false;
-      try {
-        int ret = blockReader.read(buf);
-        success = true;
-        updateReadStatistics(readStatistics, ret, blockReader);
-        dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-            ret);
-        if (ret == 0) {
-          DFSClient.LOG.warn("zero");
-        }
-        return ret;
-      } finally {
-        if (!success) {
-          // Reset to original state so that retries work correctly.
-          buf.position(oldpos);
-          buf.limit(oldlimit);
-        }
-      }
-    }
-
-    @Override
-    public int copyFrom(ByteBuffer src, int offset, int length) {
-      ByteBuffer writeSlice = src.duplicate();
-      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
-      writeSlice.limit(writeSlice.position() + remaining);
-      buf.put(writeSlice);
-      return remaining;
-    }
-  }
-
   /* This is a used by regular read() and handles ChecksumExceptions.
    * name readBuffer() is chosen to imply similarity to readBuffer() in
    * ChecksumFileSystem
    */
-  private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
+  private synchronized int readBuffer(ReaderStrategy reader, int len,
       CorruptedBlocks corruptedBlocks)
       throws IOException {
     IOException ioe;
@@ -892,7 +700,7 @@ public class DFSInputStream extends FSInputStream
     while (true) {
       // retry as many times as seekToNewSource allows.
       try {
-        return reader.doRead(blockReader, off, len);
+        return reader.readFromBlock(blockReader, len);
       } catch ( ChecksumException ce ) {
         DFSClient.LOG.warn("Found Checksum error for "
             + getCurrentBlock() + " from " + currentNode
@@ -927,13 +735,14 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
-      int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy)
+      throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
 
+    int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     failures = 0;
     if (pos < getFileLength()) {
@@ -952,7 +761,7 @@ public class DFSInputStream extends FSInputStream
             locatedBlocks.getFileLength() - pos);
         }
       }
-      int result = readBuffer(strategy, off, realLen, corruptedBlocks);
+      int result = readBuffer(strategy, realLen, corruptedBlocks);
 
       if (result >= 0) {
         pos += result;
@@ -994,11 +803,12 @@ public class DFSInputStream extends FSInputStream
     if (len == 0) {
       return 0;
     }
-    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
+    ReaderStrategy byteArrayReader =
+        new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
     try (TraceScope scope =
         dfsClient.newReaderTraceScope("DFSInputStream#byteArrayRead",
             src, getPos(), len)) {
-      int retLen = readWithStrategy(byteArrayReader, off, len);
+      int retLen = readWithStrategy(byteArrayReader);
       if (retLen < len) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1008,12 +818,13 @@ public class DFSInputStream extends FSInputStream
 
   @Override
   public synchronized int read(final ByteBuffer buf) throws IOException {
-    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+    ReaderStrategy byteBufferReader =
+        new ByteBufferStrategy(buf, readStatistics, dfsClient);
     int reqLen = buf.remaining();
     try (TraceScope scope =
         dfsClient.newReaderTraceScope("DFSInputStream#byteBufferRead",
             src, getPos(), reqLen)){
-      int retLen = readWithStrategy(byteBufferReader, 0, reqLen);
+      int retLen = readWithStrategy(byteBufferReader);
       if (retLen < reqLen) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1221,7 +1032,7 @@ public class DFSInputStream extends FSInputStream
         reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
         int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
+        IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
         dfsClient.updateFileSystemReadStats(
             reader.getNetworkDistance(), nread);
         if (nread != len) {
@@ -1721,18 +1532,14 @@ public class DFSInputStream extends FSInputStream
    * Get statistics about the reads which this DFSInputStream has done.
    */
   public ReadStatistics getReadStatistics() {
-    synchronized(infoLock) {
-      return new ReadStatistics(readStatistics);
-    }
+    return new ReadStatistics(readStatistics);
   }
 
   /**
    * Clear statistics about the reads which this DFSInputStream has done.
    */
   public void clearReadStatistics() {
-    synchronized(infoLock) {
-      readStatistics.clear();
-    }
+    readStatistics.clear();
   }
 
   public FileEncryptionInfo getFileEncryptionInfo() {
@@ -1759,7 +1566,8 @@ public class DFSInputStream extends FSInputStream
       throws IOException {
     synchronized (infoLock) {
       this.cachingStrategy =
-          new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
+          new CachingStrategy.Builder(this.cachingStrategy).
+              setReadahead(readahead).build();
     }
     closeCurrentBlockReaders();
   }
@@ -1769,7 +1577,8 @@ public class DFSInputStream extends FSInputStream
       throws IOException {
     synchronized (infoLock) {
       this.cachingStrategy =
-          new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
+          new CachingStrategy.Builder(this.cachingStrategy).
+              setDropBehind(dropBehind).build();
     }
     closeCurrentBlockReaders();
   }
@@ -1883,9 +1692,7 @@ public class DFSInputStream extends FSInputStream
       buffer.position((int)blockPos);
       buffer.limit((int)(blockPos + length));
       getExtendedReadBuffers().put(buffer, clientMmap);
-      synchronized (infoLock) {
-        readStatistics.addZeroCopyBytes(length);
-      }
+      readStatistics.addZeroCopyBytes(length);
       DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
           + "zero-copy read path. blockEnd = {}", length, curPos, blockEnd);
       success = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d93863cebe9..9ca8005f230 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -359,11 +359,11 @@ public class DFSStripedInputStream extends DFSInputStream {
       ExtendedBlock currentBlock,
       CorruptedBlocks corruptedBlocks)
       throws IOException {
-    final int targetLength = strategy.buf.remaining();
+    final int targetLength = strategy.getTargetLength();
     int length = 0;
     try {
       while (length < targetLength) {
-        int ret = strategy.doRead(blockReader, 0, 0);
+        int ret = strategy.readFromBlock(blockReader);
         if (ret < 0) {
           throw new IOException("Unexpected EOS from the reader");
         }
@@ -425,13 +425,14 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   @Override
-  protected synchronized int readWithStrategy(ReaderStrategy strategy,
-      int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy)
+      throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
 
+    int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     if (pos < getFileLength()) {
       try {
@@ -452,7 +453,7 @@ public class DFSStripedInputStream extends DFSInputStream {
           if (!curStripeRange.include(getOffsetInBlockGroup())) {
             readOneStripe(corruptedBlocks);
           }
-          int ret = copyToTargetBuf(strategy, off + result, realLen - result);
+          int ret = copyToTargetBuf(strategy, realLen - result);
           result += ret;
           pos += ret;
         }
@@ -470,16 +471,14 @@ public class DFSStripedInputStream extends DFSInputStream {
   /**
    * Copy the data from {@link #curStripeBuf} into the given buffer
    * @param strategy the ReaderStrategy containing the given buffer
-   * @param offset the offset of the given buffer.
-   *               Used only when strategy is a ByteArrayStrategy
   * @param length target length
   * @return number of bytes copied
   */
-  private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
+  private int copyToTargetBuf(ReaderStrategy strategy, int length) {
    final long offsetInBlk = getOffsetInBlockGroup();
    int bufOffset = getStripedBufOffset(offsetInBlk);
    curStripeBuf.position(bufOffset);
-    return strategy.copyFrom(curStripeBuf, offset,
+    return strategy.readFromBuffer(curStripeBuf,
        Math.min(length, curStripeBuf.remaining()));
  }
@@ -700,7 +699,8 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
     if (chunk.byteBuffer != null) {
-      ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
+      ByteBufferStrategy strategy =
+          new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient);
       return new ByteBufferStrategy[]{strategy};
     } else {
       ByteBufferStrategy[] strategies =
@@ -708,7 +708,8 @@ public class DFSStripedInputStream extends DFSInputStream {
       for (int i = 0; i < strategies.length; i++) {
         ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
             chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
-        strategies[i] = new ByteBufferStrategy(buffer);
+        strategies[i] =
+            new ByteBufferStrategy(buffer, readStatistics, dfsClient);
       }
       return strategies;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
new file mode 100644
index 00000000000..59b1418d95e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+/**
+ * A utility class that maintains statistics for reading.
+ */
+public class ReadStatistics {
+  private long totalBytesRead;
+  private long totalLocalBytesRead;
+  private long totalShortCircuitBytesRead;
+  private long totalZeroCopyBytesRead;
+
+  public ReadStatistics() {
+    clear();
+  }
+
+  public ReadStatistics(ReadStatistics rhs) {
+    this.totalBytesRead = rhs.getTotalBytesRead();
+    this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
+    this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+    this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
+  }
+
+  /**
+   * @return The total bytes read. This will always be at least as
+   * high as the other numbers, since it includes all of them.
+   */
+  public synchronized long getTotalBytesRead() {
+    return totalBytesRead;
+  }
+
+  /**
+   * @return The total local bytes read.
+   * This will always be at least as high as totalShortCircuitBytesRead,
+   * since all short-circuit reads are also local.
+   */
+  public synchronized long getTotalLocalBytesRead() {
+    return totalLocalBytesRead;
+  }
+
+  /**
+   * @return The total short-circuit local bytes read.
+   */
+  public synchronized long getTotalShortCircuitBytesRead() {
+    return totalShortCircuitBytesRead;
+  }
+
+  /**
+   * @return The total number of zero-copy bytes read.
+   */
+  public synchronized long getTotalZeroCopyBytesRead() {
+    return totalZeroCopyBytesRead;
+  }
+
+  /**
+   * @return The total number of bytes read which were not local.
+   */
+  public synchronized long getRemoteBytesRead() {
+    return totalBytesRead - totalLocalBytesRead;
+  }
+
+  public synchronized void addRemoteBytes(long amt) {
+    this.totalBytesRead += amt;
+  }
+
+  public synchronized void addLocalBytes(long amt) {
+    this.totalBytesRead += amt;
+    this.totalLocalBytesRead += amt;
+  }
+
+  public synchronized void addShortCircuitBytes(long amt) {
+    this.totalBytesRead += amt;
+    this.totalLocalBytesRead += amt;
+    this.totalShortCircuitBytesRead += amt;
+  }
+
+  public synchronized void addZeroCopyBytes(long amt) {
+    this.totalBytesRead += amt;
+    this.totalLocalBytesRead += amt;
+    this.totalShortCircuitBytesRead += amt;
+    this.totalZeroCopyBytesRead += amt;
+  }
+
+  public synchronized void clear() {
+    this.totalBytesRead = 0;
+    this.totalLocalBytesRead = 0;
+    this.totalShortCircuitBytesRead = 0;
+    this.totalZeroCopyBytesRead = 0;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
new file mode 100644
index 00000000000..d75a8ef653c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
+/**
+ * Wraps different possible read implementations so that callers can be
+ * strategy-agnostic.
+ */
+interface ReaderStrategy {
+  /**
+   * Read from a block using the given block reader.
+   * @param blockReader the block reader to read from
+   * @return number of bytes read
+   * @throws IOException if the read fails
+   */
+  int readFromBlock(BlockReader blockReader) throws IOException;
+
+  /**
+   * Read from a block using the given block reader, up to the desired length.
+   * @param blockReader the block reader to read from
+   * @param length the desired number of bytes to read; fewer may be returned
+   * @return number of bytes read
+   * @throws IOException if the read fails
+   */
+  int readFromBlock(BlockReader blockReader, int length) throws IOException;
+
+  /**
+   * Read or copy from a src buffer.
+   * Note: the position of the src buffer is not changed after the call.
+   * @param src the source buffer to copy from
+   * @return number of bytes copied
+   */
+  int readFromBuffer(ByteBuffer src);
+
+  /**
+   * Read or copy up to the desired length of bytes from a src buffer.
+   * Note: the position of the src buffer is not changed after the call.
+   * @param src the source buffer to copy from
+   * @param length the desired number of bytes to copy
+   * @return number of bytes copied
+   */
+  int readFromBuffer(ByteBuffer src, int length);
+
+  /**
+   * @return the target buffer that data is read into.
+   */
+  ByteBuffer getReadBuffer();
+
+  /**
+   * @return the target length to read.
+   */
+  int getTargetLength();
+}
+
+/**
+ * Used to read bytes into a byte array buffer. Note it is not thread-safe
+ * and its behavior is undefined under concurrent use.
+ */
+class ByteArrayStrategy implements ReaderStrategy {
+  private final DFSClient dfsClient;
+  private final ReadStatistics readStatistics;
+  private final byte[] readBuf;
+  private int offset;
+  private final int targetLength;
+
+  /**
+   * The constructor.
+   * @param readBuf target buffer to read into
+   * @param offset offset into the buffer
+   * @param targetLength target length of data
+   * @param readStatistics statistics counter
+   * @param dfsClient the client to report file system read stats to
+   */
+  public ByteArrayStrategy(byte[] readBuf, int offset, int targetLength,
+                           ReadStatistics readStatistics,
+                           DFSClient dfsClient) {
+    this.readBuf = readBuf;
+    this.offset = offset;
+    this.targetLength = targetLength;
+    this.readStatistics = readStatistics;
+    this.dfsClient = dfsClient;
+  }
+
+  @Override
+  public ByteBuffer getReadBuffer() {
+    return ByteBuffer.wrap(readBuf, offset, targetLength);
+  }
+
+  @Override
+  public int getTargetLength() {
+    return targetLength;
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader) throws IOException {
+    return readFromBlock(blockReader, targetLength);
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader,
+                           int length) throws IOException {
+    int nRead = blockReader.read(readBuf, offset, length);
+    if (nRead > 0) {
+      updateReadStatistics(readStatistics, nRead, blockReader);
+      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+          nRead);
+      offset += nRead;
+    }
+    return nRead;
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src) {
+    return readFromBuffer(src, src.remaining());
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src, int length) {
+    ByteBuffer dup = src.duplicate();
+    dup.get(readBuf, offset, length);
+    offset += length;
+    return length;
+  }
+}
+
+/**
+ * Used to read bytes into a user-supplied ByteBuffer. Note it is not
+ * thread-safe and its behavior is undefined under concurrent use. When a
+ * read is performed, the position of the underlying byte buffer moves
+ * forward, as specified by the ByteBufferReadable#read(ByteBuffer) contract.
+ */
+class ByteBufferStrategy implements ReaderStrategy {
+  private final DFSClient dfsClient;
+  private final ReadStatistics readStatistics;
+  private final ByteBuffer readBuf;
+  private final int targetLength;
+
+  /**
+   * The constructor.
+   * @param readBuf target buffer to read into
+   * @param readStatistics statistics counter
+   * @param dfsClient the client to report file system read stats to
+   */
+  ByteBufferStrategy(ByteBuffer readBuf,
+                     ReadStatistics readStatistics,
+                     DFSClient dfsClient) {
+    this.readBuf = readBuf;
+    this.targetLength = readBuf.remaining();
+    this.readStatistics = readStatistics;
+    this.dfsClient = dfsClient;
+  }
+
+  @Override
+  public ByteBuffer getReadBuffer() {
+    return readBuf;
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader) throws IOException {
+    return readFromBlock(blockReader, readBuf.remaining());
+  }
+
+  @Override
+  public int readFromBlock(BlockReader blockReader,
+                           int length) throws IOException {
+    ByteBuffer tmpBuf = readBuf.duplicate();
+    tmpBuf.limit(tmpBuf.position() + length);
+    int nRead = blockReader.read(tmpBuf.slice());
+    // Only update the position when data has actually been read
+    if (nRead > 0) {
+      readBuf.position(readBuf.position() + nRead);
+      updateReadStatistics(readStatistics, nRead, blockReader);
+      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+          nRead);
+    }
+
+    return nRead;
+  }
+
+  @Override
+  public int getTargetLength() {
+    return targetLength;
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src) {
+    return readFromBuffer(src, src.remaining());
+  }
+
+  @Override
+  public int readFromBuffer(ByteBuffer src, int length) {
+    ByteBuffer dup = src.duplicate();
+    int newLen = Math.min(readBuf.remaining(), dup.remaining());
+    newLen = Math.min(newLen, length);
+    dup.limit(dup.position() + newLen);
+    readBuf.put(dup);
+    return newLen;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
index 8e9545104d0..02e5deb4999 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.ReadStatistics;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -103,7 +104,7 @@ public class HdfsDataInputStream extends FSDataInputStream {
    * be higher than you would expect just by adding up the number of
    * bytes read through HdfsDataInputStream.
   */
-  public DFSInputStream.ReadStatistics getReadStatistics() {
+  public ReadStatistics getReadStatistics() {
     return getDFSInputStream().getReadStatistics();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
index 56f8ecc4175..71596f3835d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.util;
 
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ReadStatistics;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -43,4 +45,18 @@ public class IOUtilsClient {
     }
   }
 
+  public static void updateReadStatistics(ReadStatistics readStatistics,
+      int nRead, BlockReader blockReader) {
+    if (nRead <= 0) {
+      return;
+    }
+
+    if (blockReader.isShortCircuit()) {
+      readStatistics.addShortCircuitBytes(nRead);
+    } else if (blockReader.getNetworkDistance() == 0) {
+      readStatistics.addLocalBytes(nRead);
+    } else {
+      readStatistics.addRemoteBytes(nRead);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
index 5c2b6da02ab..8acf4bf2868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -40,7 +39,8 @@ import java.util.LinkedList;
 import java.util.UUID;
 
 public class TestExternalBlockReader {
-  private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
+  private static final Log LOG =
+      LogFactory.getLog(TestExternalBlockReader.class);
 
   private static long SEED = 1234;
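
For reference, below is a minimal, self-contained sketch of the duplicate()/limit() copy idiom that both readFromBuffer implementations in the patch rely on: duplicate() shares the backing content but carries an independent position and limit, which is what lets the strategies promise that the source buffer is left untouched while still bulk-copying through put(). The class and method names here (CopyIdiomDemo, copyFrom) are illustrative only and are not part of the patch.

import java.nio.ByteBuffer;

public class CopyIdiomDemo {
  // Mirrors the patch's idiom: duplicate the source so its position/limit
  // are preserved, cap the copy at the smaller of the requested length and
  // both buffers' remaining bytes, then bulk-put through the duplicate.
  static int copyFrom(ByteBuffer src, ByteBuffer dst, int length) {
    ByteBuffer dup = src.duplicate();   // src position/limit stay untouched
    int n = Math.min(Math.min(dst.remaining(), dup.remaining()), length);
    dup.limit(dup.position() + n);      // cap the copy at n bytes
    dst.put(dup);                       // advances dst and dup, never src
    return n;
  }

  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.wrap("hello world".getBytes());
    ByteBuffer dst = ByteBuffer.allocate(5);
    int n = copyFrom(src, dst, 8);
    // Prints "copied 5, src.position() = 0": the copy is capped by dst's
    // capacity, and the source buffer stays exactly where it was.
    System.out.println("copied " + n + ", src.position() = " + src.position());
  }
}

The same idiom explains the tmpBuf duplicate in ByteBufferStrategy#readFromBlock: the block reader fills a length-capped slice while the strategy advances the real buffer's position only by the number of bytes actually read.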