HDFS-8905. Refactor DFSInputStream#ReaderStrategy. Contributed by Kai Zheng and Sammi Chen
This commit is contained in:
parent
ec252ce0fc
commit
793447f799
|
@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
|||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
||||
import org.apache.hadoop.hdfs.util.IOUtilsClient;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -145,94 +146,6 @@ public class DFSInputStream extends FSInputStream
|
|||
return extendedReadBuffers;
|
||||
}
|
||||
|
||||
public static class ReadStatistics {
|
||||
public ReadStatistics() {
|
||||
clear();
|
||||
}
|
||||
|
||||
public ReadStatistics(ReadStatistics rhs) {
|
||||
this.totalBytesRead = rhs.getTotalBytesRead();
|
||||
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
|
||||
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
|
||||
this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total bytes read. This will always be at least as
|
||||
* high as the other numbers, since it includes all of them.
|
||||
*/
|
||||
public long getTotalBytesRead() {
|
||||
return totalBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total local bytes read. This will always be at least
|
||||
* as high as totalShortCircuitBytesRead, since all short-circuit
|
||||
* reads are also local.
|
||||
*/
|
||||
public long getTotalLocalBytesRead() {
|
||||
return totalLocalBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total short-circuit local bytes read.
|
||||
*/
|
||||
public long getTotalShortCircuitBytesRead() {
|
||||
return totalShortCircuitBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total number of zero-copy bytes read.
|
||||
*/
|
||||
public long getTotalZeroCopyBytesRead() {
|
||||
return totalZeroCopyBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total number of bytes read which were not local.
|
||||
*/
|
||||
public long getRemoteBytesRead() {
|
||||
return totalBytesRead - totalLocalBytesRead;
|
||||
}
|
||||
|
||||
void addRemoteBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
}
|
||||
|
||||
void addLocalBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
this.totalLocalBytesRead += amt;
|
||||
}
|
||||
|
||||
void addShortCircuitBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
this.totalLocalBytesRead += amt;
|
||||
this.totalShortCircuitBytesRead += amt;
|
||||
}
|
||||
|
||||
void addZeroCopyBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
this.totalLocalBytesRead += amt;
|
||||
this.totalShortCircuitBytesRead += amt;
|
||||
this.totalZeroCopyBytesRead += amt;
|
||||
}
|
||||
|
||||
void clear() {
|
||||
this.totalBytesRead = 0;
|
||||
this.totalLocalBytesRead = 0;
|
||||
this.totalShortCircuitBytesRead = 0;
|
||||
this.totalZeroCopyBytesRead = 0;
|
||||
}
|
||||
|
||||
private long totalBytesRead;
|
||||
|
||||
private long totalLocalBytesRead;
|
||||
|
||||
private long totalShortCircuitBytesRead;
|
||||
|
||||
private long totalZeroCopyBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* This variable tracks the number of failures since the start of the
|
||||
* most recent user-facing operation. That is to say, it should be reset
|
||||
|
@ -767,116 +680,11 @@ public class DFSInputStream extends FSInputStream
|
|||
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps different possible read implementations so that readBuffer can be
|
||||
* strategy-agnostic.
|
||||
*/
|
||||
interface ReaderStrategy {
|
||||
int doRead(BlockReader blockReader, int off, int len)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Copy data from the src ByteBuffer into the read buffer.
|
||||
* @param src The src buffer where the data is copied from
|
||||
* @param offset Useful only when the ReadStrategy is based on a byte array.
|
||||
* Indicate the offset of the byte array for copy.
|
||||
* @param length Useful only when the ReadStrategy is based on a byte array.
|
||||
* Indicate the length of the data to copy.
|
||||
*/
|
||||
int copyFrom(ByteBuffer src, int offset, int length);
|
||||
}
|
||||
|
||||
protected void updateReadStatistics(ReadStatistics readStatistics,
|
||||
int nRead, BlockReader blockReader) {
|
||||
if (nRead <= 0) return;
|
||||
synchronized(infoLock) {
|
||||
if (blockReader.isShortCircuit()) {
|
||||
readStatistics.addShortCircuitBytes(nRead);
|
||||
} else if (blockReader.getNetworkDistance() == 0) {
|
||||
readStatistics.addLocalBytes(nRead);
|
||||
} else {
|
||||
readStatistics.addRemoteBytes(nRead);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to read bytes into a byte[]
|
||||
*/
|
||||
private class ByteArrayStrategy implements ReaderStrategy {
|
||||
final byte[] buf;
|
||||
|
||||
public ByteArrayStrategy(byte[] buf) {
|
||||
this.buf = buf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int doRead(BlockReader blockReader, int off, int len)
|
||||
throws IOException {
|
||||
int nRead = blockReader.read(buf, off, len);
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
return nRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int copyFrom(ByteBuffer src, int offset, int length) {
|
||||
ByteBuffer writeSlice = src.duplicate();
|
||||
writeSlice.get(buf, offset, length);
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to read bytes into a user-supplied ByteBuffer
|
||||
*/
|
||||
protected class ByteBufferStrategy implements ReaderStrategy {
|
||||
final ByteBuffer buf;
|
||||
ByteBufferStrategy(ByteBuffer buf) {
|
||||
this.buf = buf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int doRead(BlockReader blockReader, int off, int len)
|
||||
throws IOException {
|
||||
int oldpos = buf.position();
|
||||
int oldlimit = buf.limit();
|
||||
boolean success = false;
|
||||
try {
|
||||
int ret = blockReader.read(buf);
|
||||
success = true;
|
||||
updateReadStatistics(readStatistics, ret, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
ret);
|
||||
if (ret == 0) {
|
||||
DFSClient.LOG.warn("zero");
|
||||
}
|
||||
return ret;
|
||||
} finally {
|
||||
if (!success) {
|
||||
// Reset to original state so that retries work correctly.
|
||||
buf.position(oldpos);
|
||||
buf.limit(oldlimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int copyFrom(ByteBuffer src, int offset, int length) {
|
||||
ByteBuffer writeSlice = src.duplicate();
|
||||
int remaining = Math.min(buf.remaining(), writeSlice.remaining());
|
||||
writeSlice.limit(writeSlice.position() + remaining);
|
||||
buf.put(writeSlice);
|
||||
return remaining;
|
||||
}
|
||||
}
|
||||
|
||||
/* This is a used by regular read() and handles ChecksumExceptions.
|
||||
* name readBuffer() is chosen to imply similarity to readBuffer() in
|
||||
* ChecksumFileSystem
|
||||
*/
|
||||
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
|
||||
private synchronized int readBuffer(ReaderStrategy reader, int len,
|
||||
CorruptedBlocks corruptedBlocks)
|
||||
throws IOException {
|
||||
IOException ioe;
|
||||
|
@ -892,7 +700,7 @@ public class DFSInputStream extends FSInputStream
|
|||
while (true) {
|
||||
// retry as many times as seekToNewSource allows.
|
||||
try {
|
||||
return reader.doRead(blockReader, off, len);
|
||||
return reader.readFromBlock(blockReader, len);
|
||||
} catch ( ChecksumException ce ) {
|
||||
DFSClient.LOG.warn("Found Checksum error for "
|
||||
+ getCurrentBlock() + " from " + currentNode
|
||||
|
@ -927,13 +735,14 @@ public class DFSInputStream extends FSInputStream
|
|||
}
|
||||
}
|
||||
|
||||
protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
|
||||
int len) throws IOException {
|
||||
protected synchronized int readWithStrategy(ReaderStrategy strategy)
|
||||
throws IOException {
|
||||
dfsClient.checkOpen();
|
||||
if (closed.get()) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
|
||||
int len = strategy.getTargetLength();
|
||||
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
|
||||
failures = 0;
|
||||
if (pos < getFileLength()) {
|
||||
|
@ -952,7 +761,7 @@ public class DFSInputStream extends FSInputStream
|
|||
locatedBlocks.getFileLength() - pos);
|
||||
}
|
||||
}
|
||||
int result = readBuffer(strategy, off, realLen, corruptedBlocks);
|
||||
int result = readBuffer(strategy, realLen, corruptedBlocks);
|
||||
|
||||
if (result >= 0) {
|
||||
pos += result;
|
||||
|
@ -994,11 +803,12 @@ public class DFSInputStream extends FSInputStream
|
|||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
|
||||
ReaderStrategy byteArrayReader =
|
||||
new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
|
||||
try (TraceScope scope =
|
||||
dfsClient.newReaderTraceScope("DFSInputStream#byteArrayRead",
|
||||
src, getPos(), len)) {
|
||||
int retLen = readWithStrategy(byteArrayReader, off, len);
|
||||
int retLen = readWithStrategy(byteArrayReader);
|
||||
if (retLen < len) {
|
||||
dfsClient.addRetLenToReaderScope(scope, retLen);
|
||||
}
|
||||
|
@ -1008,12 +818,13 @@ public class DFSInputStream extends FSInputStream
|
|||
|
||||
@Override
|
||||
public synchronized int read(final ByteBuffer buf) throws IOException {
|
||||
ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
|
||||
ReaderStrategy byteBufferReader =
|
||||
new ByteBufferStrategy(buf, readStatistics, dfsClient);
|
||||
int reqLen = buf.remaining();
|
||||
try (TraceScope scope =
|
||||
dfsClient.newReaderTraceScope("DFSInputStream#byteBufferRead",
|
||||
src, getPos(), reqLen)){
|
||||
int retLen = readWithStrategy(byteBufferReader, 0, reqLen);
|
||||
int retLen = readWithStrategy(byteBufferReader);
|
||||
if (retLen < reqLen) {
|
||||
dfsClient.addRetLenToReaderScope(scope, retLen);
|
||||
}
|
||||
|
@ -1221,7 +1032,7 @@ public class DFSInputStream extends FSInputStream
|
|||
reader = getBlockReader(block, startInBlk, len, datanode.addr,
|
||||
datanode.storageType, datanode.info);
|
||||
int nread = reader.readAll(buf, offset, len);
|
||||
updateReadStatistics(readStatistics, nread, reader);
|
||||
IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
|
||||
dfsClient.updateFileSystemReadStats(
|
||||
reader.getNetworkDistance(), nread);
|
||||
if (nread != len) {
|
||||
|
@ -1721,18 +1532,14 @@ public class DFSInputStream extends FSInputStream
|
|||
* Get statistics about the reads which this DFSInputStream has done.
|
||||
*/
|
||||
public ReadStatistics getReadStatistics() {
|
||||
synchronized(infoLock) {
|
||||
return new ReadStatistics(readStatistics);
|
||||
}
|
||||
return new ReadStatistics(readStatistics);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear statistics about the reads which this DFSInputStream has done.
|
||||
*/
|
||||
public void clearReadStatistics() {
|
||||
synchronized(infoLock) {
|
||||
readStatistics.clear();
|
||||
}
|
||||
readStatistics.clear();
|
||||
}
|
||||
|
||||
public FileEncryptionInfo getFileEncryptionInfo() {
|
||||
|
@ -1759,7 +1566,8 @@ public class DFSInputStream extends FSInputStream
|
|||
throws IOException {
|
||||
synchronized (infoLock) {
|
||||
this.cachingStrategy =
|
||||
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
|
||||
new CachingStrategy.Builder(this.cachingStrategy).
|
||||
setReadahead(readahead).build();
|
||||
}
|
||||
closeCurrentBlockReaders();
|
||||
}
|
||||
|
@ -1769,7 +1577,8 @@ public class DFSInputStream extends FSInputStream
|
|||
throws IOException {
|
||||
synchronized (infoLock) {
|
||||
this.cachingStrategy =
|
||||
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
|
||||
new CachingStrategy.Builder(this.cachingStrategy).
|
||||
setDropBehind(dropBehind).build();
|
||||
}
|
||||
closeCurrentBlockReaders();
|
||||
}
|
||||
|
@ -1883,9 +1692,7 @@ public class DFSInputStream extends FSInputStream
|
|||
buffer.position((int)blockPos);
|
||||
buffer.limit((int)(blockPos + length));
|
||||
getExtendedReadBuffers().put(buffer, clientMmap);
|
||||
synchronized (infoLock) {
|
||||
readStatistics.addZeroCopyBytes(length);
|
||||
}
|
||||
readStatistics.addZeroCopyBytes(length);
|
||||
DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
|
||||
+ "zero-copy read path. blockEnd = {}", length, curPos, blockEnd);
|
||||
success = true;
|
||||
|
|
|
@ -359,11 +359,11 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
ExtendedBlock currentBlock,
|
||||
CorruptedBlocks corruptedBlocks)
|
||||
throws IOException {
|
||||
final int targetLength = strategy.buf.remaining();
|
||||
final int targetLength = strategy.getTargetLength();
|
||||
int length = 0;
|
||||
try {
|
||||
while (length < targetLength) {
|
||||
int ret = strategy.doRead(blockReader, 0, 0);
|
||||
int ret = strategy.readFromBlock(blockReader);
|
||||
if (ret < 0) {
|
||||
throw new IOException("Unexpected EOS from the reader");
|
||||
}
|
||||
|
@ -425,13 +425,14 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected synchronized int readWithStrategy(ReaderStrategy strategy,
|
||||
int off, int len) throws IOException {
|
||||
protected synchronized int readWithStrategy(ReaderStrategy strategy)
|
||||
throws IOException {
|
||||
dfsClient.checkOpen();
|
||||
if (closed.get()) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
|
||||
int len = strategy.getTargetLength();
|
||||
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
|
||||
if (pos < getFileLength()) {
|
||||
try {
|
||||
|
@ -452,7 +453,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
if (!curStripeRange.include(getOffsetInBlockGroup())) {
|
||||
readOneStripe(corruptedBlocks);
|
||||
}
|
||||
int ret = copyToTargetBuf(strategy, off + result, realLen - result);
|
||||
int ret = copyToTargetBuf(strategy, realLen - result);
|
||||
result += ret;
|
||||
pos += ret;
|
||||
}
|
||||
|
@ -470,16 +471,14 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
/**
|
||||
* Copy the data from {@link #curStripeBuf} into the given buffer
|
||||
* @param strategy the ReaderStrategy containing the given buffer
|
||||
* @param offset the offset of the given buffer. Used only when strategy is
|
||||
* a ByteArrayStrategy
|
||||
* @param length target length
|
||||
* @return number of bytes copied
|
||||
*/
|
||||
private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
|
||||
private int copyToTargetBuf(ReaderStrategy strategy, int length) {
|
||||
final long offsetInBlk = getOffsetInBlockGroup();
|
||||
int bufOffset = getStripedBufOffset(offsetInBlk);
|
||||
curStripeBuf.position(bufOffset);
|
||||
return strategy.copyFrom(curStripeBuf, offset,
|
||||
return strategy.readFromBuffer(curStripeBuf,
|
||||
Math.min(length, curStripeBuf.remaining()));
|
||||
}
|
||||
|
||||
|
@ -700,7 +699,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
|
||||
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
|
||||
if (chunk.byteBuffer != null) {
|
||||
ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
|
||||
ByteBufferStrategy strategy =
|
||||
new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient);
|
||||
return new ByteBufferStrategy[]{strategy};
|
||||
} else {
|
||||
ByteBufferStrategy[] strategies =
|
||||
|
@ -708,7 +708,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
for (int i = 0; i < strategies.length; i++) {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
|
||||
chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
|
||||
strategies[i] = new ByteBufferStrategy(buffer);
|
||||
strategies[i] =
|
||||
new ByteBufferStrategy(buffer, readStatistics, dfsClient);
|
||||
}
|
||||
return strategies;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
/**
|
||||
* A utility class that maintains statistics for reading.
|
||||
*/
|
||||
public class ReadStatistics {
|
||||
private long totalBytesRead;
|
||||
private long totalLocalBytesRead;
|
||||
private long totalShortCircuitBytesRead;
|
||||
private long totalZeroCopyBytesRead;
|
||||
|
||||
public ReadStatistics() {
|
||||
clear();
|
||||
}
|
||||
|
||||
public ReadStatistics(ReadStatistics rhs) {
|
||||
this.totalBytesRead = rhs.getTotalBytesRead();
|
||||
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
|
||||
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
|
||||
this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total bytes read. This will always be at least as
|
||||
* high as the other numbers, since it includes all of them.
|
||||
*/
|
||||
public synchronized long getTotalBytesRead() {
|
||||
return totalBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total local bytes read. This will always be at least
|
||||
* as high as totalShortCircuitBytesRead, since all short-circuit
|
||||
* reads are also local.
|
||||
*/
|
||||
public synchronized long getTotalLocalBytesRead() {
|
||||
return totalLocalBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total short-circuit local bytes read.
|
||||
*/
|
||||
public synchronized long getTotalShortCircuitBytesRead() {
|
||||
return totalShortCircuitBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total number of zero-copy bytes read.
|
||||
*/
|
||||
public synchronized long getTotalZeroCopyBytesRead() {
|
||||
return totalZeroCopyBytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The total number of bytes read which were not local.
|
||||
*/
|
||||
public synchronized long getRemoteBytesRead() {
|
||||
return totalBytesRead - totalLocalBytesRead;
|
||||
}
|
||||
|
||||
public synchronized void addRemoteBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
}
|
||||
|
||||
public synchronized void addLocalBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
this.totalLocalBytesRead += amt;
|
||||
}
|
||||
|
||||
public synchronized void addShortCircuitBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
this.totalLocalBytesRead += amt;
|
||||
this.totalShortCircuitBytesRead += amt;
|
||||
}
|
||||
|
||||
public synchronized void addZeroCopyBytes(long amt) {
|
||||
this.totalBytesRead += amt;
|
||||
this.totalLocalBytesRead += amt;
|
||||
this.totalShortCircuitBytesRead += amt;
|
||||
this.totalZeroCopyBytesRead += amt;
|
||||
}
|
||||
|
||||
public synchronized void clear() {
|
||||
this.totalBytesRead = 0;
|
||||
this.totalLocalBytesRead = 0;
|
||||
this.totalShortCircuitBytesRead = 0;
|
||||
this.totalZeroCopyBytesRead = 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
|
||||
|
||||
/**
|
||||
* Wraps different possible read implementations so that callers can be
|
||||
* strategy-agnostic.
|
||||
*/
|
||||
interface ReaderStrategy {
|
||||
/**
|
||||
* Read from a block using the blockReader.
|
||||
* @param blockReader
|
||||
* @return number of bytes read
|
||||
* @throws IOException
|
||||
*/
|
||||
int readFromBlock(BlockReader blockReader) throws IOException;
|
||||
|
||||
/**
|
||||
* Read from a block using the blockReader with desired length to read.
|
||||
* @param blockReader
|
||||
* @param length number of bytes desired to read, not ensured
|
||||
* @return number of bytes read
|
||||
* @throws IOException
|
||||
*/
|
||||
int readFromBlock(BlockReader blockReader, int length) throws IOException;
|
||||
|
||||
/**
|
||||
* Read or copy from a src buffer.
|
||||
* @param src
|
||||
* @return number of bytes copied
|
||||
* Note: the position of the src buffer is not changed after the call
|
||||
*/
|
||||
int readFromBuffer(ByteBuffer src);
|
||||
|
||||
/**
|
||||
* Read or copy length of data bytes from a src buffer with desired length.
|
||||
* @param src
|
||||
* @return number of bytes copied
|
||||
* Note: the position of the src buffer is not changed after the call
|
||||
*/
|
||||
int readFromBuffer(ByteBuffer src, int length);
|
||||
|
||||
/**
|
||||
* @return the target read buffer that reads data into.
|
||||
*/
|
||||
ByteBuffer getReadBuffer();
|
||||
|
||||
/**
|
||||
* @return the target length to read.
|
||||
*/
|
||||
int getTargetLength();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to read bytes into a byte array buffer. Note it's not thread-safe
|
||||
* and the behavior is not defined if concurrently operated.
|
||||
*/
|
||||
class ByteArrayStrategy implements ReaderStrategy {
|
||||
private final DFSClient dfsClient;
|
||||
private final ReadStatistics readStatistics;
|
||||
private final byte[] readBuf;
|
||||
private int offset;
|
||||
private final int targetLength;
|
||||
|
||||
/**
|
||||
* The constructor.
|
||||
* @param readBuf target buffer to read into
|
||||
* @param offset offset into the buffer
|
||||
* @param targetLength target length of data
|
||||
* @param readStatistics statistics counter
|
||||
*/
|
||||
public ByteArrayStrategy(byte[] readBuf, int offset, int targetLength,
|
||||
ReadStatistics readStatistics,
|
||||
DFSClient dfsClient) {
|
||||
this.readBuf = readBuf;
|
||||
this.offset = offset;
|
||||
this.targetLength = targetLength;
|
||||
this.readStatistics = readStatistics;
|
||||
this.dfsClient = dfsClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getReadBuffer() {
|
||||
return ByteBuffer.wrap(readBuf, offset, targetLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTargetLength() {
|
||||
return targetLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBlock(BlockReader blockReader) throws IOException {
|
||||
return readFromBlock(blockReader, targetLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBlock(BlockReader blockReader,
|
||||
int length) throws IOException {
|
||||
int nRead = blockReader.read(readBuf, offset, length);
|
||||
if (nRead > 0) {
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
offset += nRead;
|
||||
}
|
||||
return nRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBuffer(ByteBuffer src) {
|
||||
return readFromBuffer(src, src.remaining());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBuffer(ByteBuffer src, int length) {
|
||||
ByteBuffer dup = src.duplicate();
|
||||
dup.get(readBuf, offset, length);
|
||||
offset += length;
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to read bytes into a user-supplied ByteBuffer. Note it's not thread-safe
|
||||
* and the behavior is not defined if concurrently operated. When read operation
|
||||
* is performed, the position of the underlying byte buffer will move forward as
|
||||
* stated in ByteBufferReadable#read(ByteBuffer buf) method.
|
||||
*/
|
||||
class ByteBufferStrategy implements ReaderStrategy {
|
||||
private final DFSClient dfsClient;
|
||||
private final ReadStatistics readStatistics;
|
||||
private final ByteBuffer readBuf;
|
||||
private final int targetLength;
|
||||
|
||||
/**
|
||||
* The constructor.
|
||||
* @param readBuf target buffer to read into
|
||||
* @param readStatistics statistics counter
|
||||
*/
|
||||
ByteBufferStrategy(ByteBuffer readBuf,
|
||||
ReadStatistics readStatistics,
|
||||
DFSClient dfsClient) {
|
||||
this.readBuf = readBuf;
|
||||
this.targetLength = readBuf.remaining();
|
||||
this.readStatistics = readStatistics;
|
||||
this.dfsClient = dfsClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getReadBuffer() {
|
||||
return readBuf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBlock(BlockReader blockReader) throws IOException {
|
||||
return readFromBlock(blockReader, readBuf.remaining());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBlock(BlockReader blockReader,
|
||||
int length) throws IOException {
|
||||
ByteBuffer tmpBuf = readBuf.duplicate();
|
||||
tmpBuf.limit(tmpBuf.position() + length);
|
||||
int nRead = blockReader.read(readBuf.slice());
|
||||
// Only when data are read, update the position
|
||||
if (nRead > 0) {
|
||||
readBuf.position(readBuf.position() + nRead);
|
||||
updateReadStatistics(readStatistics, nRead, blockReader);
|
||||
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
|
||||
nRead);
|
||||
}
|
||||
|
||||
return nRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTargetLength() {
|
||||
return targetLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBuffer(ByteBuffer src) {
|
||||
return readFromBuffer(src, src.remaining());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readFromBuffer(ByteBuffer src, int length) {
|
||||
ByteBuffer dup = src.duplicate();
|
||||
int newLen = Math.min(readBuf.remaining(), dup.remaining());
|
||||
newLen = Math.min(newLen, length);
|
||||
dup.limit(dup.position() + newLen);
|
||||
readBuf.put(dup);
|
||||
return newLen;
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||
import org.apache.hadoop.hdfs.ReadStatistics;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
|
@ -103,7 +104,7 @@ public class HdfsDataInputStream extends FSDataInputStream {
|
|||
* be higher than you would expect just by adding up the number of
|
||||
* bytes read through HdfsDataInputStream.
|
||||
*/
|
||||
public DFSInputStream.ReadStatistics getReadStatistics() {
|
||||
public ReadStatistics getReadStatistics() {
|
||||
return getDFSInputStream().getReadStatistics();
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.ReadStatistics;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -43,4 +45,18 @@ public class IOUtilsClient {
|
|||
}
|
||||
}
|
||||
|
||||
public static void updateReadStatistics(ReadStatistics readStatistics,
|
||||
int nRead, BlockReader blockReader) {
|
||||
if (nRead <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (blockReader.isShortCircuit()) {
|
||||
readStatistics.addShortCircuitBytes(nRead);
|
||||
} else if (blockReader.getNetworkDistance() == 0) {
|
||||
readStatistics.addLocalBytes(nRead);
|
||||
} else {
|
||||
readStatistics.addRemoteBytes(nRead);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -40,7 +39,8 @@ import java.util.LinkedList;
|
|||
import java.util.UUID;
|
||||
|
||||
public class TestExternalBlockReader {
|
||||
private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestExternalBlockReader.class);
|
||||
|
||||
private static long SEED = 1234;
|
||||
|
||||
|
|
Loading…
Reference in New Issue