HDFS-7907. Erasure Coding: track invalid, corrupt, and under-recovery striped blocks in NameNode. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2015-03-30 11:25:09 -07:00 committed by Zhe Zhang
parent 97378e4cd0
commit abf833a7b2
6 changed files with 175 additions and 155 deletions

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import java.io.DataOutput;
import java.io.IOException;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
/**
* Subclass of {@link BlockInfo}, representing a block group in erasure coding.
*
@ -37,7 +39,6 @@ import java.io.IOException;
* array to record the block index for each triplet.
*/
public class BlockInfoStriped extends BlockInfo {
private final int chunkSize = HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
private final short dataBlockNum;
private final short parityBlockNum;
/**
@ -132,6 +133,22 @@ public class BlockInfoStriped extends BlockInfo {
return i == -1 ? -1 : indices[i];
}
/**
* Identify the block stored in the given datanode storage. Note that
* the returned block has the same block ID as the one seen/reported by the
* DataNode.
*
* @param storage the datanode storage to look up in this block group
* @return a new {@link Block} built from this block group whose ID is this
*         group's block ID plus the storage's block index, or null when
*         getStorageBlockIndex returns a negative index for the storage
*         (presumably because the storage holds no block of this group)
*/
Block getBlockOnStorage(DatanodeStorageInfo storage) {
int index = getStorageBlockIndex(storage);
if (index < 0) {
// no block index is recorded for this storage
return null;
} else {
// build a separate Block from this group, then offset its ID by the
// block index so that this BlockInfoStriped itself is left unchanged
Block block = new Block(this);
block.setBlockId(this.getBlockId() + index);
return block;
}
}
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfoFromEnd(storage);
@ -186,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
// For striped blocks, the total usage of this block group should be
// the sum of the data blocks and the parity blocks, because
// `getNumBytes` is the total of the actual data block sizes.
return ((getNumBytes() - 1) / (dataBlockNum * chunkSize) + 1)
* chunkSize * parityBlockNum + getNumBytes();
return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
* BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
}
@Override

View File

