HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN. Contributed by Yi Liu.

This commit is contained in:
Jing Zhao 2015-05-18 19:06:34 -07:00 committed by Zhe Zhang
parent 6c310db159
commit b008348dbf
4 changed files with 43 additions and 27 deletions

View File

@ -220,3 +220,6 @@
HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe. HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe.
(Walter Su via jing9) (Walter Su via jing9)
HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN.
(Yi Liu via jing9)

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
@ -85,6 +84,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.ECSchema;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
@ -603,16 +603,7 @@ public class BlockManager {
public short getMinStorageNum(BlockInfo block) { public short getMinStorageNum(BlockInfo block) {
if (block.isStriped()) { if (block.isStriped()) {
final BlockInfoStriped sblock = (BlockInfoStriped) block; return getStripedDataBlockNum(block);
short dataBlockNum = sblock.getDataBlockNum();
if (sblock.isComplete() ||
sblock.getBlockUCState() == BlockUCState.COMMITTED) {
// if the sblock is committed/completed and its length is less than a
// full stripe, the minimum storage number needs to be adjusted
dataBlockNum = (short) Math.min(dataBlockNum,
(sblock.getNumBytes() - 1) / HdfsConstants.BLOCK_STRIPED_CELL_SIZE + 1);
}
return dataBlockNum;
} else { } else {
return minReplication; return minReplication;
} }
@ -1258,7 +1249,7 @@ public class BlockManager {
return; return;
} }
short expectedReplicas = short expectedReplicas =
b.stored.getBlockCollection().getPreferredBlockReplication(); getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
// Add replica to the data-node if it is not already there // Add replica to the data-node if it is not already there
if (storageInfo != null) { if (storageInfo != null) {
@ -1437,7 +1428,7 @@ public class BlockManager {
continue; continue;
} }
requiredReplication = bc.getPreferredBlockReplication(); requiredReplication = getExpectedReplicaNum(bc, block);
// get a source data-node // get a source data-node
containingNodes = new ArrayList<>(); containingNodes = new ArrayList<>();
@ -1537,7 +1528,7 @@ public class BlockManager {
rw.targets = null; rw.targets = null;
continue; continue;
} }
requiredReplication = bc.getPreferredBlockReplication(); requiredReplication = getExpectedReplicaNum(bc, block);
// do not schedule more if enough replicas is already pending // do not schedule more if enough replicas is already pending
NumberReplicas numReplicas = countNodes(block); NumberReplicas numReplicas = countNodes(block);
@ -2539,7 +2530,7 @@ public class BlockManager {
int reportedBlkIdx = BlockIdManager.getBlockIndex(reported); int reportedBlkIdx = BlockIdManager.getBlockIndex(reported);
wrongSize = reported.getNumBytes() != wrongSize = reported.getNumBytes() !=
getInternalBlockLength(stripedBlock.getNumBytes(), getInternalBlockLength(stripedBlock.getNumBytes(),
HdfsConstants.BLOCK_STRIPED_CELL_SIZE, BLOCK_STRIPED_CELL_SIZE,
stripedBlock.getDataBlockNum(), reportedBlkIdx); stripedBlock.getDataBlockNum(), reportedBlkIdx);
} else { } else {
wrongSize = storedBlock.getNumBytes() != reported.getNumBytes(); wrongSize = storedBlock.getNumBytes() != reported.getNumBytes();
@ -2763,7 +2754,7 @@ public class BlockManager {
} }
// handle underReplication/overReplication // handle underReplication/overReplication
short fileReplication = bc.getPreferredBlockReplication(); short fileReplication = getExpectedReplicaNum(bc, storedBlock);
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica, neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedAndDecommissioning(), fileReplication); num.decommissionedAndDecommissioning(), fileReplication);
@ -3003,7 +2994,7 @@ public class BlockManager {
} }
// calculate current replication // calculate current replication
short expectedReplication = short expectedReplication =
block.getBlockCollection().getPreferredBlockReplication(); getExpectedReplicaNum(block.getBlockCollection(), block);
NumberReplicas num = countNodes(block); NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas(); int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be // add to under-replicated queue if need to be
@ -3638,8 +3629,8 @@ public class BlockManager {
* process it as an over replicated block. * process it as an over replicated block.
*/ */
public void checkReplication(BlockCollection bc) { public void checkReplication(BlockCollection bc) {
final short expected = bc.getPreferredBlockReplication();
for (BlockInfo block : bc.getBlocks()) { for (BlockInfo block : bc.getBlocks()) {
short expected = getExpectedReplicaNum(bc, block);
final NumberReplicas n = countNodes(block); final NumberReplicas n = countNodes(block);
if (isNeededReplication(block, expected, n.liveReplicas())) { if (isNeededReplication(block, expected, n.liveReplicas())) {
neededReplications.add(block, n.liveReplicas(), neededReplications.add(block, n.liveReplicas(),
@ -3674,9 +3665,9 @@ public class BlockManager {
* @return 0 if the block is not found; * @return 0 if the block is not found;
* otherwise, return the replication factor of the block. * otherwise, return the replication factor of the block.
*/ */
private int getReplication(Block block) { private int getReplication(BlockInfo block) {
final BlockCollection bc = blocksMap.getBlockCollection(block); final BlockCollection bc = blocksMap.getBlockCollection(block);
return bc == null? 0: bc.getPreferredBlockReplication(); return bc == null? 0: getExpectedReplicaNum(bc, block);
} }
@ -3759,6 +3750,29 @@ public class BlockManager {
return current < expected || !blockHasEnoughRacks(storedBlock, expected); return current < expected || !blockHasEnoughRacks(storedBlock, expected);
} }
public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
if (block.isStriped()) {
return (short) (getStripedDataBlockNum(block) +
((BlockInfoStriped) block).getParityBlockNum());
} else {
return bc.getPreferredBlockReplication();
}
}
short getStripedDataBlockNum(BlockInfo block) {
assert block.isStriped();
final BlockInfoStriped sblock = (BlockInfoStriped) block;
short dataBlockNum = sblock.getDataBlockNum();
if (sblock.isComplete() ||
sblock.getBlockUCState() == BlockUCState.COMMITTED) {
// if the sblock is committed/completed and its length is less than a
// full stripe, the minimum storage number needs to be adjusted
dataBlockNum = (short) Math.min(dataBlockNum,
(sblock.getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
}
return dataBlockNum;
}
public long getMissingBlocksCount() { public long getMissingBlocksCount() {
// not locking // not locking
return this.neededReplications.getCorruptBlockSize(); return this.neededReplications.getCorruptBlockSize();

View File

@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.util.CyclicIteration; import org.apache.hadoop.hdfs.util.CyclicIteration;
@ -251,7 +250,7 @@ public class DecommissionManager {
*/ */
private boolean isSufficient(BlockInfo block, BlockCollection bc, private boolean isSufficient(BlockInfo block, BlockCollection bc,
NumberReplicas numberReplicas) { NumberReplicas numberReplicas) {
final int numExpected = bc.getPreferredBlockReplication(); final int numExpected = blockManager.getExpectedReplicaNum(bc, block);
final int numLive = numberReplicas.liveReplicas(); final int numLive = numberReplicas.liveReplicas();
if (!blockManager.isNeededReplication(block, numExpected, numLive)) { if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
// Block doesn't need replication. Skip. // Block doesn't need replication. Skip.
@ -285,11 +284,11 @@ public class DecommissionManager {
return false; return false;
} }
private static void logBlockReplicationInfo(Block block, BlockCollection bc, private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc,
DatanodeDescriptor srcNode, NumberReplicas num, DatanodeDescriptor srcNode, NumberReplicas num,
Iterable<DatanodeStorageInfo> storages) { Iterable<DatanodeStorageInfo> storages) {
int curReplicas = num.liveReplicas(); int curReplicas = num.liveReplicas();
int curExpectedReplicas = bc.getPreferredBlockReplication(); int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block);
StringBuilder nodeList = new StringBuilder(); StringBuilder nodeList = new StringBuilder();
for (DatanodeStorageInfo storage : storages) { for (DatanodeStorageInfo storage : storages) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@ -561,7 +560,7 @@ public class DecommissionManager {
// Schedule under-replicated blocks for replication if not already // Schedule under-replicated blocks for replication if not already
// pending // pending
if (blockManager.isNeededReplication(block, if (blockManager.isNeededReplication(block,
bc.getPreferredBlockReplication(), liveReplicas)) { blockManager.getExpectedReplicaNum(bc, block), liveReplicas)) {
if (!blockManager.neededReplications.contains(block) && if (!blockManager.neededReplications.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 && blockManager.pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) { namesystem.isPopulatingReplQueues()) {
@ -569,7 +568,7 @@ public class DecommissionManager {
blockManager.neededReplications.add(block, blockManager.neededReplications.add(block,
liveReplicas, liveReplicas,
num.decommissionedAndDecommissioning(), num.decommissionedAndDecommissioning(),
bc.getPreferredBlockReplication()); blockManager.getExpectedReplicaNum(bc, block));
} }
} }

View File

@ -256,7 +256,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println("Block Id: " + blockId); out.println("Block Id: " + blockId);
out.println("Block belongs to: "+iNode.getFullPathName()); out.println("Block belongs to: "+iNode.getFullPathName());
out.println("No. of Expected Replica: " + out.println("No. of Expected Replica: " +
bc.getPreferredBlockReplication()); bm.getExpectedReplicaNum(bc, blockInfo));
out.println("No. of live Replica: " + numberReplicas.liveReplicas()); out.println("No. of live Replica: " + numberReplicas.liveReplicas());
out.println("No. of excess Replica: " + numberReplicas.excessReplicas()); out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
out.println("No. of stale Replica: " + out.println("No. of stale Replica: " +