svn merge -c 1611731 from trunk for HDFS-6700. BlockPlacementPolicy should choose storage but not datanode for deletion.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611734 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 325a7d5d94
commit 9462e9e0e9
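At a glance, the change moves excess-replica selection from datanodes to storages: chooseReplicaToDelete now returns the DatanodeStorageInfo to remove, and callers derive the owning datanode from the chosen storage only where node-level bookkeeping (the excess and invalidate lists) still needs it. The following is a minimal caller-side sketch, not code from this commit; the policy, block collection, block and candidate collections are assumed to be supplied the way BlockManager does below.

import java.util.Collection;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

// Illustrative sketch only: shows the shape of the reworked API, not code from this patch.
class ChooseReplicaSketch {
  static DatanodeDescriptor nodeOfChosenReplica(BlockPlacementPolicy policy,
      BlockCollection bc, Block block, short replication,
      Collection<DatanodeStorageInfo> moreThanOne,
      Collection<DatanodeStorageInfo> exactlyOne) {
    // The policy now picks a concrete storage to delete from (the candidate
    // collections are assumed non-empty, as in BlockManager's removal loop)...
    final DatanodeStorageInfo chosen =
        policy.chooseReplicaToDelete(bc, block, replication, moreThanOne, exactlyOne);
    // ...and the owning datanode is recovered only where node-based
    // bookkeeping (excess/invalidate lists) still requires it.
    return chosen.getDatanodeDescriptor();
  }
}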
@@ -49,6 +49,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6597. Add a new option to NN upgrade to terminate the process after
     upgrade on NN is completed. (Danilo Vunjak via cnauroth)
 
+    HDFS-6700. BlockPlacementPolicy shoud choose storage but not datanode for
+    deletion. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -725,7 +725,6 @@ public class BlockManager {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final String storageID = storage.getStorageID();
       // filter invalidate replicas
       if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
@@ -2640,7 +2639,7 @@ public class BlockManager {
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
-    Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2660,7 +2659,7 @@ public class BlockManager {
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         // exclude corrupt replicas
         if (corruptNodes == null || !corruptNodes.contains(cur)) {
-          nonExcess.add(cur);
+          nonExcess.add(storage);
         }
       }
     }
@@ -2684,7 +2683,7 @@ public class BlockManager {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
+  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
@@ -2692,28 +2691,33 @@ public class BlockManager {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(b);
-    final Map<String, List<DatanodeDescriptor>> rackMap
-        = new HashMap<String, List<DatanodeDescriptor>>();
-    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
 
     // split nodes into two sets
     // moreThanOne contains nodes on rack with more than one replica
     // exactlyOne contains the remaining nodes
-    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
-        exactlyOne);
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
 
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
     boolean firstOne = true;
+
+    final DatanodeStorageInfo delNodeHintStorage
+        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+    final DatanodeStorageInfo addedNodeStorage
+        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
     while (nonExcess.size() - replication > 0) {
       // check if we can delete delNodeHint
-      final DatanodeInfo cur;
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
-          && (moreThanOne.contains(delNodeHint)
-              || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
-        cur = delNodeHint;
+      final DatanodeStorageInfo cur;
+      if (firstOne && delNodeHintStorage != null
+          && (moreThanOne.contains(delNodeHintStorage)
+              || (addedNodeStorage != null
+                  && !moreThanOne.contains(addedNodeStorage)))) {
+        cur = delNodeHintStorage;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
             moreThanOne, exactlyOne);
@@ -2725,7 +2729,7 @@ public class BlockManager {
           exactlyOne, cur);
 
       nonExcess.remove(cur);
-      addToExcessReplicate(cur, b);
+      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
 
       //
       // The 'excessblocks' tracks blocks until we get confirmation
@@ -2736,7 +2740,7 @@ public class BlockManager {
       // should be deleted. Items are removed from the invalidate list
       // upon giving instructions to the namenode.
       //
-      addToInvalidates(b, cur);
+      addToInvalidates(b, cur.getDatanodeDescriptor());
       blockLog.info("BLOCK* chooseExcessReplicates: "
                 +"("+cur+", "+b+") is added to invalidated blocks set");
     }
@@ -124,11 +124,12 @@ public abstract class BlockPlacementPolicy {
                     listed in the previous parameter.
    * @return the replica that is the best candidate for deletion
    */
-  abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC,
-      Block block,
-      short replicationFactor,
-      Collection<DatanodeDescriptor> existingReplicas,
-      Collection<DatanodeDescriptor> moreExistingReplicas);
+  abstract public DatanodeStorageInfo chooseReplicaToDelete(
+      BlockCollection srcBC,
+      Block block,
+      short replicationFactor,
+      Collection<DatanodeStorageInfo> existingReplicas,
+      Collection<DatanodeStorageInfo> moreExistingReplicas);
 
   /**
    * Used to setup a BlockPlacementPolicy object. This should be defined by
@@ -175,21 +176,23 @@ public abstract class BlockPlacementPolicy {
    * @param exactlyOne The List of replica nodes on rack with only one replica
    * @param cur current replica to remove
    */
-  public void adjustSetsWithChosenReplica(final Map<String,
-      List<DatanodeDescriptor>> rackMap,
-      final List<DatanodeDescriptor> moreThanOne,
-      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+  public void adjustSetsWithChosenReplica(
+      final Map<String, List<DatanodeStorageInfo>> rackMap,
+      final List<DatanodeStorageInfo> moreThanOne,
+      final List<DatanodeStorageInfo> exactlyOne,
+      final DatanodeStorageInfo cur) {
 
-    String rack = getRack(cur);
-    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
-    datanodes.remove(cur);
-    if (datanodes.isEmpty()) {
+    final String rack = getRack(cur.getDatanodeDescriptor());
+    final List<DatanodeStorageInfo> storages = rackMap.get(rack);
+    storages.remove(cur);
+    if (storages.isEmpty()) {
       rackMap.remove(rack);
     }
     if (moreThanOne.remove(cur)) {
-      if (datanodes.size() == 1) {
-        moreThanOne.remove(datanodes.get(0));
-        exactlyOne.add(datanodes.get(0));
+      if (storages.size() == 1) {
+        final DatanodeStorageInfo remaining = storages.get(0);
+        moreThanOne.remove(remaining);
+        exactlyOne.add(remaining);
       }
     } else {
       exactlyOne.remove(cur);
@@ -214,28 +217,28 @@ public abstract class BlockPlacementPolicy {
    * @param exactlyOne remains contains the remaining nodes
    */
   public void splitNodesWithRack(
-      Collection<DatanodeDescriptor> dataNodes,
-      final Map<String, List<DatanodeDescriptor>> rackMap,
-      final List<DatanodeDescriptor> moreThanOne,
-      final List<DatanodeDescriptor> exactlyOne) {
-    for(DatanodeDescriptor node : dataNodes) {
-      final String rackName = getRack(node);
-      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        rackMap.put(rackName, datanodeList);
+      final Iterable<DatanodeStorageInfo> storages,
+      final Map<String, List<DatanodeStorageInfo>> rackMap,
+      final List<DatanodeStorageInfo> moreThanOne,
+      final List<DatanodeStorageInfo> exactlyOne) {
+    for(DatanodeStorageInfo s: storages) {
+      final String rackName = getRack(s.getDatanodeDescriptor());
+      List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
+      if (storageList == null) {
+        storageList = new ArrayList<DatanodeStorageInfo>();
+        rackMap.put(rackName, storageList);
       }
-      datanodeList.add(node);
+      storageList.add(s);
     }
 
     // split nodes into two sets
-    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
-      if (datanodeList.size() == 1) {
+    for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
+      if (storageList.size() == 1) {
         // exactlyOne contains nodes on rack with only one replica
-        exactlyOne.add(datanodeList.get(0));
+        exactlyOne.add(storageList.get(0));
       } else {
         // moreThanOne contains nodes on rack with more than one replica
-        moreThanOne.addAll(datanodeList);
+        moreThanOne.addAll(storageList);
       }
     }
   }
@@ -727,31 +727,34 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   @Override
-  public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
+  public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
       Block block, short replicationFactor,
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
-    DatanodeDescriptor oldestHeartbeatNode = null;
+    DatanodeStorageInfo oldestHeartbeatStorage = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor minSpaceNode = null;
+    DatanodeStorageInfo minSpaceStorage = null;
 
     // Pick the node with the oldest heartbeat or with the least free space,
    // if all hearbeats are within the tolerable heartbeat interval
-    for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
+    for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
      long free = node.getRemaining();
      long lastHeartbeat = node.getLastUpdate();
      if(lastHeartbeat < oldestHeartbeat) {
        oldestHeartbeat = lastHeartbeat;
-        oldestHeartbeatNode = node;
+        oldestHeartbeatStorage = storage;
      }
      if (minSpace > free) {
        minSpace = free;
-        minSpaceNode = node;
+        minSpaceStorage = storage;
      }
     }
-    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
+    return oldestHeartbeatStorage != null? oldestHeartbeatStorage
+        : minSpaceStorage;
   }
 
   /**
@@ -760,9 +763,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   * replica while second set contains remaining replica nodes.
   * So pick up first set if not empty. If first is empty, then pick second.
   */
-  protected Collection<DatanodeDescriptor> pickupReplicaSet(
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
     return first.isEmpty() ? second : first;
   }
 
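The default policy's selection rule is unchanged by this patch, only its element type is: prefer a replica whose node's heartbeat is older than the tolerated interval; if every heartbeat is recent, fall back to the replica on the node with the least remaining space. Below is a small, self-contained illustration of that rule in plain Java; the Replica class, names and numbers are invented for the example and are not HDFS code.

import java.util.Arrays;
import java.util.List;

// Toy illustration of the deletion preference implemented above:
// stale heartbeat first, otherwise least remaining space.
class DeletionRuleDemo {
  static final class Replica {
    final String name;
    final long lastHeartbeatMs;
    final long remainingBytes;
    Replica(String name, long lastHeartbeatMs, long remainingBytes) {
      this.name = name;
      this.lastHeartbeatMs = lastHeartbeatMs;
      this.remainingBytes = remainingBytes;
    }
  }

  static Replica choose(List<Replica> candidates, long oldestTolerated) {
    Replica stalest = null;
    long oldest = oldestTolerated;
    Replica smallest = null;
    long minSpace = Long.MAX_VALUE;
    for (Replica r : candidates) {
      if (r.lastHeartbeatMs < oldest) {    // heartbeat older than tolerated
        oldest = r.lastHeartbeatMs;
        stalest = r;
      }
      if (r.remainingBytes < minSpace) {   // track the least free space
        minSpace = r.remainingBytes;
        smallest = r;
      }
    }
    return stalest != null ? stalest : smallest;
  }

  public static void main(String[] args) {
    List<Replica> replicas = Arrays.asList(
        new Replica("d0", 10000, 4L << 20),
        new Replica("d1", 10000, 3L << 20),
        new Replica("d2", 10000, 2L << 20));
    // All heartbeats are newer than the threshold (5000), so the replica
    // with the least remaining space (d2, 2 MB) is chosen.
    System.out.println(choose(replicas, 5000).name);
  }
}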
@@ -288,9 +288,9 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
   * If first is empty, then pick second.
   */
  @Override
-  public Collection<DatanodeDescriptor> pickupReplicaSet(
-      Collection<DatanodeDescriptor> first,
-      Collection<DatanodeDescriptor> second) {
+  public Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> first,
+      Collection<DatanodeStorageInfo> second) {
    // If no replica within same rack, return directly.
    if (first.isEmpty()) {
      return second;
@@ -298,25 +298,24 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
    // Split data nodes in the first set into two sets,
    // moreThanOne contains nodes on nodegroup with more than one replica
    // exactlyOne contains the remaining nodes
-    Map<String, List<DatanodeDescriptor>> nodeGroupMap =
-        new HashMap<String, List<DatanodeDescriptor>>();
+    Map<String, List<DatanodeStorageInfo>> nodeGroupMap =
+        new HashMap<String, List<DatanodeStorageInfo>>();
 
-    for(DatanodeDescriptor node : first) {
-      final String nodeGroupName =
-          NetworkTopology.getLastHalf(node.getNetworkLocation());
-      List<DatanodeDescriptor> datanodeList =
-          nodeGroupMap.get(nodeGroupName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        nodeGroupMap.put(nodeGroupName, datanodeList);
+    for(DatanodeStorageInfo storage : first) {
+      final String nodeGroupName = NetworkTopology.getLastHalf(
+          storage.getDatanodeDescriptor().getNetworkLocation());
+      List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
+      if (storageList == null) {
+        storageList = new ArrayList<DatanodeStorageInfo>();
+        nodeGroupMap.put(nodeGroupName, storageList);
      }
-      datanodeList.add(node);
+      storageList.add(storage);
    }
 
-    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
    // split nodes into two sets
-    for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+    for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
      if (datanodeList.size() == 1 ) {
        // exactlyOne contains nodes on nodegroup with exactly one replica
        exactlyOne.add(datanodeList.get(0));
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -290,4 +291,21 @@ public class DatanodeStorageInfo {
   public String toString() {
     return "[" + storageType + "]" + storageID + ":" + state;
   }
+
+  /** @return the first {@link DatanodeStorageInfo} corresponding to
+   *          the given datanode
+   */
+  static DatanodeStorageInfo getDatanodeStorageInfo(
+      final Iterable<DatanodeStorageInfo> infos,
+      final DatanodeDescriptor datanode) {
+    if (datanode == null) {
+      return null;
+    }
+    for(DatanodeStorageInfo storage : infos) {
+      if (storage.getDatanodeDescriptor() == datanode) {
+        return storage;
+      }
+    }
+    return null;
+  }
 }
@@ -893,6 +893,54 @@ public class TestReplicationPolicy {
         UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
   }
 
+  /**
+   * Test for the chooseReplicaToDelete are processed based on
+   * block locality and free space
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
+
+    dataNodes[0].setRemaining(4*1024*1024);
+    replicaList.add(storages[0]);
+
+    dataNodes[1].setRemaining(3*1024*1024);
+    replicaList.add(storages[1]);
+
+    dataNodes[2].setRemaining(2*1024*1024);
+    replicaList.add(storages[2]);
+
+    dataNodes[5].setRemaining(1*1024*1024);
+    replicaList.add(storages[5]);
+
+    // Refresh the last update time for all the datanodes
+    for (int i = 0; i < dataNodes.length; i++) {
+      dataNodes[i].setLastUpdate(Time.now());
+    }
+
+    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+    replicator.splitNodesWithRack(replicaList, rackMap, first, second);
+    // storages[0] and storages[1] are in first set as their rack has two
+    // replica nodes, while storages[2] and dataNodes[5] are in second set.
+    assertEquals(2, first.size());
+    assertEquals(2, second.size());
+    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
+        null, null, (short)3, first, second);
+    // Within first set, storages[1] with less free space
+    assertEquals(chosen, storages[1]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
+    assertEquals(0, first.size());
+    assertEquals(3, second.size());
+    // Within second set, storages[5] with less free space
+    chosen = replicator.chooseReplicaToDelete(
+        null, null, (short)2, first, second);
+    assertEquals(chosen, storages[5]);
+  }
+
   /**
    * This testcase tests whether the default value returned by
    * DFSUtil.getInvalidateWorkPctPerIteration() is positive,
@@ -980,50 +1028,4 @@ public class TestReplicationPolicy {
     exception.expect(IllegalArgumentException.class);
     blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
   }
-
-  /**
-   * Test for the chooseReplicaToDelete are processed based on
-   * block locality and free space
-   */
-  @Test
-  public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeDescriptor> replicaNodeList = new
-        ArrayList<DatanodeDescriptor>();
-    final Map<String, List<DatanodeDescriptor>> rackMap
-        = new HashMap<String, List<DatanodeDescriptor>>();
-
-    dataNodes[0].setRemaining(4*1024*1024);
-    replicaNodeList.add(dataNodes[0]);
-
-    dataNodes[1].setRemaining(3*1024*1024);
-    replicaNodeList.add(dataNodes[1]);
-
-    dataNodes[2].setRemaining(2*1024*1024);
-    replicaNodeList.add(dataNodes[2]);
-
-    dataNodes[5].setRemaining(1*1024*1024);
-    replicaNodeList.add(dataNodes[5]);
-
-    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
-    replicator.splitNodesWithRack(
-        replicaNodeList, rackMap, first, second);
-    // dataNodes[0] and dataNodes[1] are in first set as their rack has two
-    // replica nodes, while datanodes[2] and dataNodes[5] are in second set.
-    assertEquals(2, first.size());
-    assertEquals(2, second.size());
-    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
-        null, null, (short)3, first, second);
-    // Within first set, dataNodes[1] with less free space
-    assertEquals(chosenNode, dataNodes[1]);
-
-    replicator.adjustSetsWithChosenReplica(
-        rackMap, first, second, chosenNode);
-    assertEquals(0, first.size());
-    assertEquals(3, second.size());
-    // Within second set, dataNodes[5] with less free space
-    chosenNode = replicator.chooseReplicaToDelete(
-        null, null, (short)2, first, second);
-    assertEquals(chosenNode, dataNodes[5]);
-  }
 }
@@ -591,51 +591,50 @@ public class TestReplicationPolicyWithNodeGroup {
   */
  @Test
  public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeDescriptor> replicaNodeList =
-        new ArrayList<DatanodeDescriptor>();
-    final Map<String, List<DatanodeDescriptor>> rackMap =
-        new HashMap<String, List<DatanodeDescriptor>>();
+    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
    dataNodes[0].setRemaining(4*1024*1024);
-    replicaNodeList.add(dataNodes[0]);
+    replicaList.add(storages[0]);
 
    dataNodes[1].setRemaining(3*1024*1024);
-    replicaNodeList.add(dataNodes[1]);
+    replicaList.add(storages[1]);
 
    dataNodes[2].setRemaining(2*1024*1024);
-    replicaNodeList.add(dataNodes[2]);
+    replicaList.add(storages[2]);
 
    dataNodes[5].setRemaining(1*1024*1024);
-    replicaNodeList.add(dataNodes[5]);
+    replicaList.add(storages[5]);
 
-    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
    replicator.splitNodesWithRack(
-        replicaNodeList, rackMap, first, second);
+        replicaList, rackMap, first, second);
    assertEquals(3, first.size());
    assertEquals(1, second.size());
-    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
        null, null, (short)3, first, second);
    // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
    // dataNodes[0] and dataNodes[1] are in the same nodegroup,
    // but dataNodes[1] is chosen as less free space
-    assertEquals(chosenNode, dataNodes[1]);
+    assertEquals(chosen, storages[1]);
 
-    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(2, first.size());
    assertEquals(1, second.size());
    // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
    // as less free space
-    chosenNode = replicator.chooseReplicaToDelete(
+    chosen = replicator.chooseReplicaToDelete(
        null, null, (short)2, first, second);
-    assertEquals(chosenNode, dataNodes[2]);
+    assertEquals(chosen, storages[2]);
 
-    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(0, first.size());
    assertEquals(2, second.size());
    // Within second set, dataNodes[5] with less free space
-    chosenNode = replicator.chooseReplicaToDelete(
+    chosen = replicator.chooseReplicaToDelete(
        null, null, (short)1, first, second);
-    assertEquals(chosenNode, dataNodes[5]);
+    assertEquals(chosen, storages[5]);
  }
 
  /**
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -585,15 +585,14 @@ public class TestDNFencing {
     }
 
     @Override
-    public DatanodeDescriptor chooseReplicaToDelete(BlockCollection inode,
+    public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode,
         Block block, short replicationFactor,
-        Collection<DatanodeDescriptor> first,
-        Collection<DatanodeDescriptor> second) {
+        Collection<DatanodeStorageInfo> first,
+        Collection<DatanodeStorageInfo> second) {
 
-      Collection<DatanodeDescriptor> chooseFrom =
-          !first.isEmpty() ? first : second;
+      Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
 
-      List<DatanodeDescriptor> l = Lists.newArrayList(chooseFrom);
+      List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
       return l.get(DFSUtil.getRandom().nextInt(l.size()));
     }
   }