HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream (stateful read). Contributed by Jing Zhao

Author: Zhe Zhang, 2015-04-29 15:53:31 -07:00 (committed by Zhe Zhang)
parent 1a8139e6ad
commit b00c663877
2 changed files with 154 additions and 193 deletions


@@ -143,3 +143,6 @@
     HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open.
     (Kai Sasaki via jing9)
+
+    HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream
+    (stateful read). (Jing Zhao via Zhe Zhang)
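
At a high level, HDFS-8272 replaces the pair of retry counters that blockSeekTo() threaded through its connect loop with a small per-call ReaderRetryPolicy object shared by all internal-block readers of a group, so a block group refreshes the encryption key at most once and the block token at most once. A minimal, self-contained sketch of that budgeting pattern (the names here are illustrative placeholders, not the patched HDFS code):

  // Sketch only: budget one encryption-key refresh and one token refresh per
  // reader-creation pass; once a budget is spent, the next failure gives up.
  class ReaderRetrySketch {
    private int keyRefreshesLeft = 1;
    private int tokenRefreshesLeft = 1;

    boolean shouldRefreshKey() { return keyRefreshesLeft > 0; }
    boolean shouldRefreshToken() { return tokenRefreshesLeft > 0; }
    void keyRefreshed() { keyRefreshesLeft--; }
    void tokenRefreshed() { tokenRefreshesLeft--; }
  }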


@@ -22,11 +22,8 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -126,23 +123,42 @@ public class DFSStripedInputStream extends DFSInputStream {
     return results;
   }
 
+  private static class ReaderRetryPolicy {
+    private int fetchEncryptionKeyTimes = 1;
+    private int fetchTokenTimes = 1;
+
+    void refetchEncryptionKey() {
+      fetchEncryptionKeyTimes--;
+    }
+
+    void refetchToken() {
+      fetchTokenTimes--;
+    }
+
+    boolean shouldRefetchEncryptionKey() {
+      return fetchEncryptionKeyTimes > 0;
+    }
+
+    boolean shouldRefetchToken() {
+      return fetchTokenTimes > 0;
+    }
+  }
+
   private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
-  private BlockReader[] blockReaders = null;
-  private DatanodeInfo[] currentNodes = null;
+  private final BlockReader[] blockReaders = new BlockReader[groupSize];
+  private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
-  private final ECInfo ecInfo;
 
-  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECInfo info)
-      throws IOException {
+  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+      ECInfo ecInfo) throws IOException {
     super(dfsClient, src, verifyChecksum);
     // ECInfo is restored from NN just before reading striped file.
-    assert info != null;
-    ecInfo = info;
+    assert ecInfo != null;
     cellSize = ecInfo.getSchema().getChunkSize();
-    dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
-    parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
+    dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
+    parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
     DFSClient.LOG.debug("Creating an striped input stream for file " + src);
   }
@@ -162,9 +178,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    * When seeking into a new block group, create blockReader for each internal
    * block in the group.
    */
-  @VisibleForTesting
-  private synchronized DatanodeInfo[] blockSeekTo(long target)
-      throws IOException {
+  private synchronized void blockSeekTo(long target) throws IOException {
     if (target >= getFileLength()) {
       throw new IOException("Attempted to read past end of file");
     }
@@ -172,18 +186,13 @@ public class DFSStripedInputStream extends DFSInputStream {
     // Will be getting a new BlockReader.
     closeCurrentBlockReaders();
 
-    // Connect to best DataNode for desired Block, with potential offset
-    DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
-    int refetchToken = 1; // only need to get a new access token once
-    int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
     // Compute desired striped block group
     LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
     // Update current position
     this.pos = target;
     this.blockEnd = targetBlockGroup.getStartOffset() +
         targetBlockGroup.getBlockSize() - 1;
+    currentLocatedBlock = targetBlockGroup;
 
     long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
     LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
@@ -191,71 +200,50 @@ public class DFSStripedInputStream extends DFSInputStream {
     // The purpose is to get start offset into each block
     ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
         offsetIntoBlockGroup, 0, 0);
-    while (true) {
-      int i = 0;
-      InetSocketAddress targetAddr = null;
-      try {
-        blockReaders = new BlockReader[groupSize];
-        for (i = 0; i < groupSize; i++) {
-          LocatedBlock targetBlock = targetBlocks[i];
-          if (targetBlock == null) {
-            continue;
-          }
-          long offsetIntoBlock = readPortions[i].startOffsetInBlock;
-          DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-          chosenNodes[i] = retval.info;
-          targetAddr = retval.addr;
-          StorageType storageType = retval.storageType;
-          ExtendedBlock blk = targetBlock.getBlock();
-          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-          CachingStrategy curCachingStrategy;
-          boolean shortCircuitForbidden;
-          synchronized(infoLock) {
-            curCachingStrategy = cachingStrategy;
-            shortCircuitForbidden = shortCircuitForbidden();
-          }
-          blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
-              setInetSocketAddress(targetAddr).
-              setRemotePeerFactory(dfsClient).
-              setDatanodeInfo(chosenNodes[i]).
-              setStorageType(storageType).
-              setFileName(src).
-              setBlock(blk).
-              setBlockToken(accessToken).
-              setStartOffset(offsetIntoBlock).
-              setVerifyChecksum(verifyChecksum).
-              setClientName(dfsClient.clientName).
-              setLength(blk.getNumBytes() - offsetIntoBlock).
-              setCachingStrategy(curCachingStrategy).
-              setAllowShortCircuitLocalReads(!shortCircuitForbidden).
-              setClientCacheContext(dfsClient.getClientContext()).
-              setUserGroupInformation(dfsClient.ugi).
-              setConfiguration(dfsClient.getConfiguration()).
-              build();
+    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+    for (int i = 0; i < groupSize; i++) {
+      LocatedBlock targetBlock = targetBlocks[i];
+      if (targetBlock != null) {
+        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+        if (retval != null) {
+          currentNodes[i] = retval.info;
+          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+              readPortions[i].startOffsetInBlock,
+              targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+              retval.addr, retval.storageType, retval.info, target, retry);
         }
-        currentLocatedBlock = targetBlockGroup;
-        return chosenNodes;
-      } catch (IOException ex) {
-        // Retry in case of encryption key or token exceptions. Otherwise throw
-        // IOException: since each internal block is singly replicated, it's
-        // not meaningful trying to locate another replica.
-        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+      }
+    }
+  }
+
+  private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
+      long offsetInBlock, long length, InetSocketAddress targetAddr,
+      StorageType storageType, DatanodeInfo datanode, long offsetInFile,
+      ReaderRetryPolicy retry) throws IOException {
+    // only need to get a new access token or a new encryption key once
+    while (true) {
+      try {
+        return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
+            storageType, datanode);
+      } catch (IOException e) {
+        if (e instanceof InvalidEncryptionKeyException &&
+            retry.shouldRefetchEncryptionKey()) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
-              + " : " + ex);
-          // The encryption key used is invalid.
-          refetchEncryptionKey--;
+              + " : " + e);
           dfsClient.clearDataEncryptionKey();
-        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
-          refetchToken--;
-          fetchBlockAt(target);
+          retry.refetchEncryptionKey();
+        } else if (retry.shouldRefetchToken() &&
+            tokenRefetchNeeded(e, targetAddr)) {
+          fetchBlockAt(offsetInFile);
+          retry.refetchToken();
         } else {
           DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
-              + ", add to deadNodes and continue. " + ex, ex);
-          // Put chosen node into dead list and throw exception
-          addToDeadNodes(chosenNodes[i]);
-          throw ex;
+              + ", add to deadNodes and continue.", e);
+          // Put chosen node into dead list, continue
+          addToDeadNodes(datanode);
+          return null;
         }
       }
     }
@@ -272,15 +260,15 @@
       return;
     }
     for (int i = 0; i < groupSize; i++) {
-      if (blockReaders[i] == null) {
-        continue;
+      if (blockReaders[i] != null) {
+        try {
+          blockReaders[i].close();
+        } catch (IOException e) {
+          DFSClient.LOG.error("error closing blockReader", e);
+        }
+        blockReaders[i] = null;
       }
-      try {
-        blockReaders[i].close();
-      } catch (IOException e) {
-        DFSClient.LOG.error("error closing blockReader", e);
-      }
-      blockReaders[i] = null;
+      currentNodes[i] = null;
     }
     blockEnd = -1;
   }
@@ -292,123 +280,93 @@
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
-    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
-        = new HashMap<>();
+    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
     failures = 0;
     if (pos < getFileLength()) {
-      int retries = 2;
       /** Index of the target block in a stripe to read from */
       int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
-      while (retries > 0) {
-        try {
-          // currentNode can be left as null if previous read had a checksum
-          // error on the same block. See HDFS-3067
-          if (pos > blockEnd || currentNodes == null) {
-            currentNodes = blockSeekTo(pos);
-          }
-          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-          synchronized(infoLock) {
-            if (locatedBlocks.isLastBlockComplete()) {
-              realLen = (int) Math.min(realLen,
-                  locatedBlocks.getFileLength() - pos);
-            }
-          }
-          /** Number of bytes already read into buffer */
-          int result = 0;
-          while (result < realLen) {
-            /**
-             * Temporary position into the file; {@link pos} might not proceed
-             * to this temporary position in case of exceptions.
-             */
-            long tmpPos = pos + result;
-            /** Start and end offsets of a cell in the file */
-            long cellStart = (tmpPos / cellSize) * cellSize;
-            long cellEnd = cellStart + cellSize - 1;
-            /** Number of bytes to read from the current cell */
-            int realLenInCell = (int) Math.min(realLen - result,
-                cellEnd - tmpPos + 1L);
-            assert realLenInCell > 0 : "Temporary position shouldn't be " +
-                "after cellEnd";
-            // Read from one blockReader up to cell boundary
-            int cellRet = readBuffer(blockReaders[idxInGroup],
-                currentNodes[idxInGroup], strategy, off + result,
-                realLenInCell);
-            if (cellRet >= 0) {
-              result += cellRet;
-              if (cellRet < realLenInCell) {
-                // A short read indicates the current blockReader buffer is
-                // already drained. Should return the read call. Otherwise
-                // should proceed to the next cell.
-                break;
-              }
-            } else {
-              // got a EOS from reader though we expect more data on it.
-              throw new IOException("Unexpected EOS from the reader");
-            }
-            idxInGroup = (idxInGroup + 1) % dataBlkNum;
-          }
-          pos += result;
-          if (dfsClient.stats != null) {
-            dfsClient.stats.incrementBytesRead(result);
-          }
-          return result;
-        } catch (ChecksumException ce) {
-          throw ce;
-        } catch (IOException e) {
-          if (retries == 1) {
-            DFSClient.LOG.warn("DFS Read", e);
-          }
-          blockEnd = -1;
-          if (currentNodes[idxInGroup] != null) {
-            addToDeadNodes(currentNodes[idxInGroup]);
-          }
-          if (--retries == 0) {
-            throw e;
-          }
-        } finally {
-          // Check if need to report block replicas corruption either read
-          // was successful or ChecksumException occured.
-          reportCheckSumFailure(corruptedBlockMap,
-              currentLocatedBlock.getLocations().length);
+      try {
+        if (pos > blockEnd) {
+          blockSeekTo(pos);
         }
+        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+        synchronized (infoLock) {
+          if (locatedBlocks.isLastBlockComplete()) {
+            realLen = (int) Math.min(realLen,
+                locatedBlocks.getFileLength() - pos);
+          }
+        }
+        /** Number of bytes already read into buffer */
+        int result = 0;
+        while (result < realLen) {
+          /**
+           * Temporary position into the file; {@link pos} might not proceed
+           * to this temporary position in case of exceptions.
+           */
+          long tmpPos = pos + result;
+          /** Start and end offsets of a cell in the file */
+          long cellStart = (tmpPos / cellSize) * cellSize;
+          long cellEnd = cellStart + cellSize - 1;
+          /** Number of bytes to read from the current cell */
+          int realLenInCell = (int) Math.min(realLen - result,
+              cellEnd - tmpPos + 1L);
+          assert realLenInCell > 0 : "Temporary position shouldn't be "
+              + "after cellEnd";
+          // Read from one blockReader up to cell boundary
+          int cellRet = readBuffer(blockReaders[idxInGroup],
+              currentNodes[idxInGroup], strategy, off + result, realLenInCell,
+              corruptedBlockMap);
+          if (cellRet >= 0) {
+            result += cellRet;
+            if (cellRet < realLenInCell) {
+              // A short read indicates the current blockReader buffer is
+              // already drained. Should return the read call. Otherwise
+              // should proceed to the next cell.
+              break;
+            }
+          } else {
+            // got a EOS from reader though we expect more data on it.
+            throw new IOException("Unexpected EOS from the reader");
+          }
+          idxInGroup = (idxInGroup + 1) % dataBlkNum;
+        }
+        pos += result;
+        if (dfsClient.stats != null) {
+          dfsClient.stats.incrementBytesRead(result);
+        }
+        return result;
+      } finally {
+        // Check if need to report block replicas corruption either read
+        // was successful or ChecksumException occured.
+        reportCheckSumFailure(corruptedBlockMap,
+            currentLocatedBlock.getLocations().length);
       }
     }
     return -1;
   }
 
   private synchronized int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
-      throws IOException {
-    IOException ioe;
-    while (true) {
-      try {
-        return readerStrategy.doRead(blockReader, off, len);
-      } catch ( ChecksumException ce ) {
-        DFSClient.LOG.warn("Found Checksum error for "
-            + getCurrentBlock() + " from " + currentNode
-            + " at " + ce.getPos());
-        // If current block group is corrupt, it's meaningless to retry.
-        // TODO: this should trigger decoding logic (HDFS-7678)
-        throw ce;
-      } catch ( IOException e ) {
-        ioe = e;
-      }
-      boolean sourceFound = seekToBlockSource(pos);
-      if (!sourceFound) {
-        throw ioe;
-      }
+      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    try {
+      return readerStrategy.doRead(blockReader, off, len);
+    } catch ( ChecksumException ce ) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + getCurrentBlock() + " from " + currentNode
+          + " at " + ce.getPos());
+      // we want to remember which block replicas we have tried
+      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+          corruptedBlockMap);
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + getCurrentBlock() + " of " + src + " from "
+          + currentNode, e);
     }
-  }
-
-  private boolean seekToBlockSource(long targetPos)
-      throws IOException {
-    currentNodes = blockSeekTo(targetPos);
-    return true;
+    // TODO: this should trigger decoding logic (HDFS-7678)
+    return -1;
   }
 
   protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
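
For reference, the stateful read loop above advances round-robin over the data blocks of a group and never lets a single readBuffer() call cross a cell boundary. A hedged, standalone illustration of that offset arithmetic (a hypothetical helper, not part of the patch):

  // Sketch only: which internal block serves file offset `pos`, and how many of
  // `wanted` bytes can be read before hitting the end of the current cell.
  static long[] nextCellRead(long pos, long wanted, int cellSize, int dataBlkNum) {
    long idxInGroup = (pos / cellSize) % dataBlkNum;           // reader index for this cell
    long cellEnd = (pos / cellSize) * cellSize + cellSize - 1; // last byte of the cell
    long lenInCell = Math.min(wanted, cellEnd - pos + 1);      // capped at the cell boundary
    return new long[] { idxInGroup, lenInCell };
  }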
@@ -418,7 +376,7 @@
     @Override
     public int doRead(BlockReader blockReader, int off, int len)
-        throws ChecksumException, IOException {
+        throws IOException {
       int oldlimit = buf.limit();
       if (buf.remaining() > len) {
         buf.limit(buf.position() + len);
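
The ByteBufferStrategy override shown here caps a read at len bytes by saving the buffer's limit and shrinking it before reading; restoring the limit afterwards is the usual completion of the idiom. A minimal standalone sketch of that limit-window pattern using java.nio.ByteBuffer and java.nio.channels.ReadableByteChannel (the channel-based signature is an assumption for illustration, not the BlockReader API):

  // Sketch only: expose at most `len` bytes of `buf` to the reader, then restore
  // the caller's original limit regardless of how the read ends.
  static int readAtMost(ReadableByteChannel in, ByteBuffer buf, int len) throws IOException {
    int oldLimit = buf.limit();
    if (buf.remaining() > len) {
      buf.limit(buf.position() + len);   // shrink the readable window
    }
    try {
      return in.read(buf);               // the reader cannot overrun the window
    } finally {
      buf.limit(oldLimit);               // restore the original limit
    }
  }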