HDFS-9734. Refactoring of checksum failure report related codes. Contributed by Kai Zheng.

Change-Id: Ie69a77e3498a360959f8e213c51fb2b17c28b64a
This commit is contained in:
Zhe Zhang 2016-02-25 09:55:50 -08:00
parent ccff6035f5
commit 8808779db3
7 changed files with 115 additions and 112 deletions

View File

@ -26,8 +26,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -57,6 +55,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -857,7 +856,7 @@ public class DFSInputStream extends FSInputStream
* ChecksumFileSystem * ChecksumFileSystem
*/ */
private synchronized int readBuffer(ReaderStrategy reader, int off, int len, private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) CorruptedBlocks corruptedBlocks)
throws IOException { throws IOException {
IOException ioe; IOException ioe;
@ -880,8 +879,7 @@ public class DFSInputStream extends FSInputStream
ioe = ce; ioe = ce;
retryCurrentNode = false; retryCurrentNode = false;
// we want to remember which block replicas we have tried // we want to remember which block replicas we have tried
addIntoCorruptedBlockMap(getCurrentBlock(), currentNode, corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
corruptedBlockMap);
} catch ( IOException e ) { } catch ( IOException e ) {
if (!retryCurrentNode) { if (!retryCurrentNode) {
DFSClient.LOG.warn("Exception while reading from " DFSClient.LOG.warn("Exception while reading from "
@ -914,7 +912,8 @@ public class DFSInputStream extends FSInputStream
if (closed.get()) { if (closed.get()) {
throw new IOException("Stream closed"); throw new IOException("Stream closed");
} }
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
failures = 0; failures = 0;
if (pos < getFileLength()) { if (pos < getFileLength()) {
int retries = 2; int retries = 2;
@ -932,7 +931,7 @@ public class DFSInputStream extends FSInputStream
locatedBlocks.getFileLength() - pos); locatedBlocks.getFileLength() - pos);
} }
} }
int result = readBuffer(strategy, off, realLen, corruptedBlockMap); int result = readBuffer(strategy, off, realLen, corruptedBlocks);
if (result >= 0) { if (result >= 0) {
pos += result; pos += result;
@ -958,7 +957,7 @@ public class DFSInputStream extends FSInputStream
} finally { } finally {
// Check if need to report block replicas corruption either read // Check if need to report block replicas corruption either read
// was successful or ChecksumException occured. // was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap, reportCheckSumFailure(corruptedBlocks,
currentLocatedBlock.getLocations().length, false); currentLocatedBlock.getLocations().length, false);
} }
} }
@ -999,24 +998,6 @@ public class DFSInputStream extends FSInputStream
} }
} }
/**
* Add corrupted block replica into map.
*/
protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet;
if((corruptedBlockMap.containsKey(blk))) {
dnSet = corruptedBlockMap.get(blk);
}else {
dnSet = new HashSet<>();
}
if (!dnSet.contains(node)) {
dnSet.add(node);
corruptedBlockMap.put(blk, dnSet);
}
}
private DNAddrPair chooseDataNode(LocatedBlock block, private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) { while (true) {
@ -1143,15 +1124,14 @@ public class DFSInputStream extends FSInputStream
} }
protected void fetchBlockByteRange(LocatedBlock block, long start, long end, protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
block = refreshLocatedBlock(block); block = refreshLocatedBlock(block);
while (true) { while (true) {
DNAddrPair addressPair = chooseDataNode(block, null); DNAddrPair addressPair = chooseDataNode(block, null);
try { try {
actualGetFromOneDataNode(addressPair, block, start, end, actualGetFromOneDataNode(addressPair, block, start, end,
buf, offset, corruptedBlockMap); buf, offset, corruptedBlocks);
return; return;
} catch (IOException e) { } catch (IOException e) {
// Ignore. Already processed inside the function. // Ignore. Already processed inside the function.
@ -1163,7 +1143,7 @@ public class DFSInputStream extends FSInputStream
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end, final LocatedBlock block, final long start, final long end,
final ByteBuffer bb, final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, final CorruptedBlocks corruptedBlocks,
final int hedgedReadId) { final int hedgedReadId) {
final SpanId parentSpanId = Tracer.getCurrentSpanId(); final SpanId parentSpanId = Tracer.getCurrentSpanId();
return new Callable<ByteBuffer>() { return new Callable<ByteBuffer>() {
@ -1174,7 +1154,7 @@ public class DFSInputStream extends FSInputStream
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) { newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, block, start, end, buf, actualGetFromOneDataNode(datanode, block, start, end, buf,
offset, corruptedBlockMap); offset, corruptedBlocks);
return bb; return bb;
} }
} }
@ -1190,12 +1170,12 @@ public class DFSInputStream extends FSInputStream
* @param endInBlk the endInBlk offset of the block * @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read * @param buf the given byte array into which the data is read
* @param offset the offset in buf * @param offset the offset in buf
* @param corruptedBlockMap map recording list of datanodes with corrupted * @param corruptedBlocks map recording list of datanodes with corrupted
* block replica * block replica
*/ */
void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block, void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
final long startInBlk, final long endInBlk, byte[] buf, int offset, final long startInBlk, final long endInBlk, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) CorruptedBlocks corruptedBlocks)
throws IOException { throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode(); DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once int refetchToken = 1; // only need to get a new access token once
@ -1226,8 +1206,7 @@ public class DFSInputStream extends FSInputStream
+ datanode.info; + datanode.info;
DFSClient.LOG.warn(msg); DFSClient.LOG.warn(msg);
// we want to remember what we have tried // we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), datanode.info, corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info);
corruptedBlockMap);
addToDeadNodes(datanode.info); addToDeadNodes(datanode.info);
throw new IOException(msg); throw new IOException(msg);
} catch (IOException e) { } catch (IOException e) {
@ -1277,8 +1256,7 @@ public class DFSInputStream extends FSInputStream
* time. We then wait on which ever read returns first. * time. We then wait on which ever read returns first.
*/ */
private void hedgedFetchBlockByteRange(LocatedBlock block, long start, private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset, long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
final DfsClientConf conf = dfsClient.getConf(); final DfsClientConf conf = dfsClient.getConf();
ArrayList<Future<ByteBuffer>> futures = new ArrayList<>(); ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
@ -1301,7 +1279,7 @@ public class DFSInputStream extends FSInputStream
bb = ByteBuffer.wrap(buf, offset, len); bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++); corruptedBlocks, hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable); .submit(getFromDataNodeCallable);
futures.add(firstRequest); futures.add(firstRequest);
@ -1333,7 +1311,7 @@ public class DFSInputStream extends FSInputStream
bb = ByteBuffer.allocate(len); bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++); corruptedBlocks, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable); .submit(getFromDataNodeCallable);
futures.add(oneMoreRequest); futures.add(oneMoreRequest);
@ -1476,23 +1454,23 @@ public class DFSInputStream extends FSInputStream
// corresponding to position and realLen // corresponding to position and realLen
List<LocatedBlock> blockRange = getBlockRange(position, realLen); List<LocatedBlock> blockRange = getBlockRange(position, realLen);
int remaining = realLen; int remaining = realLen;
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>(); CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
for (LocatedBlock blk : blockRange) { for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset(); long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try { try {
if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) { if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
hedgedFetchBlockByteRange(blk, targetStart, hedgedFetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks);
} else { } else {
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlockMap); buffer, offset, corruptedBlocks);
} }
} finally { } finally {
// Check and report if any block replicas are corrupted. // Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are // BlockMissingException may be caught if all block replicas are
// corrupted. // corrupted.
reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length, reportCheckSumFailure(corruptedBlocks, blk.getLocations().length,
false); false);
} }
@ -1523,12 +1501,14 @@ public class DFSInputStream extends FSInputStream
* corresponding to each internal block. For this case we simply report the * corresponding to each internal block. For this case we simply report the
* corrupted blocks to NameNode and ignore the above logic. * corrupted blocks to NameNode and ignore the above logic.
* *
* @param corruptedBlockMap map of corrupted blocks * @param corruptedBlocks map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas * @param dataNodeCount number of data nodes who contains the block replicas
*/ */
protected void reportCheckSumFailure( protected void reportCheckSumFailure(CorruptedBlocks corruptedBlocks,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount, boolean isStriped) { int dataNodeCount, boolean isStriped) {
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
corruptedBlocks.getCorruptionMap();
if (corruptedBlockMap.isEmpty()) { if (corruptedBlockMap.isEmpty()) {
return; return;
} }

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ByteBufferPool;
@ -282,8 +283,7 @@ public class DFSStripedInputStream extends DFSInputStream {
* Read a new stripe covering the current position, and store the data in the * Read a new stripe covering the current position, and store the data in the
* {@link #curStripeBuf}. * {@link #curStripeBuf}.
*/ */
private void readOneStripe( private void readOneStripe(CorruptedBlocks corruptedBlocks)
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
resetCurStripeBuffer(); resetCurStripeBuffer();
@ -307,7 +307,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (AlignedStripe stripe : stripes) { for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location // Parse group to get chosen DN location
StripeReader sreader = new StatefulStripeReader(readingService, stripe, StripeReader sreader = new StatefulStripeReader(readingService, stripe,
blks, blockReaders, corruptedBlockMap); blks, blockReaders, corruptedBlocks);
sreader.readStripe(); sreader.readStripe();
} }
curStripeBuf.position(stripeBufOffset); curStripeBuf.position(stripeBufOffset);
@ -319,7 +319,7 @@ public class DFSStripedInputStream extends DFSInputStream {
final DatanodeInfo datanode, final long currentReaderOffset, final DatanodeInfo datanode, final long currentReaderOffset,
final long targetReaderOffset, final ByteBufferStrategy[] strategies, final long targetReaderOffset, final ByteBufferStrategy[] strategies,
final ExtendedBlock currentBlock, final ExtendedBlock currentBlock,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { final CorruptedBlocks corruptedBlocks) {
return new Callable<Void>() { return new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
@ -338,7 +338,7 @@ public class DFSStripedInputStream extends DFSInputStream {
int result = 0; int result = 0;
for (ByteBufferStrategy strategy : strategies) { for (ByteBufferStrategy strategy : strategies) {
result += readToBuffer(reader, datanode, strategy, currentBlock, result += readToBuffer(reader, datanode, strategy, currentBlock,
corruptedBlockMap); corruptedBlocks);
} }
return null; return null;
} }
@ -348,7 +348,7 @@ public class DFSStripedInputStream extends DFSInputStream {
private int readToBuffer(BlockReader blockReader, private int readToBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ByteBufferStrategy strategy, DatanodeInfo currentNode, ByteBufferStrategy strategy,
ExtendedBlock currentBlock, ExtendedBlock currentBlock,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) CorruptedBlocks corruptedBlocks)
throws IOException { throws IOException {
final int targetLength = strategy.buf.remaining(); final int targetLength = strategy.buf.remaining();
int length = 0; int length = 0;
@ -366,8 +366,7 @@ public class DFSStripedInputStream extends DFSInputStream {
+ currentBlock + " from " + currentNode + currentBlock + " from " + currentNode
+ " at " + ce.getPos()); + " at " + ce.getPos());
// we want to remember which block replicas we have tried // we want to remember which block replicas we have tried
addIntoCorruptedBlockMap(currentBlock, currentNode, corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
corruptedBlockMap);
throw ce; throw ce;
} catch (IOException e) { } catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from " DFSClient.LOG.warn("Exception while reading from "
@ -423,8 +422,8 @@ public class DFSStripedInputStream extends DFSInputStream {
if (closed.get()) { if (closed.get()) {
throw new IOException("Stream closed"); throw new IOException("Stream closed");
} }
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
new ConcurrentHashMap<>(); CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
if (pos < getFileLength()) { if (pos < getFileLength()) {
try { try {
if (pos > blockEnd) { if (pos > blockEnd) {
@ -442,7 +441,7 @@ public class DFSStripedInputStream extends DFSInputStream {
int result = 0; int result = 0;
while (result < realLen) { while (result < realLen) {
if (!curStripeRange.include(getOffsetInBlockGroup())) { if (!curStripeRange.include(getOffsetInBlockGroup())) {
readOneStripe(corruptedBlockMap); readOneStripe(corruptedBlocks);
} }
int ret = copyToTargetBuf(strategy, off + result, realLen - result); int ret = copyToTargetBuf(strategy, off + result, realLen - result);
result += ret; result += ret;
@ -455,7 +454,7 @@ public class DFSStripedInputStream extends DFSInputStream {
} finally { } finally {
// Check if need to report block replicas corruption either read // Check if need to report block replicas corruption either read
// was successful or ChecksumException occured. // was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap, reportCheckSumFailure(corruptedBlocks,
currentLocatedBlock.getLocations().length, true); currentLocatedBlock.getLocations().length, true);
} }
} }
@ -519,8 +518,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/ */
@Override @Override
protected void fetchBlockByteRange(LocatedBlock block, long start, protected void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset, long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
// Refresh the striped block group // Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset()); LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
@ -536,7 +534,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (AlignedStripe stripe : stripes) { for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location // Parse group to get chosen DN location
StripeReader preader = new PositionStripeReader(readService, stripe, StripeReader preader = new PositionStripeReader(readService, stripe,
blks, preaderInfos, corruptedBlockMap); blks, preaderInfos, corruptedBlocks);
preader.readStripe(); preader.readStripe();
} }
} finally { } finally {
@ -575,17 +573,17 @@ public class DFSStripedInputStream extends DFSInputStream {
final AlignedStripe alignedStripe; final AlignedStripe alignedStripe;
final CompletionService<Void> service; final CompletionService<Void> service;
final LocatedBlock[] targetBlocks; final LocatedBlock[] targetBlocks;
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap; final CorruptedBlocks corruptedBlocks;
final BlockReaderInfo[] readerInfos; final BlockReaderInfo[] readerInfos;
StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe, StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { CorruptedBlocks corruptedBlocks) {
this.service = service; this.service = service;
this.alignedStripe = alignedStripe; this.alignedStripe = alignedStripe;
this.targetBlocks = targetBlocks; this.targetBlocks = targetBlocks;
this.readerInfos = readerInfos; this.readerInfos = readerInfos;
this.corruptedBlockMap = corruptedBlockMap; this.corruptedBlocks = corruptedBlocks;
} }
/** prepare all the data chunks */ /** prepare all the data chunks */
@ -731,7 +729,7 @@ public class DFSStripedInputStream extends DFSInputStream {
readerInfos[chunkIndex].datanode, readerInfos[chunkIndex].datanode,
readerInfos[chunkIndex].blockReaderOffset, readerInfos[chunkIndex].blockReaderOffset,
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
block.getBlock(), corruptedBlockMap); block.getBlock(), corruptedBlocks);
Future<Void> request = service.submit(readCallable); Future<Void> request = service.submit(readCallable);
futures.put(request, chunkIndex); futures.put(request, chunkIndex);
@ -812,10 +810,9 @@ public class DFSStripedInputStream extends DFSInputStream {
PositionStripeReader(CompletionService<Void> service, PositionStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
BlockReaderInfo[] readerInfos, BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
super(service, alignedStripe, targetBlocks, readerInfos, super(service, alignedStripe, targetBlocks, readerInfos,
corruptedBlockMap); corruptedBlocks);
} }
@Override @Override
@ -849,10 +846,9 @@ public class DFSStripedInputStream extends DFSInputStream {
StatefulStripeReader(CompletionService<Void> service, StatefulStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
BlockReaderInfo[] readerInfos, BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
super(service, alignedStripe, targetBlocks, readerInfos, super(service, alignedStripe, targetBlocks, readerInfos,
corruptedBlockMap); corruptedBlocks);
} }
@Override @Override

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -69,9 +70,11 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@ -681,4 +684,37 @@ public class DFSUtilClient {
iioe.initCause(e); iioe.initCause(e);
return iioe; return iioe;
} }
/**
* A utility class as a container to put corrupted blocks, shared by client
* and datanode.
*/
public static class CorruptedBlocks {
private Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap;
public CorruptedBlocks() {
this.corruptionMap = new HashMap<>();
}
/**
* Indicate a block replica on the specified datanode is corrupted
*/
public void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node) {
Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
if (dnSet == null) {
dnSet = new HashSet<>();
corruptionMap.put(blk, dnSet);
}
if (!dnSet.contains(node)) {
dnSet.add(node);
}
}
/**
* @return the map that contains all the corruption entries.
*/
public Map<ExtendedBlock, Set<DatanodeInfo>> getCorruptionMap() {
return corruptionMap;
}
}
} }

View File

@ -443,6 +443,9 @@ Trunk (Unreleased)
HDFS-9837. BlockManager#countNodes should be able to detect duplicated HDFS-9837. BlockManager#countNodes should be able to detect duplicated
internal blocks. (jing9) internal blocks. (jing9)
HDFS-9734. Refactoring of checksum failure report related codes.
(Kai Zheng via zhz)
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and HDFS-7347. Configurable erasure coding policy for individual files and

View File

@ -1151,6 +1151,20 @@ public class DataNode extends ReconfigurableBase
bpos.reportRemoteBadBlock(srcDataNode, block); bpos.reportRemoteBadBlock(srcDataNode, block);
} }
public void reportCorruptedBlocks(
DFSUtilClient.CorruptedBlocks corruptedBlocks) throws IOException {
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap =
corruptedBlocks.getCorruptionMap();
if (!corruptionMap.isEmpty()) {
for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
corruptionMap.entrySet()) {
for (DatanodeInfo dnInfo : entry.getValue()) {
reportRemoteBadBlock(dnInfo, entry.getKey());
}
}
}
}
/** /**
* Try to send an error report to the NNs associated with the given * Try to send an error report to the NNs associated with the given
* block pool. * block pool.

View File

@ -32,10 +32,8 @@ import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
@ -54,6 +52,7 @@ import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSPacket; import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.RemoteBlockReader2; import org.apache.hadoop.hdfs.RemoteBlockReader2;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -460,13 +459,13 @@ public final class ErasureCodingWorker {
bufferSize, maxTargetLength - positionInBlock); bufferSize, maxTargetLength - positionInBlock);
// step1: read from minimum source DNs required for reconstruction. // step1: read from minimum source DNs required for reconstruction.
// The returned success list is the source DNs we do real read from // The returned success list is the source DNs we do real read from
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>(); CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
try { try {
success = readMinimumStripedData4Reconstruction(success, success = readMinimumStripedData4Reconstruction(success,
toReconstruct, corruptionMap); toReconstruct, corruptedBlocks);
} finally { } finally {
// report corrupted blocks to NN // report corrupted blocks to NN
reportCorruptedBlocks(corruptionMap); datanode.reportCorruptedBlocks(corruptedBlocks);
} }
// step2: decode to reconstruct targets // step2: decode to reconstruct targets
@ -564,8 +563,7 @@ public final class ErasureCodingWorker {
* @throws IOException * @throws IOException
*/ */
private int[] readMinimumStripedData4Reconstruction(final int[] success, private int[] readMinimumStripedData4Reconstruction(final int[] success,
int reconstructLength, int reconstructLength, CorruptedBlocks corruptedBlocks)
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
throws IOException { throws IOException {
Preconditions.checkArgument(reconstructLength >= 0 && Preconditions.checkArgument(reconstructLength >= 0 &&
reconstructLength <= bufferSize); reconstructLength <= bufferSize);
@ -582,7 +580,7 @@ public final class ErasureCodingWorker {
reconstructLength); reconstructLength);
if (toRead > 0) { if (toRead > 0) {
Callable<Void> readCallable = readFromBlock(reader, reader.buffer, Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
toRead, corruptionMap); toRead, corruptedBlocks);
Future<Void> f = readService.submit(readCallable); Future<Void> f = readService.submit(readCallable);
futures.put(f, success[i]); futures.put(f, success[i]);
} else { } else {
@ -608,11 +606,11 @@ public final class ErasureCodingWorker {
IOUtils.closeStream(failedReader.blockReader); IOUtils.closeStream(failedReader.blockReader);
failedReader.blockReader = null; failedReader.blockReader = null;
resultIndex = scheduleNewRead(used, reconstructLength, resultIndex = scheduleNewRead(used, reconstructLength,
corruptionMap); corruptedBlocks);
} else if (result.state == StripingChunkReadResult.TIMEOUT) { } else if (result.state == StripingChunkReadResult.TIMEOUT) {
// If timeout, we also schedule a new read. // If timeout, we also schedule a new read.
resultIndex = scheduleNewRead(used, reconstructLength, resultIndex = scheduleNewRead(used, reconstructLength,
corruptionMap); corruptedBlocks);
} }
if (resultIndex >= 0) { if (resultIndex >= 0) {
newSuccess[nsuccess++] = resultIndex; newSuccess[nsuccess++] = resultIndex;
@ -723,7 +721,7 @@ public final class ErasureCodingWorker {
* @return the array index of source DN if don't need to do real read. * @return the array index of source DN if don't need to do real read.
*/ */
private int scheduleNewRead(BitSet used, int reconstructLen, private int scheduleNewRead(BitSet used, int reconstructLen,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) { CorruptedBlocks corruptedBlocks) {
StripedReader reader = null; StripedReader reader = null;
// step1: initially we may only have <code>minRequiredSources</code> // step1: initially we may only have <code>minRequiredSources</code>
// number of StripedReader, and there may be some source DNs we never // number of StripedReader, and there may be some source DNs we never
@ -775,7 +773,7 @@ public final class ErasureCodingWorker {
// step3: schedule if find a correct source DN and need to do real read. // step3: schedule if find a correct source DN and need to do real read.
if (reader != null) { if (reader != null) {
Callable<Void> readCallable = readFromBlock(reader, reader.buffer, Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
toRead, corruptionMap); toRead, corruptedBlocks);
Future<Void> f = readService.submit(readCallable); Future<Void> f = readService.submit(readCallable);
futures.put(f, m); futures.put(f, m);
used.set(m); used.set(m);
@ -793,7 +791,7 @@ public final class ErasureCodingWorker {
private Callable<Void> readFromBlock(final StripedReader reader, private Callable<Void> readFromBlock(final StripedReader reader,
final ByteBuffer buf, final int length, final ByteBuffer buf, final int length,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) { final CorruptedBlocks corruptedBlocks) {
return new Callable<Void>() { return new Callable<Void>() {
@Override @Override
@ -805,7 +803,7 @@ public final class ErasureCodingWorker {
} catch (ChecksumException e) { } catch (ChecksumException e) {
LOG.warn("Found Checksum error for {} from {} at {}", reader.block, LOG.warn("Found Checksum error for {} from {} at {}", reader.block,
reader.source, e.getPos()); reader.source, e.getPos());
addCorruptedBlock(reader.block, reader.source, corruptionMap); corruptedBlocks.addCorruptedBlock(reader.block, reader.source);
throw e; throw e;
} catch (IOException e) { } catch (IOException e) {
LOG.info(e.getMessage()); LOG.info(e.getMessage());
@ -816,30 +814,6 @@ public final class ErasureCodingWorker {
}; };
} }
private void reportCorruptedBlocks(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) throws IOException {
if (!corruptionMap.isEmpty()) {
for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
corruptionMap.entrySet()) {
for (DatanodeInfo dnInfo : entry.getValue()) {
datanode.reportRemoteBadBlock(dnInfo, entry.getKey());
}
}
}
}
private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
if (dnSet == null) {
dnSet = new HashSet<>();
corruptionMap.put(blk, dnSet);
}
if (!dnSet.contains(node)) {
dnSet.add(node);
}
}
/** /**
* Read bytes from block * Read bytes from block
*/ */

View File

@ -217,7 +217,7 @@ public class TestReconstructStripedFile {
return d; return d;
} }
private void shutdownDataNodes(DataNode dn) throws IOException { private void shutdownDataNode(DataNode dn) throws IOException {
/* /*
* Kill the datanode which contains one replica * Kill the datanode which contains one replica
* We need to make sure it dead in namenode: clear its update time and * We need to make sure it dead in namenode: clear its update time and
@ -237,7 +237,7 @@ public class TestReconstructStripedFile {
// stop at least one DN to trigger reconstruction // stop at least one DN to trigger reconstruction
LOG.info("Note: stop DataNode " + target.getValue().getDisplayName() LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
+ " with internal block " + target.getKey()); + " with internal block " + target.getKey());
shutdownDataNodes(target.getValue()); shutdownDataNode(target.getValue());
stoppedDN++; stoppedDN++;
} else { // corrupt the data on the DN } else { // corrupt the data on the DN
LOG.info("Note: corrupt data on " + target.getValue().getDisplayName() LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()