HDFS-2985. Improve logging when replicas are marked as corrupt. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1292609 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-02-23 01:16:33 +00:00
parent b69e4d9173
commit 475db83b87
7 changed files with 99 additions and 39 deletions

View File

@ -241,6 +241,8 @@ Release 0.23.2 - UNRELEASED
HDFS-2907. Add a conf property dfs.datanode.fsdataset.factory to make
FSDataset in Datanode pluggable. (szetszwo)
HDFS-2985. Improve logging when replicas are marked as corrupt. (todd)
OPTIMIZATIONS
BUG FIXES

View File

@ -804,9 +804,11 @@ public class BlockManager {
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
* @param reason a textual reason why the block should be marked corrupt,
* for logging purposes
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn) throws IOException {
final DatanodeInfo dn, String reason) throws IOException {
namesystem.writeLock();
try {
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
@ -819,14 +821,15 @@ public class BlockManager {
+ blk + " not found.");
return;
}
markBlockAsCorrupt(storedBlock, dn);
markBlockAsCorrupt(storedBlock, dn, reason);
} finally {
namesystem.writeUnlock();
}
}
private void markBlockAsCorrupt(BlockInfo storedBlock,
DatanodeInfo dn) throws IOException {
DatanodeInfo dn,
String reason) throws IOException {
assert storedBlock != null : "storedBlock should not be null";
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
@ -850,7 +853,7 @@ public class BlockManager {
node.addBlock(storedBlock);
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason);
if (countNodes(storedBlock).liveReplicas() >= inode.getReplication()) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(storedBlock, node);
@ -1277,6 +1280,21 @@ public class BlockManager {
this.reportedState = reportedState;
}
}
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
final BlockInfo blockInfo;
final String reason;
BlockToMarkCorrupt(BlockInfo blockInfo, String reason) {
super();
this.blockInfo = blockInfo;
this.reason = reason;
}
}
/**
* The given datanode is reporting all its blocks.
@ -1331,7 +1349,7 @@ public class BlockManager {
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
@ -1351,8 +1369,8 @@ public class BlockManager {
+ " does not belong to any file.");
addToInvalidates(b, node);
}
for (BlockInfo b : toCorrupt) {
markBlockAsCorrupt(b, node);
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b.blockInfo, node, b.reason);
}
}
@ -1383,8 +1401,10 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState();
if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
markBlockAsCorrupt(storedBlock, node);
BlockToMarkCorrupt c = checkReplicaCorrupt(
iblk, reportedState, storedBlock, ucState, node);
if (c != null) {
markBlockAsCorrupt(c.blockInfo, node, c.reason);
continue;
}
@ -1406,7 +1426,7 @@ public class BlockManager {
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockInfo> toCorrupt, // add to corrupt replicas list
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
@ -1473,7 +1493,7 @@ public class BlockManager {
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockInfo> toCorrupt,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
if(LOG.isDebugEnabled()) {
@ -1504,8 +1524,10 @@ public class BlockManager {
return storedBlock;
}
if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
toCorrupt.add(storedBlock);
BlockToMarkCorrupt c = checkReplicaCorrupt(
block, reportedState, storedBlock, ucState, dn);
if (c != null) {
toCorrupt.add(c);
return storedBlock;
}
@ -1529,8 +1551,11 @@ public class BlockManager {
* as switch statements, on the theory that it is easier to understand
* the combinatorics of reportedState and ucState that way. It should be
* at least as efficient as boolean expressions.
*
* @return a BlockToMarkCorrupt object, or null if the replica is not corrupt
*/
private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState,
private BlockToMarkCorrupt checkReplicaCorrupt(
Block iblk, ReplicaState reportedState,
BlockInfo storedBlock, BlockUCState ucState,
DatanodeDescriptor dn) {
switch(reportedState) {
@ -1538,17 +1563,31 @@ public class BlockManager {
switch(ucState) {
case COMPLETE:
case COMMITTED:
return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
|| storedBlock.getNumBytes() != iblk.getNumBytes());
if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported genstamp " +
iblk.getGenerationStamp() + " does not match " +
"genstamp in block map " + storedBlock.getGenerationStamp());
} else if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported length " +
iblk.getNumBytes() + " does not match " +
"length in block map " + storedBlock.getNumBytes());
} else {
return null; // not corrupt
}
default:
return false;
return null;
}
case RBW:
case RWR:
if (!storedBlock.isComplete()) {
return false;
return null; // not corrupt
} else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
return true;
return new BlockToMarkCorrupt(storedBlock,
"reported " + reportedState + " replica with genstamp " +
iblk.getGenerationStamp() + " does not match COMPLETE block's " +
"genstamp in block map " + storedBlock.getGenerationStamp());
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
@ -1558,18 +1597,22 @@ public class BlockManager {
LOG.info("Received an RBW replica for block " + storedBlock +
" on " + dn.getName() + ": ignoring it, since the block is " +
"complete with the same generation stamp.");
return false;
return null;
} else {
return true;
return new BlockToMarkCorrupt(storedBlock,
"reported replica has invalid state " + reportedState);
}
}
case RUR: // should not be reported
case TEMPORARY: // should not be reported
default:
LOG.warn("Unexpected replica state " + reportedState
+ " for block: " + storedBlock +
" on " + dn.getName() + " size " + storedBlock.getNumBytes());
return true;
String msg = "Unexpected replica state " + reportedState
+ " for block: " + storedBlock +
" on " + dn.getName() + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS
// invariant
LOG.warn(msg);
return new BlockToMarkCorrupt(storedBlock, msg);
}
}
@ -2100,7 +2143,7 @@ public class BlockManager {
// blockReceived reports a finalized block
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
processReportedBlock(node, block, ReplicaState.FINALIZED,
toAdd, toInvalidate, toCorrupt, toUC);
@ -2121,8 +2164,8 @@ public class BlockManager {
+ " does not belong to any file.");
addToInvalidates(b, node);
}
for (BlockInfo b : toCorrupt) {
markBlockAsCorrupt(b, node);
for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b.blockInfo, node, b.reason);
}
}

