HDFS-10236. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-3]. Contributed by Rakesh R.

This commit is contained in:
Zhe Zhang 2016-05-26 16:49:57 -07:00
parent 1b0cfc84e7
commit 8c84a2a93c
4 changed files with 40 additions and 40 deletions

View File

@ -775,7 +775,7 @@ public class BlockManager implements BlockStatsMXBean {
/** /**
* Commit the last block of the file and mark it as complete if it has * Commit the last block of the file and mark it as complete if it has
* meets the minimum replication requirement * meets the minimum redundancy requirement
* *
* @param bc block collection * @param bc block collection
* @param commitBlock - contains client reported block length and generation * @param commitBlock - contains client reported block length and generation
@ -911,7 +911,7 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas replicas = countNodes(lastBlock); NumberReplicas replicas = countNodes(lastBlock);
neededReconstruction.remove(lastBlock, replicas.liveReplicas(), neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(), replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); replicas.decommissionedAndDecommissioning(), getRedundancy(lastBlock));
pendingReconstruction.remove(lastBlock); pendingReconstruction.remove(lastBlock);
// remove this block from the list of pending blocks to be deleted. // remove this block from the list of pending blocks to be deleted.
@ -1400,8 +1400,8 @@ public class BlockManager implements BlockStatsMXBean {
addToInvalidates(b.getCorrupted(), node); addToInvalidates(b.getCorrupted(), node);
return; return;
} }
short expectedReplicas = short expectedRedundancies =
getExpectedReplicaNum(b.getStored()); getExpectedRedundancyNum(b.getStored());
// Add replica to the data-node if it is not already there // Add replica to the data-node if it is not already there
if (storageInfo != null) { if (storageInfo != null) {
@ -1419,14 +1419,14 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas numberOfReplicas = countNodes(b.getStored()); NumberReplicas numberOfReplicas = countNodes(b.getStored());
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas; expectedRedundancies;
boolean minReplicationSatisfied = hasMinStorage(b.getStored(), boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
numberOfReplicas.liveReplicas()); numberOfReplicas.liveReplicas());
boolean hasMoreCorruptReplicas = minReplicationSatisfied && boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas; expectedRedundancies;
boolean corruptedDuringWrite = minReplicationSatisfied && boolean corruptedDuringWrite = minReplicationSatisfied &&
b.isCorruptedDuringWrite(); b.isCorruptedDuringWrite();
// case 1: have enough number of live replicas // case 1: have enough number of live replicas
@ -1658,7 +1658,7 @@ public class BlockManager implements BlockStatsMXBean {
return null; return null;
} }
short requiredReplication = getExpectedReplicaNum(block); short requiredRedundancy = getExpectedRedundancyNum(block);
// get a source data-node // get a source data-node
List<DatanodeDescriptor> containingNodes = new ArrayList<>(); List<DatanodeDescriptor> containingNodes = new ArrayList<>();
@ -1681,7 +1681,7 @@ public class BlockManager implements BlockStatsMXBean {
int pendingNum = pendingReconstruction.getNumReplicas(block); int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) { requiredRedundancy)) {
neededReconstruction.remove(block, priority); neededReconstruction.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" + blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
" it has enough replicas", block); " it has enough replicas", block);
@ -1689,8 +1689,8 @@ public class BlockManager implements BlockStatsMXBean {
} }
int additionalReplRequired; int additionalReplRequired;
if (numReplicas.liveReplicas() < requiredReplication) { if (numReplicas.liveReplicas() < requiredRedundancy) {
additionalReplRequired = requiredReplication - numReplicas.liveReplicas() additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()
- pendingNum; - pendingNum;
} else { } else {
additionalReplRequired = 1; // Needed on a new rack additionalReplRequired = 1; // Needed on a new rack
@ -1745,11 +1745,11 @@ public class BlockManager implements BlockStatsMXBean {
} }
// do not schedule more if enough replicas is already pending // do not schedule more if enough replicas is already pending
final short requiredReplication = getExpectedReplicaNum(block); final short requiredRedundancy = getExpectedRedundancyNum(block);
NumberReplicas numReplicas = countNodes(block); NumberReplicas numReplicas = countNodes(block);
final int pendingNum = pendingReconstruction.getNumReplicas(block); final int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) { requiredRedundancy)) {
neededReconstruction.remove(block, priority); neededReconstruction.remove(block, priority);
rw.resetTargets(); rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReplications as" + blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@ -1758,7 +1758,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
DatanodeStorageInfo[] targets = rw.getTargets(); DatanodeStorageInfo[] targets = rw.getTargets();
if ( (numReplicas.liveReplicas() >= requiredReplication) && if ((numReplicas.liveReplicas() >= requiredRedundancy) &&
(!isPlacementPolicySatisfied(block)) ) { (!isPlacementPolicySatisfied(block)) ) {
if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) { if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
// No use continuing, unless a new rack in this case // No use continuing, unless a new rack in this case
@ -1783,7 +1783,7 @@ public class BlockManager implements BlockStatsMXBean {
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum; int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
// remove from neededReconstruction // remove from neededReconstruction
if(numEffectiveReplicas + targets.length >= requiredReplication) { if(numEffectiveReplicas + targets.length >= requiredRedundancy) {
neededReconstruction.remove(block, priority); neededReconstruction.remove(block, priority);
} }
return true; return true;
@ -1986,7 +1986,7 @@ public class BlockManager implements BlockStatsMXBean {
if (isNeededReconstruction(bi, num.liveReplicas())) { if (isNeededReconstruction(bi, num.liveReplicas())) {
neededReconstruction.add(bi, num.liveReplicas(), neededReconstruction.add(bi, num.liveReplicas(),
num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
getReplication(bi)); getRedundancy(bi));
} }
} }
} finally { } finally {
@ -2975,16 +2975,16 @@ public class BlockManager implements BlockStatsMXBean {
} }
// handle low redundancy/extra redundancy // handle low redundancy/extra redundancy
short fileReplication = getExpectedReplicaNum(storedBlock); short fileRedundancy = getExpectedRedundancyNum(storedBlock);
if (!isNeededReconstruction(storedBlock, numCurrentReplica)) { if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
neededReconstruction.remove(storedBlock, numCurrentReplica, neededReconstruction.remove(storedBlock, numCurrentReplica,
num.readOnlyReplicas(), num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), fileReplication); num.decommissionedAndDecommissioning(), fileRedundancy);
} else { } else {
updateNeededReconstructions(storedBlock, curReplicaDelta, 0); updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
} }
if (shouldProcessExtraRedundancy(num, fileReplication)) { if (shouldProcessExtraRedundancy(num, fileRedundancy)) {
processExtraRedundancyBlock(storedBlock, fileReplication, node, processExtraRedundancyBlock(storedBlock, fileRedundancy, node,
delNodeHint); delNodeHint);
} }
// If the file redundancy has reached desired value // If the file redundancy has reached desired value
@ -2996,7 +2996,7 @@ public class BlockManager implements BlockStatsMXBean {
storedBlock + ". blockMap has " + numCorruptNodes + storedBlock + ". blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount); " but corrupt replicas map has " + corruptReplicasCount);
} }
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileRedundancy)) {
invalidateCorruptReplicas(storedBlock, reportedBlock, num); invalidateCorruptReplicas(storedBlock, reportedBlock, num);
} }
return storedBlock; return storedBlock;
@ -3212,20 +3212,20 @@ public class BlockManager implements BlockStatsMXBean {
// they'll be reached when they are completed or recovered. // they'll be reached when they are completed or recovered.
return MisReplicationResult.UNDER_CONSTRUCTION; return MisReplicationResult.UNDER_CONSTRUCTION;
} }
// calculate current replication // calculate current redundancy
short expectedReplication = getExpectedReplicaNum(block); short expectedRedundancy = getExpectedRedundancyNum(block);
NumberReplicas num = countNodes(block); NumberReplicas num = countNodes(block);
final int numCurrentReplica = num.liveReplicas(); final int numCurrentReplica = num.liveReplicas();
// add to low redundancy queue if need to be // add to low redundancy queue if need to be
if (isNeededReconstruction(block, numCurrentReplica)) { if (isNeededReconstruction(block, numCurrentReplica)) {
if (neededReconstruction.add(block, numCurrentReplica, if (neededReconstruction.add(block, numCurrentReplica,
num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
expectedReplication)) { expectedRedundancy)) {
return MisReplicationResult.UNDER_REPLICATED; return MisReplicationResult.UNDER_REPLICATED;
} }
} }
if (shouldProcessExtraRedundancy(num, expectedReplication)) { if (shouldProcessExtraRedundancy(num, expectedRedundancy)) {
if (num.replicasOnStaleNodes() > 0) { if (num.replicasOnStaleNodes() > 0) {
// If any of the replicas of this block are on nodes that are // If any of the replicas of this block are on nodes that are
// considered "stale", then these replicas may in fact have // considered "stale", then these replicas may in fact have
@ -3236,7 +3236,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
// extra redundancy block // extra redundancy block
processExtraRedundancyBlock(block, expectedReplication, null, null); processExtraRedundancyBlock(block, expectedRedundancy, null, null);
return MisReplicationResult.OVER_REPLICATED; return MisReplicationResult.OVER_REPLICATED;
} }
@ -3843,7 +3843,7 @@ public class BlockManager implements BlockStatsMXBean {
int numExtraRedundancy = 0; int numExtraRedundancy = 0;
while(it.hasNext()) { while(it.hasNext()) {
final BlockInfo block = it.next(); final BlockInfo block = it.next();
int expectedReplication = this.getReplication(block); int expectedReplication = this.getRedundancy(block);
NumberReplicas num = countNodes(block); NumberReplicas num = countNodes(block);
if (shouldProcessExtraRedundancy(num, expectedReplication)) { if (shouldProcessExtraRedundancy(num, expectedReplication)) {
// extra redundancy block // extra redundancy block
@ -3951,7 +3951,7 @@ public class BlockManager implements BlockStatsMXBean {
return; return;
} }
NumberReplicas repl = countNodes(block); NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block); int curExpectedReplicas = getRedundancy(block);
if (isNeededReconstruction(block, repl.liveReplicas())) { if (isNeededReconstruction(block, repl.liveReplicas())) {
neededReconstruction.update(block, repl.liveReplicas(), neededReconstruction.update(block, repl.liveReplicas(),
repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
@ -3973,9 +3973,9 @@ public class BlockManager implements BlockStatsMXBean {
* Otherwise, if the block is more than the expected replication factor, * Otherwise, if the block is more than the expected replication factor,
* process it as an extra redundancy block. * process it as an extra redundancy block.
*/ */
public void checkReplication(BlockCollection bc) { public void checkRedundancy(BlockCollection bc) {
for (BlockInfo block : bc.getBlocks()) { for (BlockInfo block : bc.getBlocks()) {
short expected = getExpectedReplicaNum(block); short expected = getExpectedRedundancyNum(block);
final NumberReplicas n = countNodes(block); final NumberReplicas n = countNodes(block);
final int pending = pendingReconstruction.getNumReplicas(block); final int pending = pendingReconstruction.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) { if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
@ -3992,8 +3992,8 @@ public class BlockManager implements BlockStatsMXBean {
* @return 0 if the block is not found; * @return 0 if the block is not found;
* otherwise, return the replication factor of the block. * otherwise, return the replication factor of the block.
*/ */
private int getReplication(BlockInfo block) { private int getRedundancy(BlockInfo block) {
return getExpectedReplicaNum(block); return getExpectedRedundancyNum(block);
} }
/** /**
@ -4063,16 +4063,16 @@ public class BlockManager implements BlockStatsMXBean {
} }
/** /**
* A block needs reconstruction if the number of replicas is less than * A block needs reconstruction if the number of redundancies is less than
* expected or if it does not have enough racks. * expected or if it does not have enough racks.
*/ */
boolean isNeededReconstruction(BlockInfo storedBlock, int current) { boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
int expected = getExpectedReplicaNum(storedBlock); int expected = getExpectedRedundancyNum(storedBlock);
return storedBlock.isComplete() return storedBlock.isComplete()
&& (current < expected || !isPlacementPolicySatisfied(storedBlock)); && (current < expected || !isPlacementPolicySatisfied(storedBlock));
} }
public short getExpectedReplicaNum(BlockInfo block) { public short getExpectedRedundancyNum(BlockInfo block) {
return block.isStriped() ? return block.isStriped() ?
((BlockInfoStriped) block).getRealTotalBlockNum() : ((BlockInfoStriped) block).getRealTotalBlockNum() :
block.getReplication(); block.getReplication();

View File

@ -242,7 +242,7 @@ public class DecommissionManager {
*/ */
private boolean isSufficient(BlockInfo block, BlockCollection bc, private boolean isSufficient(BlockInfo block, BlockCollection bc,
NumberReplicas numberReplicas) { NumberReplicas numberReplicas) {
final int numExpected = blockManager.getExpectedReplicaNum(block); final int numExpected = blockManager.getExpectedRedundancyNum(block);
final int numLive = numberReplicas.liveReplicas(); final int numLive = numberReplicas.liveReplicas();
if (numLive >= numExpected if (numLive >= numExpected
&& blockManager.isPlacementPolicySatisfied(block)) { && blockManager.isPlacementPolicySatisfied(block)) {
@ -286,7 +286,7 @@ public class DecommissionManager {
} }
int curReplicas = num.liveReplicas(); int curReplicas = num.liveReplicas();
int curExpectedReplicas = blockManager.getExpectedReplicaNum(block); int curExpectedRedundancy = blockManager.getExpectedRedundancyNum(block);
StringBuilder nodeList = new StringBuilder(); StringBuilder nodeList = new StringBuilder();
for (DatanodeStorageInfo storage : storages) { for (DatanodeStorageInfo storage : storages) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@ -295,7 +295,7 @@ public class DecommissionManager {
} }
NameNode.blockStateChangeLog.info( NameNode.blockStateChangeLog.info(
"Block: " + block + ", Expected Replicas: " "Block: " + block + ", Expected Replicas: "
+ curExpectedReplicas + ", live replicas: " + curReplicas + curExpectedRedundancy + ", live replicas: " + curReplicas
+ ", corrupt replicas: " + num.corruptReplicas() + ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissioned() + ", decommissioned replicas: " + num.decommissioned()
+ ", decommissioning replicas: " + num.decommissioning() + ", decommissioning replicas: " + num.decommissioning()
@ -547,7 +547,7 @@ public class DecommissionManager {
blockManager.neededReconstruction.add(block, blockManager.neededReconstruction.add(block,
liveReplicas, num.readOnlyReplicas(), liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), num.decommissionedAndDecommissioning(),
blockManager.getExpectedReplicaNum(block)); blockManager.getExpectedRedundancyNum(block));
} }
} }

View File

@ -3322,7 +3322,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// close file and persist block allocations for this file // close file and persist block allocations for this file
closeFile(src, pendingFile); closeFile(src, pendingFile);
blockManager.checkReplication(pendingFile); blockManager.checkRedundancy(pendingFile);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -264,7 +264,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println("Block Id: " + blockId); out.println("Block Id: " + blockId);
out.println("Block belongs to: "+iNode.getFullPathName()); out.println("Block belongs to: "+iNode.getFullPathName());
out.println("No. of Expected Replica: " + out.println("No. of Expected Replica: " +
blockManager.getExpectedReplicaNum(blockInfo)); blockManager.getExpectedRedundancyNum(blockInfo));
out.println("No. of live Replica: " + numberReplicas.liveReplicas()); out.println("No. of live Replica: " + numberReplicas.liveReplicas());
out.println("No. of excess Replica: " + numberReplicas.excessReplicas()); out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
out.println("No. of stale Replica: " + out.println("No. of stale Replica: " +