svn merge -c 1353807 from trunk for HDFS-3498. Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488844 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
62ab1db78b
commit
4c734a8ee8
|
@ -1032,6 +1032,10 @@ Release 2.0.3-alpha - 2013-02-06
|
||||||
|
|
||||||
HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
|
HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
|
||||||
|
|
||||||
|
HDFS-3498. Support replica removal in BlockPlacementPolicy and make
|
||||||
|
BlockPlacementPolicyDefault extensible for reusing code in subclasses.
|
||||||
|
(Junping Du via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
|
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
|
||||||
|
|
|
@ -2422,30 +2422,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
BlockCollection bc = getBlockCollection(b);
|
BlockCollection bc = getBlockCollection(b);
|
||||||
final Map<String, List<DatanodeDescriptor>> rackMap
|
final Map<String, List<DatanodeDescriptor>> rackMap
|
||||||
= new HashMap<String, List<DatanodeDescriptor>>();
|
= new HashMap<String, List<DatanodeDescriptor>>();
|
||||||
for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
|
final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
|
||||||
iter.hasNext(); ) {
|
final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
|
||||||
final DatanodeDescriptor node = iter.next();
|
|
||||||
final String rackName = node.getNetworkLocation();
|
|
||||||
List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
|
|
||||||
if (datanodeList == null) {
|
|
||||||
datanodeList = new ArrayList<DatanodeDescriptor>();
|
|
||||||
rackMap.put(rackName, datanodeList);
|
|
||||||
}
|
|
||||||
datanodeList.add(node);
|
|
||||||
}
|
|
||||||
|
|
||||||
// split nodes into two sets
|
// split nodes into two sets
|
||||||
// priSet contains nodes on rack with more than one replica
|
// moreThanOne contains nodes on rack with more than one replica
|
||||||
// remains contains the remaining nodes
|
// exactlyOne contains the remaining nodes
|
||||||
final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
|
replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
|
||||||
final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
|
exactlyOne);
|
||||||
for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
|
|
||||||
if (datanodeList.size() == 1 ) {
|
|
||||||
remains.add(datanodeList.get(0));
|
|
||||||
} else {
|
|
||||||
priSet.addAll(datanodeList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pick one node to delete that favors the delete hint
|
// pick one node to delete that favors the delete hint
|
||||||
// otherwise pick one with least space from priSet if it is not empty
|
// otherwise pick one with least space from priSet if it is not empty
|
||||||
|
@ -2455,30 +2439,18 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
// check if we can delete delNodeHint
|
// check if we can delete delNodeHint
|
||||||
final DatanodeInfo cur;
|
final DatanodeInfo cur;
|
||||||
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
|
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
|
||||||
&& (priSet.contains(delNodeHint)
|
&& (moreThanOne.contains(delNodeHint)
|
||||||
|| (addedNode != null && !priSet.contains(addedNode))) ) {
|
|| (addedNode != null && !moreThanOne.contains(addedNode))) ) {
|
||||||
cur = delNodeHint;
|
cur = delNodeHint;
|
||||||
} else { // regular excessive replica removal
|
} else { // regular excessive replica removal
|
||||||
cur = replicator.chooseReplicaToDelete(bc, b, replication,
|
cur = replicator.chooseReplicaToDelete(bc, b, replication,
|
||||||
priSet, remains);
|
moreThanOne, exactlyOne);
|
||||||
}
|
}
|
||||||
firstOne = false;
|
firstOne = false;
|
||||||
|
|
||||||
// adjust rackmap, priSet, and remains
|
// adjust rackmap, moreThanOne, and exactlyOne
|
||||||
String rack = cur.getNetworkLocation();
|
replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
|
||||||
final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
|
exactlyOne, cur);
|
||||||
datanodes.remove(cur);
|
|
||||||
if (datanodes.isEmpty()) {
|
|
||||||
rackMap.remove(rack);
|
|
||||||
}
|
|
||||||
if (priSet.remove(cur)) {
|
|
||||||
if (datanodes.size() == 1) {
|
|
||||||
priSet.remove(datanodes.get(0));
|
|
||||||
remains.add(datanodes.get(0));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
remains.remove(cur);
|
|
||||||
}
|
|
||||||
|
|
||||||
nonExcess.remove(cur);
|
nonExcess.remove(cur);
|
||||||
addToExcessReplicate(cur, b);
|
addToExcessReplicate(cur, b);
|
||||||
|
|
|
@ -21,12 +21,14 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
|
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
@ -201,4 +203,79 @@ public abstract class BlockPlacementPolicy {
|
||||||
return replicator;
|
return replicator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
|
||||||
|
*
|
||||||
|
* @param rackMap a map from rack to replica
|
||||||
|
* @param moreThanOne The List of replica nodes on rack which has more than
|
||||||
|
* one replica
|
||||||
|
* @param exactlyOne The List of replica nodes on rack with only one replica
|
||||||
|
* @param cur current replica to remove
|
||||||
|
*/
|
||||||
|
public void adjustSetsWithChosenReplica(final Map<String,
|
||||||
|
List<DatanodeDescriptor>> rackMap,
|
||||||
|
final List<DatanodeDescriptor> moreThanOne,
|
||||||
|
final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
|
||||||
|
|
||||||
|
String rack = getRack(cur);
|
||||||
|
final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
|
||||||
|
datanodes.remove(cur);
|
||||||
|
if (datanodes.isEmpty()) {
|
||||||
|
rackMap.remove(rack);
|
||||||
|
}
|
||||||
|
if (moreThanOne.remove(cur)) {
|
||||||
|
if (datanodes.size() == 1) {
|
||||||
|
moreThanOne.remove(datanodes.get(0));
|
||||||
|
exactlyOne.add(datanodes.get(0));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
exactlyOne.remove(cur);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get rack string from a data node
|
||||||
|
* @param datanode
|
||||||
|
* @return rack of data node
|
||||||
|
*/
|
||||||
|
protected String getRack(final DatanodeInfo datanode) {
|
||||||
|
return datanode.getNetworkLocation();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Split data nodes into two sets, one set includes nodes on rack with
|
||||||
|
* more than one replica, the other set contains the remaining nodes.
|
||||||
|
*
|
||||||
|
* @param dataNodes
|
||||||
|
* @param rackMap a map from rack to datanodes
|
||||||
|
* @param moreThanOne contains nodes on rack with more than one replica
|
||||||
|
* @param exactlyOne remains contains the remaining nodes
|
||||||
|
*/
|
||||||
|
public void splitNodesWithRack(
|
||||||
|
Collection<DatanodeDescriptor> dataNodes,
|
||||||
|
final Map<String, List<DatanodeDescriptor>> rackMap,
|
||||||
|
final List<DatanodeDescriptor> moreThanOne,
|
||||||
|
final List<DatanodeDescriptor> exactlyOne) {
|
||||||
|
for(DatanodeDescriptor node : dataNodes) {
|
||||||
|
final String rackName = getRack(node);
|
||||||
|
List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
|
||||||
|
if (datanodeList == null) {
|
||||||
|
datanodeList = new ArrayList<DatanodeDescriptor>();
|
||||||
|
rackMap.put(rackName, datanodeList);
|
||||||
|
}
|
||||||
|
datanodeList.add(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
// split nodes into two sets
|
||||||
|
for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
|
||||||
|
if (datanodeList.size() == 1) {
|
||||||
|
// exactlyOne contains nodes on rack with only one replica
|
||||||
|
exactlyOne.add(datanodeList.get(0));
|
||||||
|
} else {
|
||||||
|
// moreThanOne contains nodes on rack with more than one replica
|
||||||
|
moreThanOne.addAll(datanodeList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,17 +57,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
"For more information, please enable DEBUG log level on "
|
"For more information, please enable DEBUG log level on "
|
||||||
+ LOG.getClass().getName();
|
+ LOG.getClass().getName();
|
||||||
|
|
||||||
private boolean considerLoad;
|
protected boolean considerLoad;
|
||||||
private boolean preferLocalNode = true;
|
private boolean preferLocalNode = true;
|
||||||
private NetworkTopology clusterMap;
|
protected NetworkTopology clusterMap;
|
||||||
private FSClusterStats stats;
|
private FSClusterStats stats;
|
||||||
private long heartbeatInterval; // interval for DataNode heartbeats
|
protected long heartbeatInterval; // interval for DataNode heartbeats
|
||||||
private long staleInterval; // interval used to identify stale DataNodes
|
private long staleInterval; // interval used to identify stale DataNodes
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A miss of that many heartbeats is tolerated for replica deletion policy.
|
* A miss of that many heartbeats is tolerated for replica deletion policy.
|
||||||
*/
|
*/
|
||||||
private int tolerateHeartbeatMultiplier;
|
protected int tolerateHeartbeatMultiplier;
|
||||||
|
|
||||||
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
|
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
|
||||||
NetworkTopology clusterMap) {
|
NetworkTopology clusterMap) {
|
||||||
|
@ -95,7 +95,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ThreadLocal<StringBuilder> threadLocalBuilder =
|
protected ThreadLocal<StringBuilder> threadLocalBuilder =
|
||||||
new ThreadLocal<StringBuilder>() {
|
new ThreadLocal<StringBuilder>() {
|
||||||
@Override
|
@Override
|
||||||
protected StringBuilder initialValue() {
|
protected StringBuilder initialValue() {
|
||||||
|
@ -319,7 +319,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
* choose a node on the same rack
|
* choose a node on the same rack
|
||||||
* @return the chosen node
|
* @return the chosen node
|
||||||
*/
|
*/
|
||||||
private DatanodeDescriptor chooseLocalNode(
|
protected DatanodeDescriptor chooseLocalNode(
|
||||||
DatanodeDescriptor localMachine,
|
DatanodeDescriptor localMachine,
|
||||||
HashMap<Node, Node> excludedNodes,
|
HashMap<Node, Node> excludedNodes,
|
||||||
long blocksize,
|
long blocksize,
|
||||||
|
@ -354,7 +354,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
* in the cluster.
|
* in the cluster.
|
||||||
* @return the chosen node
|
* @return the chosen node
|
||||||
*/
|
*/
|
||||||
private DatanodeDescriptor chooseLocalRack(
|
protected DatanodeDescriptor chooseLocalRack(
|
||||||
DatanodeDescriptor localMachine,
|
DatanodeDescriptor localMachine,
|
||||||
HashMap<Node, Node> excludedNodes,
|
HashMap<Node, Node> excludedNodes,
|
||||||
long blocksize,
|
long blocksize,
|
||||||
|
@ -406,7 +406,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
* from the local rack
|
* from the local rack
|
||||||
*/
|
*/
|
||||||
|
|
||||||
private void chooseRemoteRack(int numOfReplicas,
|
protected void chooseRemoteRack(int numOfReplicas,
|
||||||
DatanodeDescriptor localMachine,
|
DatanodeDescriptor localMachine,
|
||||||
HashMap<Node, Node> excludedNodes,
|
HashMap<Node, Node> excludedNodes,
|
||||||
long blocksize,
|
long blocksize,
|
||||||
|
@ -430,7 +430,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
/* Randomly choose one target from <i>nodes</i>.
|
/* Randomly choose one target from <i>nodes</i>.
|
||||||
* @return the chosen node
|
* @return the chosen node
|
||||||
*/
|
*/
|
||||||
private DatanodeDescriptor chooseRandom(
|
protected DatanodeDescriptor chooseRandom(
|
||||||
String nodes,
|
String nodes,
|
||||||
HashMap<Node, Node> excludedNodes,
|
HashMap<Node, Node> excludedNodes,
|
||||||
long blocksize,
|
long blocksize,
|
||||||
|
@ -476,7 +476,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
|
|
||||||
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
|
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
|
||||||
*/
|
*/
|
||||||
private void chooseRandom(int numOfReplicas,
|
protected void chooseRandom(int numOfReplicas,
|
||||||
String nodes,
|
String nodes,
|
||||||
HashMap<Node, Node> excludedNodes,
|
HashMap<Node, Node> excludedNodes,
|
||||||
long blocksize,
|
long blocksize,
|
||||||
|
@ -551,7 +551,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
* does not have too much load,
|
* does not have too much load,
|
||||||
* and the rack does not have too many nodes.
|
* and the rack does not have too many nodes.
|
||||||
*/
|
*/
|
||||||
private boolean isGoodTarget(DatanodeDescriptor node,
|
protected boolean isGoodTarget(DatanodeDescriptor node,
|
||||||
long blockSize, int maxTargetPerRack,
|
long blockSize, int maxTargetPerRack,
|
||||||
boolean considerLoad,
|
boolean considerLoad,
|
||||||
List<DatanodeDescriptor> results,
|
List<DatanodeDescriptor> results,
|
||||||
|
@ -699,8 +699,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
|
|
||||||
// pick replica from the first Set. If first is empty, then pick replicas
|
// pick replica from the first Set. If first is empty, then pick replicas
|
||||||
// from second set.
|
// from second set.
|
||||||
Iterator<DatanodeDescriptor> iter =
|
Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
|
||||||
first.isEmpty() ? second.iterator() : first.iterator();
|
|
||||||
|
|
||||||
// Pick the node with the oldest heartbeat or with the least free space,
|
// Pick the node with the oldest heartbeat or with the least free space,
|
||||||
// if all hearbeats are within the tolerable heartbeat interval
|
// if all hearbeats are within the tolerable heartbeat interval
|
||||||
|
@ -720,6 +719,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
|
return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pick up replica node set for deleting replica as over-replicated.
|
||||||
|
* First set contains replica nodes on rack with more than one
|
||||||
|
* replica while second set contains remaining replica nodes.
|
||||||
|
* So pick up first set if not empty. If first is empty, then pick second.
|
||||||
|
*/
|
||||||
|
protected Iterator<DatanodeDescriptor> pickupReplicaSet(
|
||||||
|
Collection<DatanodeDescriptor> first,
|
||||||
|
Collection<DatanodeDescriptor> second) {
|
||||||
|
Iterator<DatanodeDescriptor> iter =
|
||||||
|
first.isEmpty() ? second.iterator() : first.iterator();
|
||||||
|
return iter;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void setPreferLocalNode(boolean prefer) {
|
void setPreferLocalNode(boolean prefer) {
|
||||||
this.preferLocalNode = prefer;
|
this.preferLocalNode = prefer;
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -952,4 +953,50 @@ public class TestReplicationPolicy {
|
||||||
exception.expect(IllegalArgumentException.class);
|
exception.expect(IllegalArgumentException.class);
|
||||||
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
|
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the chooseReplicaToDelete are processed based on
|
||||||
|
* block locality and free space
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testChooseReplicaToDelete() throws Exception {
|
||||||
|
List<DatanodeDescriptor> replicaNodeList = new
|
||||||
|
ArrayList<DatanodeDescriptor>();
|
||||||
|
final Map<String, List<DatanodeDescriptor>> rackMap
|
||||||
|
= new HashMap<String, List<DatanodeDescriptor>>();
|
||||||
|
|
||||||
|
dataNodes[0].setRemaining(4*1024*1024);
|
||||||
|
replicaNodeList.add(dataNodes[0]);
|
||||||
|
|
||||||
|
dataNodes[1].setRemaining(3*1024*1024);
|
||||||
|
replicaNodeList.add(dataNodes[1]);
|
||||||
|
|
||||||
|
dataNodes[2].setRemaining(2*1024*1024);
|
||||||
|
replicaNodeList.add(dataNodes[2]);
|
||||||
|
|
||||||
|
dataNodes[5].setRemaining(1*1024*1024);
|
||||||
|
replicaNodeList.add(dataNodes[5]);
|
||||||
|
|
||||||
|
List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
|
||||||
|
List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
|
||||||
|
replicator.splitNodesWithRack(
|
||||||
|
replicaNodeList, rackMap, first, second);
|
||||||
|
// dataNodes[0] and dataNodes[1] are in first set as their rack has two
|
||||||
|
// replica nodes, while datanodes[2] and dataNodes[5] are in second set.
|
||||||
|
assertEquals(2, first.size());
|
||||||
|
assertEquals(2, second.size());
|
||||||
|
DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
|
||||||
|
null, null, (short)3, first, second);
|
||||||
|
// Within first set, dataNodes[1] with less free space
|
||||||
|
assertEquals(chosenNode, dataNodes[1]);
|
||||||
|
|
||||||
|
replicator.adjustSetsWithChosenReplica(
|
||||||
|
rackMap, first, second, chosenNode);
|
||||||
|
assertEquals(0, first.size());
|
||||||
|
assertEquals(3, second.size());
|
||||||
|
// Within second set, dataNodes[5] with less free space
|
||||||
|
chosenNode = replicator.chooseReplicaToDelete(
|
||||||
|
null, null, (short)2, first, second);
|
||||||
|
assertEquals(chosenNode, dataNodes[5]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue