HDFS-3498. Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1353807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-06-26 03:25:47 +00:00
parent 361ea9a62c
commit bbab35e6d8
5 changed files with 167 additions and 55 deletions

View File

@ -93,6 +93,10 @@ Trunk (unreleased changes)
HDFS-3478. Test quotas with Long.Max_Value. (Sujay Rau via eli) HDFS-3478. Test quotas with Long.Max_Value. (Sujay Rau via eli)
HDFS-3498. Support replica removal in BlockPlacementPolicy and make
BlockPlacementPolicyDefault extensible for reusing code in subclasses.
(Junping Du via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -2259,30 +2259,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
BlockCollection bc = getBlockCollection(b); BlockCollection bc = getBlockCollection(b);
final Map<String, List<DatanodeDescriptor>> rackMap final Map<String, List<DatanodeDescriptor>> rackMap
= new HashMap<String, List<DatanodeDescriptor>>(); = new HashMap<String, List<DatanodeDescriptor>>();
for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator(); final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
iter.hasNext(); ) { final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
final DatanodeDescriptor node = iter.next();
final String rackName = node.getNetworkLocation();
List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
if (datanodeList == null) {
datanodeList = new ArrayList<DatanodeDescriptor>();
rackMap.put(rackName, datanodeList);
}
datanodeList.add(node);
}
// split nodes into two sets // split nodes into two sets
// priSet contains nodes on rack with more than one replica // moreThanOne contains nodes on rack with more than one replica
// remains contains the remaining nodes // exactlyOne contains the remaining nodes
final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>(); replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>(); exactlyOne);
for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
if (datanodeList.size() == 1 ) {
remains.add(datanodeList.get(0));
} else {
priSet.addAll(datanodeList);
}
}
// pick one node to delete that favors the delete hint // pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty // otherwise pick one with least space from priSet if it is not empty
@ -2292,30 +2276,18 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// check if we can delete delNodeHint // check if we can delete delNodeHint
final DatanodeInfo cur; final DatanodeInfo cur;
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
&& (priSet.contains(delNodeHint) && (moreThanOne.contains(delNodeHint)
|| (addedNode != null && !priSet.contains(addedNode))) ) { || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
cur = delNodeHint; cur = delNodeHint;
} else { // regular excessive replica removal } else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(bc, b, replication, cur = replicator.chooseReplicaToDelete(bc, b, replication,
priSet, remains); moreThanOne, exactlyOne);
} }
firstOne = false; firstOne = false;
// adjust rackmap, priSet, and remains // adjust rackmap, moreThanOne, and exactlyOne
String rack = cur.getNetworkLocation(); replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
final List<DatanodeDescriptor> datanodes = rackMap.get(rack); exactlyOne, cur);
datanodes.remove(cur);
if (datanodes.isEmpty()) {
rackMap.remove(rack);
}
if (priSet.remove(cur)) {
if (datanodes.size() == 1) {
priSet.remove(datanodes.get(0));
remains.add(datanodes.get(0));
}
} else {
remains.remove(cur);
}
nonExcess.remove(cur); nonExcess.remove(cur);
addToExcessReplicate(cur, b); addToExcessReplicate(cur, b);

View File

@ -21,12 +21,14 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
@ -242,4 +244,79 @@ public abstract class BlockPlacementPolicy {
blocksize); blocksize);
} }
/**
 * Bring rackMap, moreThanOne and exactlyOne up to date once the replica on
 * <code>cur</code> has been picked for removal.
 *
 * <code>cur</code> is taken out of its rack's list in rackMap, and the rack
 * entry is dropped when that list becomes empty. If <code>cur</code> was in
 * moreThanOne and a single node is left on its rack, that node is moved
 * from moreThanOne to exactlyOne. If <code>cur</code> was not in
 * moreThanOne, it is removed from exactlyOne instead.
 *
 * The list looked up for the rack of <code>cur</code> is used without a
 * null check, so rackMap is expected to contain an entry for that rack.
 *
 * @param rackMap a map from rack name to the replica nodes on that rack
 * @param moreThanOne replica nodes on racks holding more than one replica
 * @param exactlyOne replica nodes on racks holding only one replica
 * @param cur the node whose replica is being removed
 */
