HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)

This commit is contained in:
Vinayakumar B 2015-07-02 16:11:50 +05:30
parent 37d7395773
commit bff5999d07
4 changed files with 169 additions and 71 deletions

View File

@ -696,6 +696,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
(vinayakumarb)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -1181,7 +1181,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
// Get block info from namenode
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
try {
return new DFSInputStream(this, src, verifyChecksum);
return new DFSInputStream(this, src, verifyChecksum, null);
} finally {
scope.close();
}

View File

@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@ -94,35 +95,35 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
private final DFSClient dfsClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private final String src;
private final boolean verifyChecksum;
protected final DFSClient dfsClient;
protected AtomicBoolean closed = new AtomicBoolean(false);
protected final String src;
protected final boolean verifyChecksum;
// state by stateful read only:
// (protected by lock on this)
/////
private DatanodeInfo currentNode = null;
private LocatedBlock currentLocatedBlock = null;
private long pos = 0;
private long blockEnd = -1;
protected LocatedBlock currentLocatedBlock = null;
protected long pos = 0;
protected long blockEnd = -1;
private BlockReader blockReader = null;
////
// state shared by stateful and positional read:
// (protected by lock on infoLock)
////
private LocatedBlocks locatedBlocks = null;
protected LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private FileEncryptionInfo fileEncryptionInfo = null;
private CachingStrategy cachingStrategy;
protected CachingStrategy cachingStrategy;
////
private final ReadStatistics readStatistics = new ReadStatistics();
protected final ReadStatistics readStatistics = new ReadStatistics();
// lock for state shared between read and pread
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
// (it's OK to acquire this lock when the lock on <this> is held)
private final Object infoLock = new Object();
protected final Object infoLock = new Object();
/**
* Track the ByteBuffers that we have handed out to readers.
@ -239,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* back to the namenode to get a new list of block locations, and is
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
protected int failures = 0;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@ -252,24 +253,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
deadNodes.put(dnInfo, dnInfo);
}
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
openInfo();
this.locatedBlocks = locatedBlocks;
openInfo(false);
}
/**
* Grab the open-file info from namenode
* @param refreshLocatedBlocks whether to re-fetch locatedblocks
*/
void openInfo() throws IOException, UnresolvedLinkException {
void openInfo(boolean refreshLocatedBlocks) throws IOException,
UnresolvedLinkException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
@ -281,7 +286,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(true);
} else {
break;
}
@ -302,8 +308,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
throws IOException {
LocatedBlocks newInfo = locatedBlocks;
if (locatedBlocks == null || refresh) {
newInfo = dfsClient.getLocatedBlocks(src, 0);
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
@ -441,7 +451,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return located block
* @throws IOException
*/
private LocatedBlock getBlockAt(long offset) throws IOException {
protected LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
@ -476,7 +486,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Fetch a block from namenode and cache it */
private void fetchBlockAt(long offset) throws IOException {
protected void fetchBlockAt(long offset) throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
@ -579,7 +589,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
// Will be getting a new BlockReader.
closeCurrentBlockReader();
closeCurrentBlockReaders();
//
// Connect to best DataNode for desired Block, with potential offset
@ -620,7 +630,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
@ -631,8 +641,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
fetchBlockAt(target);
} else {
connectFailedOnce = true;
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block "
+targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex);
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
@ -696,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
closeCurrentBlockReader();
closeCurrentBlockReaders();
super.close();
}
@ -713,12 +723,22 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Wraps different possible read implementations so that readBuffer can be
* strategy-agnostic.
*/
private interface ReaderStrategy {
interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
/**
* Copy data from the src ByteBuffer into the read buffer.
* @param src The src buffer where the data is copied from
* @param offset Useful only when the ReadStrategy is based on a byte array.
* Indicate the offset of the byte array for copy.
* @param length Useful only when the ReadStrategy is based on a byte array.
* Indicate the length of the data to copy.
*/
public int copyFrom(ByteBuffer src, int offset, int length);
}
private void updateReadStatistics(ReadStatistics readStatistics,
protected void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
if (nRead <= 0) return;
synchronized(infoLock) {
@ -749,12 +769,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
@Override
public int copyFrom(ByteBuffer src, int offset, int length) {
ByteBuffer writeSlice = src.duplicate();
writeSlice.get(buf, offset, length);
return length;
}
}
/**
* Used to read bytes into a user-supplied ByteBuffer
*/
private class ByteBufferStrategy implements ReaderStrategy {
protected class ByteBufferStrategy implements ReaderStrategy {
final ByteBuffer buf;
ByteBufferStrategy(ByteBuffer buf) {
this.buf = buf;
@ -770,6 +797,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
if (ret == 0) {
DFSClient.LOG.warn("zero");
}
return ret;
} finally {
if (!success) {
@ -779,6 +809,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
}
@Override
public int copyFrom(ByteBuffer src, int offset, int length) {
ByteBuffer writeSlice = src.duplicate();
int remaining = Math.min(buf.remaining(), writeSlice.remaining());
writeSlice.limit(writeSlice.position() + remaining);
buf.put(writeSlice);
return remaining;
}
}
/* This is a used by regular read() and handles ChecksumExceptions.
@ -837,7 +876,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
@ -926,7 +965,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Add corrupted block replica into map.
*/
private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet = null;
if((corruptedBlockMap.containsKey(blk))) {
@ -985,8 +1024,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (InterruptedException iex) {
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo();
block = getBlockAt(block.getStartOffset());
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
}
}
@ -998,7 +1037,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. Null if no node can be chosen.
*/
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
@ -1058,15 +1097,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return errMsgr.toString();
}
private void fetchBlockByteRange(long blockStartOffset, long start, long end,
protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
LocatedBlock block = getBlockAt(blockStartOffset);
block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
actualGetFromOneDataNode(addressPair, block, start, end,
buf, offset, corruptedBlockMap);
return;
} catch (IOException e) {
@ -1077,7 +1116,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final long blockStartOffset, final long start, final long end,
final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
@ -1090,7 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
actualGetFromOneDataNode(datanode, block, start, end, buf,
offset, corruptedBlockMap);
return bb;
} finally {
@ -1100,31 +1139,60 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
};
}
/**
* Used when reading contiguous blocks
*/
private void actualGetFromOneDataNode(final DNAddrPair datanode,
long blockStartOffset, final long start, final long end, byte[] buf,
LocatedBlock block, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, block, start, end, buf,
new int[]{offset}, new int[]{length}, corruptedBlockMap);
}
/**
* Read data from one DataNode.
* @param datanode the datanode from which to read data
* @param block the located block containing the requested data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
* @param offsets the data may be read into multiple segments of the buf
* (when reading a striped block). this array indicates the
* offset of each buf segment.
* @param lengths the length of each buf segment
* @param corruptedBlockMap map recording list of datanodes with corrupted
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode,
LocatedBlock block, final long startInBlk, final long endInBlk,
byte[] buf, int[] offsets, int[] lengths,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1);
checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
LocatedBlock block = getBlockAt(blockStartOffset);
block = refreshLocatedBlock(block);
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
int len = (int) (end - start + 1);
reader = getBlockReader(block, start, len, datanode.addr,
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
int nread = reader.readAll(buf, offset, len);
updateReadStatistics(readStatistics, nread, reader);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
for (int i = 0; i < offsets.length; i++) {
int nread = reader.readAll(buf, offsets[i], lengths[i]);
updateReadStatistics(readStatistics, nread, reader);
if (nread != lengths[i]) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + lengths[i] + ", got " + nread);
}
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@ -1169,11 +1237,40 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
* Like {@link #fetchBlockByteRange} except we start up a second, parallel,
* Refresh cached block locations.
* @param block The currently cached block locations
* @return Refreshed block locations
* @throws IOException
*/
protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
throws IOException {
return getBlockAt(block.getStartOffset());
}
/**
* This method verifies that the read portions are valid and do not overlap
* with each other.
*/
private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
int sum = 0;
for (int i = 0; i < lengths.length; i++) {
if (i > 0) {
int gap = offsets[i] - offsets[i - 1];
// make sure read portions do not overlap with each other
Preconditions.checkArgument(gap >= lengths[i - 1]);
}
sum += lengths[i];
}
Preconditions.checkArgument(sum == totalLen);
}
/**
* Like {@link #fetchBlockByteRange}except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*/
private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@ -1186,7 +1283,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ByteBuffer bb = null;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
LocatedBlock block = getBlockAt(blockStartOffset);
block = refreshLocatedBlock(block);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@ -1198,7 +1295,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block.getStartOffset(), start, end, bb,
chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
@ -1235,7 +1332,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block.getStartOffset(), start, end, bb,
chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable);
@ -1319,7 +1416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return true if block access token has expired or invalid and it should be
* refetched
*/
private static boolean tokenRefetchNeeded(IOException ex,
protected static boolean tokenRefetchNeeded(IOException ex,
InetSocketAddress targetAddr) {
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1)
@ -1389,13 +1486,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
if (dfsClient.isHedgedReadsEnabled()) {
hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
targetStart + bytesToRead - 1, buffer, offset,
corruptedBlockMap);
hedgedFetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else {
fetchBlockByteRange(blk.getStartOffset(), targetStart,
targetStart + bytesToRead - 1, buffer, offset,
corruptedBlockMap);
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlockMap);
}
} finally {
// Check and report if any block replicas are corrupted.
@ -1427,7 +1522,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
private void reportCheckSumFailure(
protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
@ -1556,7 +1651,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
*/
@Override
public synchronized long getPos() throws IOException {
public synchronized long getPos() {
return pos;
}
@ -1590,7 +1685,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Utility class to encapsulate data node info and its address. */
private static final class DNAddrPair {
static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
@ -1627,7 +1722,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
private void closeCurrentBlockReader() {
protected void closeCurrentBlockReaders() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
@ -1647,7 +1742,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
}
closeCurrentBlockReader();
closeCurrentBlockReaders();
}
@Override
@ -1657,7 +1752,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
}
closeCurrentBlockReader();
closeCurrentBlockReaders();
}
/**
@ -1815,6 +1910,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override
public synchronized void unbuffer() {
closeCurrentBlockReader();
closeCurrentBlockReaders();
}
}

View File

@ -343,7 +343,7 @@ public class TestDFSClientRetries {
// we're starting a new operation on the user level.
doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
.when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
is.openInfo();
is.openInfo(true);
// Seek to beginning forces a reopen of the BlockReader - otherwise it'll
// just keep reading on the existing stream and the fact that we've poisoned
// the block info won't do anything.