View File

@ -44,25 +44,37 @@ public class CorruptReplicasMap{
*
* @param blk Block to be added to CorruptReplicasMap
* @param dn DatanodeDescriptor which holds the corrupt replica
* @param reason a textual reason (for logging purposes)
*/
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn) {
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
if (nodes == null) {
nodes = new TreeSet<DatanodeDescriptor>();
corruptReplicasMap.put(blk, nodes);
}
String reasonText;
if (reason != null) {
reasonText = " because " + reason;
} else {
reasonText = "";
}
if (!nodes.contains(dn)) {
nodes.add(dn);
NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
blk.getBlockName() +
" added as corrupt on " + dn.getName() +
" by " + Server.getRemoteIp());
" by " + Server.getRemoteIp() +
reasonText);
} else {
NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
"duplicate requested for " +
blk.getBlockName() + " to add as corrupt " +
"on " + dn.getName() +
" by " + Server.getRemoteIp());
" by " + Server.getRemoteIp() +
reasonText);
}
}

View File

@ -549,7 +549,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn,
"client machine reported it");
}
}
}

View File

@ -147,7 +147,7 @@ public class TestFileCorruption extends TestCase {
DatanodeRegistration dnR =
DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId());
cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
blk, new DatanodeInfo(dnR));
blk, new DatanodeInfo(dnR), "TEST");
// open the file
fs.open(FILE_PATH);

View File

@ -83,14 +83,14 @@ public class TestCorruptReplicaInfo extends TestCase {
DatanodeDescriptor dn1 = new DatanodeDescriptor();
DatanodeDescriptor dn2 = new DatanodeDescriptor();
crm.addToCorruptReplicasMap(getBlock(0), dn1);
crm.addToCorruptReplicasMap(getBlock(0), dn1, "TEST");
assertEquals("Number of corrupt blocks not returning correctly",
1, crm.size());
crm.addToCorruptReplicasMap(getBlock(1), dn1);
crm.addToCorruptReplicasMap(getBlock(1), dn1, "TEST");
assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size());
crm.addToCorruptReplicasMap(getBlock(1), dn2);
crm.addToCorruptReplicasMap(getBlock(1), dn2, "TEST");
assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size());
@ -103,7 +103,7 @@ public class TestCorruptReplicaInfo extends TestCase {
0, crm.size());
for (Long block_id: block_ids) {
crm.addToCorruptReplicasMap(getBlock(block_id), dn1);
crm.addToCorruptReplicasMap(getBlock(block_id), dn1, "TEST");
}
assertEquals("Number of corrupt blocks not returning correctly",

View File

@ -172,7 +172,8 @@ public class TestNameNodeMetrics {
// Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 1L, rb);
@ -211,7 +212,8 @@ public class TestNameNodeMetrics {
// Corrupt the only replica of the block to result in a missing block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("UnderReplicatedBlocks", 1L, rb);