public void adjustSetsWithChosenReplica(
    final Map<String, List<DatanodeDescriptor>> rackMap,
    final List<DatanodeDescriptor> moreThanOne,
    final List<DatanodeDescriptor> exactlyOne,
    final DatanodeInfo cur) {
  final String rackName = getRack(cur);
  final List<DatanodeDescriptor> sameRack = rackMap.get(rackName);
  sameRack.remove(cur);
  if (sameRack.isEmpty()) {
    rackMap.remove(rackName);
  }

  if (!moreThanOne.remove(cur)) {
    // cur was not tracked in moreThanOne; drop it from exactlyOne instead
    exactlyOne.remove(cur);
    return;
  }

  if (sameRack.size() == 1) {
    // a single node is left on this rack, so it changes sets
    final DatanodeDescriptor survivor = sameRack.get(0);
    moreThanOne.remove(survivor);
    exactlyOne.add(survivor);
  }
}
/**
 * Return the rack string of a data node, taken from its network location.
 * Protected so that subclasses can supply a different notion of rack.
 *
 * @param datanode the data node to look up
 * @return the value of <code>datanode.getNetworkLocation()</code>
 */
protected String getRack(final DatanodeInfo datanode) {
  final String rack = datanode.getNetworkLocation();
  return rack;
}
/**
 * Group data nodes by rack and then partition them into two lists.
 *
 * Every node in <code>dataNodes</code> is appended to the list stored in
 * rackMap under its rack (see {@link #getRack}); a new list is created for
 * a rack that has none yet. Afterwards every list in rackMap is examined:
 * a list of size one contributes its single node to exactlyOne, any other
 * list is added in full to moreThanOne.
 *
 * @param dataNodes the replica nodes to classify
 * @param rackMap a map from rack name to the data nodes on that rack;
 *                filled in by this method
 * @param moreThanOne receives nodes on racks with more than one replica
 * @param exactlyOne receives the remaining nodes, i.e. those on racks
 *                   with only one replica
 */
public void splitNodesWithRack(
    Collection<DatanodeDescriptor> dataNodes,
    final Map<String, List<DatanodeDescriptor>> rackMap,
    final List<DatanodeDescriptor> moreThanOne,
    final List<DatanodeDescriptor> exactlyOne) {
  // first pass: bucket the nodes by rack
  for (DatanodeDescriptor dn : dataNodes) {
    final String rack = getRack(dn);
    List<DatanodeDescriptor> onRack = rackMap.get(rack);
    if (onRack != null) {
      onRack.add(dn);
      continue;
    }
    onRack = new ArrayList<DatanodeDescriptor>();
    onRack.add(dn);
    rackMap.put(rack, onRack);
  }

  // second pass: split the buckets into the two result lists
  for (List<DatanodeDescriptor> onRack : rackMap.values()) {
    if (onRack.size() != 1) {
      // rack does not hold exactly one replica
      moreThanOne.addAll(onRack);
      continue;
    }
    // rack holds only one replica
    exactlyOne.add(onRack.get(0));
  }
}
} }

View File

@ -56,15 +56,15 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
"For more information, please enable DEBUG log level on " "For more information, please enable DEBUG log level on "
+ ((Log4JLogger)LOG).getLogger().getName(); + ((Log4JLogger)LOG).getLogger().getName();
private boolean considerLoad; protected boolean considerLoad;
private boolean preferLocalNode = true; private boolean preferLocalNode = true;
private NetworkTopology clusterMap; protected NetworkTopology clusterMap;
private FSClusterStats stats; private FSClusterStats stats;
private long heartbeatInterval; // interval for DataNode heartbeats protected long heartbeatInterval; // interval for DataNode heartbeats
/** /**
* A miss of that many heartbeats is tolerated for replica deletion policy. * A miss of that many heartbeats is tolerated for replica deletion policy.
*/ */
private int tolerateHeartbeatMultiplier; protected int tolerateHeartbeatMultiplier;
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats, BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) { NetworkTopology clusterMap) {
@ -88,7 +88,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT); DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
} }
private ThreadLocal<StringBuilder> threadLocalBuilder = protected ThreadLocal<StringBuilder> threadLocalBuilder =
new ThreadLocal<StringBuilder>() { new ThreadLocal<StringBuilder>() {
@Override @Override
protected StringBuilder initialValue() { protected StringBuilder initialValue() {
@ -229,7 +229,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* choose a node on the same rack * choose a node on the same rack
* @return the chosen node * @return the chosen node
*/ */
private DatanodeDescriptor chooseLocalNode( protected DatanodeDescriptor chooseLocalNode(
DatanodeDescriptor localMachine, DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, HashMap<Node, Node> excludedNodes,
long blocksize, long blocksize,
@ -263,7 +263,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* in the cluster. * in the cluster.
* @return the chosen node * @return the chosen node
*/ */
private DatanodeDescriptor chooseLocalRack( protected DatanodeDescriptor chooseLocalRack(
DatanodeDescriptor localMachine, DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, HashMap<Node, Node> excludedNodes,
long blocksize, long blocksize,
@ -316,7 +316,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* from the local rack * from the local rack
*/ */
private void chooseRemoteRack(int numOfReplicas, protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine, DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, HashMap<Node, Node> excludedNodes,
long blocksize, long blocksize,
@ -338,7 +338,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/* Randomly choose one target from <i>nodes</i>. /* Randomly choose one target from <i>nodes</i>.
* @return the chosen node * @return the chosen node
*/ */
private DatanodeDescriptor chooseRandom( protected DatanodeDescriptor chooseRandom(
String nodes, String nodes,
HashMap<Node, Node> excludedNodes, HashMap<Node, Node> excludedNodes,
long blocksize, long blocksize,
@ -382,7 +382,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>. /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
*/ */
private void chooseRandom(int numOfReplicas, protected void chooseRandom(int numOfReplicas,
String nodes, String nodes,
HashMap<Node, Node> excludedNodes, HashMap<Node, Node> excludedNodes,
long blocksize, long blocksize,
@ -438,7 +438,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
this.considerLoad, results); this.considerLoad, results);
} }
private boolean isGoodTarget(DatanodeDescriptor node, protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerLoc, long blockSize, int maxTargetPerLoc,
boolean considerLoad, boolean considerLoad,
List<DatanodeDescriptor> results) { List<DatanodeDescriptor> results) {
@ -574,8 +574,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// pick replica from the first Set. If first is empty, then pick replicas // pick replica from the first Set. If first is empty, then pick replicas
// from second set. // from second set.
Iterator<DatanodeDescriptor> iter = Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
first.isEmpty() ? second.iterator() : first.iterator();
// Pick the node with the oldest heartbeat or with the least free space, // Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval // if all hearbeats are within the tolerable heartbeat interval
@ -595,6 +594,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode; return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
} }
/**
 * Select which set of replica nodes to iterate over when deleting an
 * over-replicated replica.
 *
 * The first set holds replica nodes on racks with more than one replica,
 * the second set holds the remaining replica nodes. The first set is used
 * unless it is empty, in which case the second set is used.
 *
 * @param first replica nodes on racks with more than one replica
 * @param second the remaining replica nodes
 * @return an iterator over <code>first</code> if it is not empty,
 *         otherwise an iterator over <code>second</code>
 */
protected Iterator<DatanodeDescriptor> pickupReplicaSet(
    Collection<DatanodeDescriptor> first,
    Collection<DatanodeDescriptor> second) {
  if (first.isEmpty()) {
    return second.iterator();
  }
  return first.iterator();
}
@VisibleForTesting @VisibleForTesting
void setPreferLocalNode(boolean prefer) { void setPreferLocalNode(boolean prefer) {
this.preferLocalNode = prefer; this.preferLocalNode = prefer;

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -34,7 +35,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
@ -587,4 +587,50 @@ public class TestReplicationPolicy {
fifthPrioritySize, chosenBlocks.get( fifthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size()); UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
} }
/**
 * Test that chooseReplicaToDelete picks the replica to delete based on
 * block locality and free space.
 */
@Test
public void testChooseReplicaToDelete() throws Exception {
  List<DatanodeDescriptor> replicaNodeList =
      new ArrayList<DatanodeDescriptor>();
  final Map<String, List<DatanodeDescriptor>> rackMap
      = new HashMap<String, List<DatanodeDescriptor>>();

  // four replicas with strictly decreasing free space
  dataNodes[0].setRemaining(4*1024*1024);
  replicaNodeList.add(dataNodes[0]);

  dataNodes[1].setRemaining(3*1024*1024);
  replicaNodeList.add(dataNodes[1]);

  dataNodes[2].setRemaining(2*1024*1024);
  replicaNodeList.add(dataNodes[2]);

  dataNodes[5].setRemaining(1*1024*1024);
  replicaNodeList.add(dataNodes[5]);

  List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
  List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
  replicator.splitNodesWithRack(
      replicaNodeList, rackMap, first, second);
  // dataNodes[0] and dataNodes[1] are in first set as their rack has two
  // replica nodes, while dataNodes[2] and dataNodes[5] are in second set.
  assertEquals(2, first.size());
  assertEquals(2, second.size());
  DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
      null, null, (short)3, first, second);
  // Within first set, dataNodes[1] has less free space.
  // Expected value goes first so a failure message reads correctly.
  assertEquals(dataNodes[1], chosenNode);

  replicator.adjustSetsWithChosenReplica(
      rackMap, first, second, chosenNode);
  // the node left on that rack has moved from first to second
  assertEquals(0, first.size());
  assertEquals(3, second.size());
  // Within second set, dataNodes[5] has less free space.
  chosenNode = replicator.chooseReplicaToDelete(
      null, null, (short)2, first, second);
  assertEquals(dataNodes[5], chosenNode);
}
} }