HDFS-8280. Code Cleanup in DFSInputStream. Contributed by Jing Zhao.

This commit is contained in:
Haohui Mai 2015-04-28 18:11:59 -07:00
parent c79e7f7d99
commit 439614b0c8
2 changed files with 61 additions and 82 deletions

View File

@ -478,6 +478,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8176. Record from/to snapshots in audit log for snapshot diff report. HDFS-8176. Record from/to snapshots in audit log for snapshot diff report.
(J. Andreina via jing9) (J. Andreina via jing9)
HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -601,7 +601,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
targetBlock.getBlockSize() - 1; targetBlock.getBlockSize() - 1;
this.currentLocatedBlock = targetBlock; this.currentLocatedBlock = targetBlock;
assert (target==pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset(); long offsetIntoBlock = target - targetBlock.getStartOffset();
DNAddrPair retval = chooseDataNode(targetBlock, null); DNAddrPair retval = chooseDataNode(targetBlock, null);
@ -610,35 +609,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
StorageType storageType = retval.storageType; StorageType storageType = retval.storageType;
try { try {
ExtendedBlock blk = targetBlock.getBlock(); blockReader = getBlockReader(targetBlock, offsetIntoBlock,
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken(); targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
CachingStrategy curCachingStrategy; storageType, chosenNode);
boolean shortCircuitForbidden;
synchronized(infoLock) {
curCachingStrategy = cachingStrategy;
shortCircuitForbidden = shortCircuitForbidden();
}
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
if(connectFailedOnce) { if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr + DFSClient.LOG.info("Successfully connected to " + targetAddr +
" for " + blk); " for " + targetBlock.getBlock());
} }
return chosenNode; return chosenNode;
} catch (IOException ex) { } catch (IOException ex) {
@ -663,6 +639,37 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
} }
protected BlockReader getBlockReader(LocatedBlock targetBlock,
long offsetInBlock, long length, InetSocketAddress targetAddr,
StorageType storageType, DatanodeInfo datanode) throws IOException {
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
CachingStrategy curCachingStrategy;
boolean shortCircuitForbidden;
synchronized (infoLock) {
curCachingStrategy = cachingStrategy;
shortCircuitForbidden = shortCircuitForbidden();
}
return new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(datanode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetInBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(length).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
}
/** /**
* Close it down! * Close it down!
*/ */
@ -935,9 +942,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private DNAddrPair chooseDataNode(LocatedBlock block, private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) { while (true) {
try { DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
return getBestNodeDNAddrPair(block, ignoredNodes); if (result != null) {
} catch (IOException ie) { return result;
} else {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes); deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src; String blockInfo = block.getBlock() + " file=" + src;
@ -954,7 +962,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.info("No node available for " + blockInfo); DFSClient.LOG.info("No node available for " + blockInfo);
} }
DFSClient.LOG.info("Could not obtain " + block.getBlock() DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ " from any node: " + ie + errMsg + " from any node: " + errMsg
+ ". Will get new block locations from namenode and retry..."); + ". Will get new block locations from namenode and retry...");
try { try {
// Introducing a random factor to the wait time before another retry. // Introducing a random factor to the wait time before another retry.
@ -977,7 +985,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
openInfo(); openInfo();
block = getBlockAt(block.getStartOffset()); block = getBlockAt(block.getStartOffset());
failures++; failures++;
continue;
} }
} }
} }
@ -986,11 +993,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Get the best node from which to stream the data. * Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order. * @param block LocatedBlock, containing nodes in priority order.
* @param ignoredNodes Do not choose nodes in this array (may be null) * @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. * @return The DNAddrPair of the best node. Null if no node can be chosen.
* @throws IOException
*/ */
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations(); DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes(); StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null; DatanodeInfo chosenNode = null;
@ -1010,9 +1016,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
} }
if (chosenNode == null) { if (chosenNode == null) {
throw new IOException("No live nodes contain block " + block.getBlock() + DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
" after checking nodes = " + Arrays.toString(nodes) + " after checking nodes = " + Arrays.toString(nodes) +
", ignoredNodes = " + ignoredNodes); ", ignoredNodes = " + ignoredNodes);
return null;
} }
final String dnAddr = final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
@ -1102,40 +1109,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// cached block locations may have been updated by chooseDataNode() // cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the // or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop. // start of the loop.
CachingStrategy curCachingStrategy;
boolean allowShortCircuitLocalReads;
LocatedBlock block = getBlockAt(blockStartOffset); LocatedBlock block = getBlockAt(blockStartOffset);
synchronized(infoLock) {
curCachingStrategy = cachingStrategy;
allowShortCircuitLocalReads = !shortCircuitForbidden();
}
DatanodeInfo chosenNode = datanode.info;
InetSocketAddress targetAddr = datanode.addr;
StorageType storageType = datanode.storageType;
BlockReader reader = null; BlockReader reader = null;
try { try {
DFSClientFaultInjector.get().fetchFromDatanodeException(); DFSClientFaultInjector.get().fetchFromDatanodeException();
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
int len = (int) (end - start + 1); int len = (int) (end - start + 1);
reader = new BlockReaderFactory(dfsClient.getConf()). reader = getBlockReader(block, start, len, datanode.addr,
setInetSocketAddress(targetAddr). datanode.storageType, datanode.info);
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(block.getBlock()).
setBlockToken(blockToken).
setStartOffset(start).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(len).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
int nread = reader.readAll(buf, offset, len); int nread = reader.readAll(buf, offset, len);
updateReadStatistics(readStatistics, nread, reader); updateReadStatistics(readStatistics, nread, reader);
@ -1148,34 +1128,33 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (ChecksumException e) { } catch (ChecksumException e) {
String msg = "fetchBlockByteRange(). Got a checksum exception for " String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ src + " at " + block.getBlock() + ":" + e.getPos() + " from " + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+ chosenNode; + datanode.info;
DFSClient.LOG.warn(msg); DFSClient.LOG.warn(msg);
// we want to remember what we have tried // we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap); addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
addToDeadNodes(chosenNode); corruptedBlockMap);
addToDeadNodes(datanode.info);
throw new IOException(msg); throw new IOException(msg);
} catch (IOException e) { } catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, " DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr + "encryption key was invalid when connecting to " + datanode.addr
+ " : " + e); + " : " + e);
// The encryption key used is invalid. // The encryption key used is invalid.
refetchEncryptionKey--; refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey(); dfsClient.clearDataEncryptionKey();
continue; } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--; refetchToken--;
try { try {
fetchBlockAt(block.getStartOffset()); fetchBlockAt(block.getStartOffset());
} catch (IOException fbae) { } catch (IOException fbae) {
// ignore IOE, since we can retry it later in a loop // ignore IOE, since we can retry it later in a loop
} }
continue;
} else { } else {
String msg = "Failed to connect to " + targetAddr + " for file " String msg = "Failed to connect to " + datanode.addr + " for file "
+ src + " for block " + block.getBlock() + ":" + e; + src + " for block " + block.getBlock() + ":" + e;
DFSClient.LOG.warn("Connection failure: " + msg, e); DFSClient.LOG.warn("Connection failure: " + msg, e);
addToDeadNodes(chosenNode); addToDeadNodes(datanode.info);
throw new IOException(msg); throw new IOException(msg);
} }
} finally { } finally {
@ -1187,10 +1166,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
/** /**
* Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[], * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
* int, Map)} except we start up a second, parallel, 'hedged' read * 'hedged' read if the first read is taking longer than configured amount of
* if the first read is taking longer than configured amount of * time. We then wait on which ever read returns first.
* time. We then wait on which ever read returns first.
*/ */
private void hedgedFetchBlockByteRange(long blockStartOffset, long start, private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
long end, byte[] buf, int offset, long end, byte[] buf, int offset,
@ -1248,9 +1226,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass. // If no nodes to do hedged reads against, pass.
try { try {
try { chosenNode = getBestNodeDNAddrPair(block, ignored);
chosenNode = getBestNodeDNAddrPair(block, ignored); if (chosenNode == null) {
} catch (IOException ioe) {
chosenNode = chooseDataNode(block, ignored); chosenNode = chooseDataNode(block, ignored);
} }
bb = ByteBuffer.allocate(len); bb = ByteBuffer.allocate(len);