svn merge -c 1400986 from trunk for HDFS-4099. Clean up replication code and add more javadoc.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1400987 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
31fc8c504e
commit
35bd16ad62
|
@ -74,6 +74,8 @@ Release 2.0.3-alpha - Unreleased
|
|||
HDFS-4088. Remove "throws QuotaExceededException" from an
|
||||
INodeDirectoryWithQuota constructor. (szetszwo)
|
||||
|
||||
HDFS-4099. Clean up replication code and add more javadoc. (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
|
@ -50,14 +52,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
|
||||
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
|
||||
|
@ -2862,28 +2861,32 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
}
|
||||
}
|
||||
|
||||
public void checkReplication(Block block, short numExpectedReplicas) {
|
||||
// filter out containingNodes that are marked for decommission.
|
||||
NumberReplicas number = countNodes(block);
|
||||
if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
|
||||
neededReplications.add(block,
|
||||
number.liveReplicas(),
|
||||
number.decommissionedReplicas(),
|
||||
numExpectedReplicas);
|
||||
return;
|
||||
}
|
||||
if (number.liveReplicas() > numExpectedReplicas) {
|
||||
processOverReplicatedBlock(block, numExpectedReplicas, null, null);
|
||||
/**
|
||||
* Check replication of the blocks in the collection.
|
||||
* If any block is needed replication, insert it into the replication queue.
|
||||
* Otherwise, if the block is more than the expected replication factor,
|
||||
* process it as an over replicated block.
|
||||
*/
|
||||
public void checkReplication(BlockCollection bc) {
|
||||
final short expected = bc.getBlockReplication();
|
||||
for (Block block : bc.getBlocks()) {
|
||||
final NumberReplicas n = countNodes(block);
|
||||
if (isNeededReplication(block, expected, n.liveReplicas())) {
|
||||
neededReplications.add(block, n.liveReplicas(),
|
||||
n.decommissionedReplicas(), expected);
|
||||
} else if (n.liveReplicas() > expected) {
|
||||
processOverReplicatedBlock(block, expected, null, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* get replication factor of a block */
|
||||
/**
|
||||
* @return 0 if the block is not found;
|
||||
* otherwise, return the replication factor of the block.
|
||||
*/
|
||||
private int getReplication(Block block) {
|
||||
BlockCollection bc = blocksMap.getBlockCollection(block);
|
||||
if (bc == null) { // block does not belong to any file
|
||||
return 0;
|
||||
}
|
||||
return bc.getBlockReplication();
|
||||
final BlockCollection bc = blocksMap.getBlockCollection(block);
|
||||
return bc == null? 0: bc.getBlockReplication();
|
||||
}
|
||||
|
||||
|
||||
|
@ -2942,12 +2945,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
return enoughRacks;
|
||||
}
|
||||
|
||||
boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
|
||||
if ((curReplicas >= expectedReplication) && (blockHasEnoughRacks(b))) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* A block needs replication if the number of replicas is less than expected
|
||||
* or if it does not have enough racks.
|
||||
*/
|
||||
private boolean isNeededReplication(Block b, int expected, int current) {
|
||||
return current < expected || !blockHasEnoughRacks(b);
|
||||
}
|
||||
|
||||
public long getMissingBlocksCount() {
|
||||
|
|
|
@ -2413,21 +2413,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check all blocks of a file. If any blocks are lower than their intended
|
||||
* replication factor, then insert them into neededReplication and if
|
||||
* the blocks are more than the intended replication factor then insert
|
||||
* them into invalidateBlocks.
|
||||
*/
|
||||
private void checkReplicationFactor(INodeFile file) {
|
||||
short numExpectedReplicas = file.getBlockReplication();
|
||||
Block[] pendingBlocks = file.getBlocks();
|
||||
int nrBlocks = pendingBlocks.length;
|
||||
for (int i = 0; i < nrBlocks; i++) {
|
||||
blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a block at the given pending filename
|
||||
*
|
||||
|
@ -3160,7 +3145,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
// close file and persist block allocations for this file
|
||||
dir.closeFile(src, newFile);
|
||||
|
||||
checkReplicationFactor(newFile);
|
||||
blockManager.checkReplication(newFile);
|
||||
}
|
||||
|
||||
void commitBlockSynchronization(ExtendedBlock lastblock,
|
||||
|
|
Loading…
Reference in New Issue