@ -179,7 +179,11 @@ public class BlockManager {
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
/** Blocks to be invalidated. */
/**
* Blocks to be invalidated.
* For a striped block to invalidate, we should track its individual internal
* blocks.
*/
private final InvalidateBlocks invalidateBlocks;
/**
@ -195,8 +199,8 @@ public class BlockManager {
* Maps a StorageID to the set of blocks that are "extra" for this
* DataNode. We'll eventually remove these extras.
*/
public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
new TreeMap<String, LightWeightLinkedSet<Block>>();
public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
new TreeMap<>();
/**
* Store set of Blocks that need to be replicated 1 or more times.
@ -594,11 +598,11 @@ public class BlockManager {
((BlockInfoStriped) block).getDataBlockNum() : minReplication;
}
public boolean checkMinStorage(BlockInfo block) {
public boolean hasMinStorage(BlockInfo block) {
return countNodes(block).liveReplicas() >= getMinStorageNum(block);
}
public boolean checkMinStorage(BlockInfo block, int liveNum) {
public boolean hasMinStorage(BlockInfo block, int liveNum) {
return liveNum >= getMinStorageNum(block);
}
@ -643,7 +647,7 @@ public class BlockManager {
return false; // already completed (e.g. by syncBlock)
final boolean b = commitBlock(lastBlock, commitBlock);
if (checkMinStorage(lastBlock)) {
if (hasMinStorage(lastBlock)) {
completeBlock(bc, bc.numBlocks() - 1, false);
}
return b;
@ -667,7 +671,7 @@ public class BlockManager {
}
int numNodes = curBlock.numNodes();
if (!force && !checkMinStorage(curBlock, numNodes)) {
if (!force && !hasMinStorage(curBlock, numNodes)) {
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
}
@ -765,7 +769,7 @@ public class BlockManager {
// count in safe-mode.
namesystem.adjustSafeModeBlockTotals(
// decrement safe if we had enough
checkMinStorage(oldBlock, targets.length) ? -1 : 0,
hasMinStorage(oldBlock, targets.length) ? -1 : 0,
// always decrement total blocks
-1);
@ -1099,7 +1103,7 @@ public class BlockManager {
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
final Iterator<? extends Block> it = node.getBlockIterator();
final Iterator<BlockInfo> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
}
@ -1113,10 +1117,10 @@ public class BlockManager {
/** Remove the blocks associated to the given DatanodeStorageInfo. */
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
assert namesystem.hasWriteLock();
final Iterator<? extends Block> it = storageInfo.getBlockIterator();
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
Block block = it.next();
BlockInfo block = it.next();
removeStoredBlock(block, node);
invalidateBlocks.remove(node, block);
}
@ -1138,21 +1142,32 @@ public class BlockManager {
* Adds block to list of blocks which will be invalidated on all its
* datanodes.
*/
private void addToInvalidates(Block b) {
private void addToInvalidates(BlockInfo storedBlock) {
if (!namesystem.isPopulatingReplQueues()) {
return;
}
StringBuilder datanodes = new StringBuilder();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
invalidateBlocks.add(b, node, false);
datanodes.append(node).append(" ");
final Block b = getBlockToInvalidate(storedBlock, storage);
if (b != null) {
invalidateBlocks.add(b, node, false);
datanodes.append(node).append(" ");
}
}
if (datanodes.length() != 0) {
blockLog.info("BLOCK* addToInvalidates: {} {}", b, datanodes.toString());
blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock,
datanodes.toString());
}
}
/**
* Pick the block that should be invalidated on the given storage. For a
* striped block group this is the result of
* {@link BlockInfoStriped#getBlockOnStorage}; otherwise it is the stored
* block itself.
*
* @param storedBlock the block tracked by the BlockManager
* @param storage the storage on which the block is to be invalidated
* @return the block to invalidate; may be null for a striped block when
*         getBlockOnStorage returns null for the storage
*/
private Block getBlockToInvalidate(BlockInfo storedBlock,
DatanodeStorageInfo storage) {
return storedBlock.isStriped() ?
((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
}
/**
* Remove all block invalidation tasks under this datanode UUID;
* used when a datanode registers with a new UUID and the old one
@ -1210,18 +1225,18 @@ public class BlockManager {
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
if (b.corrupted.isDeleted()) {
if (b.stored.isDeleted()) {
blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
addToInvalidates(b.corrupted, node);
return;
}
short expectedReplicas =
b.corrupted.getBlockCollection().getPreferredBlockReplication();
b.stored.getBlockCollection().getPreferredBlockReplication();
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
storageInfo.addBlock(b.stored, b.reportedBlock);
storageInfo.addBlock(b.stored, b.corrupted);
}
// Add this replica to corruptReplicas Map
@ -1231,8 +1246,10 @@ public class BlockManager {
NumberReplicas numberOfReplicas = countNodes(b.stored);
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas;
boolean minReplicationSatisfied = checkMinStorage(b.stored,
boolean minReplicationSatisfied = hasMinStorage(b.stored,
numberOfReplicas.liveReplicas());
boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas;
@ -1424,7 +1441,7 @@ public class BlockManager {
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
(blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
blockLog.info("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
@ -1507,7 +1524,7 @@ public class BlockManager {
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
(blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
blockLog.info("BLOCK* Removing {} from neededReplications as" +
@ -1517,7 +1534,7 @@ public class BlockManager {
}
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
(!blockHasEnoughRacks(block, requiredReplication)) ) {
if (rw.srcNodes[0].getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
@ -1711,7 +1728,7 @@ public class BlockManager {
getStorageBlockIndex(storage));
}
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks =
LightWeightLinkedSet<BlockInfo> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
@ -1847,39 +1864,32 @@ public class BlockManager {
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
/** The corrupted block in a datanode. */
final BlockInfo corrupted;
/**
* The corrupted block in a datanode. This is the one reported by the
* datanode.
*/
final Block corrupted;
/** The corresponding block stored in the BlockManager. */
final BlockInfo stored;
/** The block reported from a datanode */
final Block reportedBlock;
/** The reason to mark corrupt. */
final String reason;
/** The reason code to be stored */
final Reason reasonCode;
BlockToMarkCorrupt(Block reported, BlockInfo corrupted,
BlockInfo stored, String reason, Reason reasonCode) {
Preconditions.checkNotNull(reported, "reported is null");
BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.reportedBlock = reported;
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason,
Reason reasonCode) {
this(reported, stored, stored, reason, reasonCode);
}
BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs,
BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
String reason, Reason reasonCode) {
this(reported, BlockInfo.copyOf(stored), stored, reason,
reasonCode);
this(corrupted, stored, reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@ -2098,10 +2108,10 @@ public class BlockManager {
// between the old and new block report.
//
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toRemove = new TreeSet<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
Collection<BlockInfo> toRemove = new TreeSet<>();
Collection<Block> toInvalidate = new LinkedList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
Collection<StatefulBlockInfo> toUC = new LinkedList<>();
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
@ -2110,7 +2120,7 @@ public class BlockManager {
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
}
for (Block b : toRemove) {
for (BlockInfo b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
@ -2250,7 +2260,7 @@ public class BlockManager {
private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
@ -2285,8 +2295,9 @@ public class BlockManager {
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
while(it.hasNext())
while (it.hasNext()) {
toRemove.add(it.next());
}
storageInfo.removeBlock(delimiter);
}
@ -2617,7 +2628,7 @@ public class BlockManager {
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& checkMinStorage(storedBlock, numCurrentReplica)) {
&& hasMinStorage(storedBlock, numCurrentReplica)) {
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@ -2692,7 +2703,7 @@ public class BlockManager {
+ pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
checkMinStorage(storedBlock, numLiveReplicas)) {
hasMinStorage(storedBlock, numLiveReplicas)) {
storedBlock = completeBlock(bc, storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@ -2730,7 +2741,7 @@ public class BlockManager {
int numCorruptNodes = num.corruptReplicas();
if (numCorruptNodes != corruptReplicasCount) {
LOG.warn("Inconsistent number of corrupt replicas for " +
storedBlock + "blockMap has " + numCorruptNodes +
storedBlock + ". blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
@ -3004,14 +3015,14 @@ public class BlockManager {
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
private void processOverReplicatedBlock(final Block block,
private void processOverReplicatedBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@ -3025,8 +3036,8 @@ public class BlockManager {
postponeBlock(block);
return;
}
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
.getDatanodeUuid());
LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
cur.getDatanodeUuid());
if (excessBlocks == null || !excessBlocks.contains(block)) {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// exclude corrupt replicas
@ -3056,22 +3067,22 @@ public class BlockManager {
* then pick a node with least free space
*/
private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
Block b, short replication,
BlockInfo storedBlock, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map and
BlockCollection bc = getBlockCollection(b);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
BlockCollection bc = getBlockCollection(storedBlock);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
bc.getStoragePolicyID());
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
final Map<String, List<DatanodeStorageInfo>> rackMap
= new HashMap<String, List<DatanodeStorageInfo>>();
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
@ -3092,7 +3103,7 @@ public class BlockManager {
moreThanOne, excessTypes)) {
cur = delNodeHintStorage;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(bc, b, replication,
cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
moreThanOne, exactlyOne, excessTypes);
}
firstOne = false;
@ -3102,7 +3113,7 @@ public class BlockManager {
exactlyOne, cur);
nonExcess.remove(cur);
addToExcessReplicate(cur.getDatanodeDescriptor(), b);
addToExcessReplicate(cur.getDatanodeDescriptor(), storedBlock);
//
// The 'excessblocks' tracks blocks until we get confirmation
@ -3111,11 +3122,12 @@ public class BlockManager {
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
// upon giving instructions to the datanodes.
//
addToInvalidates(b, cur.getDatanodeDescriptor());
final Block blockToInvalidate = getBlockToInvalidate(storedBlock, cur);
addToInvalidates(blockToInvalidate, cur.getDatanodeDescriptor());
blockLog.info("BLOCK* chooseExcessReplicates: "
+"({}, {}) is added to invalidated blocks set", cur, b);
+"({}, {}) is added to invalidated blocks set", cur, storedBlock);
}
}
@ -3140,17 +3152,18 @@ public class BlockManager {
}
}
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
assert namesystem.hasWriteLock();
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
dn.getDatanodeUuid());
if (excessBlocks == null) {
excessBlocks = new LightWeightLinkedSet<Block>();
excessBlocks = new LightWeightLinkedSet<>();
excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
}
if (excessBlocks.add(block)) {
if (excessBlocks.add(storedBlock)) {
excessBlocksCount.incrementAndGet();
blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
+ " excessReplicateMap", dn, block);
+ " excessReplicateMap", dn, storedBlock);
}
}
@ -3169,14 +3182,13 @@ public class BlockManager {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
public void removeStoredBlock(Block block, DatanodeDescriptor node) {
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
assert (namesystem.hasWriteLock());
{
BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
" removed from node {}", block, node);
" removed from node {}", storedBlock, node);
return;
}
@ -3186,7 +3198,7 @@ public class BlockManager {
// necessary. In that case, put block on a possibly-will-
// be-replicated list.
//
BlockCollection bc = blocksMap.getBlockCollection(block);
BlockCollection bc = storedBlock.getBlockCollection();
if (bc != null) {
namesystem.decrementSafeBlockCount(storedBlock);
updateNeededReplications(storedBlock, -1, 0);
@ -3196,13 +3208,13 @@ public class BlockManager {
// We've removed a block from a node, so it's definitely no longer
// in "excess" there.
//
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
.getDatanodeUuid());
LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
node.getDatanodeUuid());
if (excessBlocks != null) {
if (excessBlocks.remove(block)) {
if (excessBlocks.remove(storedBlock)) {
excessBlocksCount.decrementAndGet();
blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
"excessBlocks", block);
"excessBlocks", storedBlock);
if (excessBlocks.size() == 0) {
excessReplicateMap.remove(node.getDatanodeUuid());
}
@ -3210,7 +3222,7 @@ public class BlockManager {
}
// Remove the replica from corruptReplicas
corruptReplicas.removeFromCorruptReplicasMap(block, node);
corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
}
}
@ -3344,7 +3356,7 @@ public class BlockManager {
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
removeStoredBlock(storageInfo, rdbi.getBlock(), node);
removeStoredBlock(storageInfo, getStoredBlock(rdbi.getBlock()), node);
deleted++;
break;
case RECEIVED_BLOCK:
@ -3395,8 +3407,8 @@ public class BlockManager {
} else if (node.isDecommissioned()) {
decommissioned++;
} else {
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
.getDatanodeUuid());
LightWeightLinkedSet<BlockInfo> blocksExcess = excessReplicateMap.get(
node.getDatanodeUuid());
if (blocksExcess != null && blocksExcess.contains(b)) {
excess++;
} else {
@ -3449,13 +3461,13 @@ public class BlockManager {
int numOverReplicated = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
BlockCollection bc = blocksMap.getBlockCollection(block);
short expectedReplication = bc.getPreferredBlockReplication();
int expectedReplication = this.getReplication(block);
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
if (numCurrentReplica > expectedReplication) {
// over-replicated block
processOverReplicatedBlock(block, expectedReplication, null, null);
processOverReplicatedBlock(block, (short) expectedReplication, null,
null);
numOverReplicated++;
}
}
@ -3655,21 +3667,20 @@ public class BlockManager {
return toInvalidate.size();
}
boolean blockHasEnoughRacks(Block b) {
// TODO: update the enough rack logic for striped blocks
boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
if (!this.shouldCheckForEnoughRacks) {
return true;
}
boolean enoughRacks = false;
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(b);
int numExpectedReplicas = getReplication(b);
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(storedBlock);
String rackName = null;
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
if (numExpectedReplicas == 1 ||
(numExpectedReplicas > 1 &&
if (expectedStorageNum == 1 || (expectedStorageNum > 1 &&
!datanodeManager.hasClusterEverBeenMultiRack())) {
enoughRacks = true;
break;
@ -3691,8 +3702,8 @@ public class BlockManager {
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
boolean isNeededReplication(Block b, int expected, int current) {
return current < expected || !blockHasEnoughRacks(b);
boolean isNeededReplication(BlockInfo storedBlock, int expected, int current) {
return current < expected || !blockHasEnoughRacks(storedBlock, expected);
}
public long getMissingBlocksCount() {
@ -3876,8 +3887,7 @@ public class BlockManager {
/**
* This class is used internally by {@link BlockManager#computeRecoveryWorkForBlocks}
* to represent a task to recover a block through replication or erasure
* coding. Recovery is done by transferring data from {@link srcNodes} to
* {@link targets}
* coding. Recovery is done by transferring data from srcNodes to targets
*/
private static class BlockRecoveryWork {
protected final BlockInfo block;

View File

@ -100,7 +100,7 @@ public class DecommissionManager {
* reports or other events. Before finally being marked as decommissioned,
* another check is done with the actual block map.
*/
private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
decomNodeBlocks;
/**
@ -244,12 +244,12 @@ public class DecommissionManager {
}
/**
* Checks whether a block is sufficiently replicated for decommissioning.
* Full-strength replication is not always necessary, hence "sufficient".
* Checks whether a block is sufficiently replicated/stored for
* decommissioning. For replicated blocks or striped blocks, full-strength
* replication or storage is not always necessary, hence "sufficient".
* @return true if sufficient, else false.
*/
private boolean isSufficientlyReplicated(BlockInfoContiguous block,
BlockCollection bc,
private boolean isSufficient(BlockInfo block, BlockCollection bc,
NumberReplicas numberReplicas) {
final int numExpected = bc.getPreferredBlockReplication();
final int numLive = numberReplicas.liveReplicas();
@ -265,18 +265,19 @@ public class DecommissionManager {
if (numExpected > numLive) {
if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
// Can decom a UC block as long as there will still be minReplicas
if (numLive >= blockManager.minReplication) {
if (blockManager.hasMinStorage(block, numLive)) {
LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+ ">= minR ({})", block, numLive, blockManager.minReplication);
+ ">= minR ({})", block, numLive,
blockManager.getMinStorageNum(block));
return true;
} else {
LOG.trace("UC block {} insufficiently-replicated since numLive "
+ "({}) < minR ({})", block, numLive,
blockManager.minReplication);
blockManager.getMinStorageNum(block));
}
} else {
// Can decom a non-UC as long as the default replication is met
if (numLive >= blockManager.defaultReplication) {
if (numLive >= blockManager.getDefaultStorageNum(block)) {
return true;
}
}
@ -412,7 +413,7 @@ public class DecommissionManager {
}
private void check() {
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
@ -420,10 +421,10 @@ public class DecommissionManager {
&& !exceededNumBlocksPerCheck()
&& !exceededNumNodesPerCheck()) {
numNodesChecked++;
final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
entry = it.next();
final DatanodeDescriptor dn = entry.getKey();
AbstractList<BlockInfoContiguous> blocks = entry.getValue();
AbstractList<BlockInfo> blocks = entry.getValue();
boolean fullScan = false;
if (blocks == null) {
// This is a newly added datanode, run through its list to schedule
@ -431,14 +432,14 @@ public class DecommissionManager {
// that are insufficiently replicated for further tracking
LOG.debug("Newly-added node {}, doing full scan to find " +
"insufficiently-replicated blocks.", dn);
blocks = handleInsufficientlyReplicated(dn);
blocks = handleInsufficientlyStored(dn);
decomNodeBlocks.put(dn, blocks);
fullScan = true;
} else {
// This is a known datanode, check if its # of insufficiently
// replicated blocks has dropped to zero and if it can be decommed
LOG.debug("Processing decommission-in-progress node {}", dn);
pruneSufficientlyReplicated(dn, blocks);
pruneReliableBlocks(dn, blocks);
}
if (blocks.size() == 0) {
if (!fullScan) {
@ -450,7 +451,7 @@ public class DecommissionManager {
// marking the datanode as decommissioned
LOG.debug("Node {} has finished replicating current set of "
+ "blocks, checking with the full block map.", dn);
blocks = handleInsufficientlyReplicated(dn);
blocks = handleInsufficientlyStored(dn);
decomNodeBlocks.put(dn, blocks);
}
// If the full scan is clean AND the node liveness is okay,
@ -491,27 +492,25 @@ public class DecommissionManager {
}
/**
* Removes sufficiently replicated blocks from the block list of a
* datanode.
* Removes reliable blocks from the block list of a datanode.
*/
private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
AbstractList<BlockInfoContiguous> blocks) {
private void pruneReliableBlocks(final DatanodeDescriptor datanode,
AbstractList<BlockInfo> blocks) {
processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
}
/**
* Returns a list of blocks on a datanode that are insufficiently
* replicated, i.e. are under-replicated enough to prevent decommission.
* Returns a list of blocks on a datanode that are insufficiently replicated
* or require recovery, i.e. blocks whose current state should prevent
* decommission.
* <p/>
* As part of this, it also schedules replication work for
* any under-replicated blocks.
* As part of this, it also schedules replication/recovery work.
*
* @param datanode
* @return List of insufficiently replicated blocks
* @return List of blocks requiring recovery
*/
private AbstractList<BlockInfoContiguous> handleInsufficientlyReplicated(
private AbstractList<BlockInfo> handleInsufficientlyStored(
final DatanodeDescriptor datanode) {
AbstractList<BlockInfoContiguous> insufficient = new ChunkedArrayList<>();
AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
insufficient, false);
return insufficient;
@ -520,24 +519,22 @@ public class DecommissionManager {
/**
* Used while checking if decommission-in-progress datanodes can be marked
* as decommissioned. Combines shared logic of
* pruneSufficientlyReplicated and handleInsufficientlyReplicated.
* pruneReliableBlocks and handleInsufficientlyStored.
*
* @param datanode Datanode
* @param it Iterator over the blocks on the
* datanode
* @param insufficientlyReplicated Return parameter. If it's not null,
* @param insufficientList Return parameter. If it's not null,
* will contain the insufficiently
* replicated-blocks from the list.
* @param pruneSufficientlyReplicated whether to remove sufficiently
* replicated blocks from the iterator
* @return true if there are under-replicated blocks in the provided block
* iterator, else false.
* @param pruneReliableBlocks whether to remove blocks reliable
* enough from the iterator
*/
private void processBlocksForDecomInternal(
final DatanodeDescriptor datanode,
final Iterator<? extends BlockInfo> it,
final List<BlockInfoContiguous> insufficientlyReplicated,
boolean pruneSufficientlyReplicated) {
final Iterator<BlockInfo> it,
final List<BlockInfo> insufficientList,
boolean pruneReliableBlocks) {
boolean firstReplicationLog = true;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
@ -552,7 +549,7 @@ public class DecommissionManager {
it.remove();
continue;
}
BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
BlockCollection bc = blockManager.getBlockCollection(block);
if (bc == null) {
// Orphan block, will be invalidated eventually. Skip.
continue;
@ -560,7 +557,6 @@ public class DecommissionManager {
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();
final int curReplicas = liveReplicas;
// Schedule under-replicated blocks for replication if not already
// pending
@ -571,7 +567,7 @@ public class DecommissionManager {
namesystem.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
curReplicas,
liveReplicas,
num.decommissionedAndDecommissioning(),
bc.getPreferredBlockReplication());
}
@ -579,17 +575,16 @@ public class DecommissionManager {
// Even if the block is under-replicated,
// it doesn't block decommission if it's sufficiently replicated
BlockInfoContiguous blk = (BlockInfoContiguous) block;
if (isSufficientlyReplicated(blk, bc, num)) {
if (pruneSufficientlyReplicated) {
if (isSufficient(block, bc, num)) {
if (pruneReliableBlocks) {
it.remove();
}
continue;
}
// We've found an insufficiently replicated block.
if (insufficientlyReplicated != null) {
insufficientlyReplicated.add(blk);
if (insufficientList != null) {
insufficientList.add(block);
}
// Log if this is our first time through
if (firstReplicationLog) {
@ -602,7 +597,7 @@ public class DecommissionManager {
if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++;
}
if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
decommissionOnlyReplicas++;
}
}

View File

@ -3273,7 +3273,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (trackBlockCounts) {
if (b.isComplete()) {
numRemovedComplete++;
if (blockManager.checkMinStorage(b, b.numNodes())) {
if (blockManager.hasMinStorage(b, b.numNodes())) {
numRemovedSafe++;
}
}
@ -3502,7 +3502,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
curBlock = blocks[nrCompleteBlocks];
if(!curBlock.isComplete())
break;
assert blockManager.checkMinStorage(curBlock) :
assert blockManager.hasMinStorage(curBlock) :
"A COMPLETE block is not minimally replicated in " + src;
}
@ -3538,7 +3538,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If penultimate block doesn't exist then its minReplication is met
boolean penultimateBlockMinStorage = penultimateBlock == null ||
blockManager.checkMinStorage(penultimateBlock);
blockManager.hasMinStorage(penultimateBlock);
switch(lastBlockState) {
case COMPLETE:
@ -3547,7 +3547,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
case COMMITTED:
// Close file if committed blocks are minimally replicated
if(penultimateBlockMinStorage &&
blockManager.checkMinStorage(lastBlock)) {
blockManager.hasMinStorage(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId());
NameNode.stateChangeLog.warn("BLOCK*"

View File

@ -100,7 +100,7 @@ public class TestNodeCount {
DatanodeDescriptor nonExcessDN = null;
for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
nonExcessDN = dn;
break;

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestOverReplicatedBlocks {
@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks {
// All replicas for deletion should be scheduled on lastDN.
// And should not actually be deleted, because lastDN does not heartbeat.
namesystem.readLock();
Collection<Block> dnBlocks =
Collection<BlockInfo> dnBlocks =
namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
assertEquals("Replicas on node " + lastDNid + " should have been deleted",
SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());