HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei)
This commit is contained in:
parent
c8ffea3db7
commit
3f2be7b18f
|
@ -771,6 +771,8 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HDFS-9339. Extend full test of KMS ACLs. (Daniel Templeton via zhz)
|
||||
|
||||
HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
|
@ -124,6 +125,7 @@ public class Dispatcher {
|
|||
private final int ioFileBufferSize;
|
||||
|
||||
private final boolean connectToDnViaHostname;
|
||||
private BlockPlacementPolicy placementPolicy;
|
||||
|
||||
static class Allocator {
|
||||
private final int max;
|
||||
|
@ -888,6 +890,8 @@ public class Dispatcher {
|
|||
this.connectToDnViaHostname = conf.getBoolean(
|
||||
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
|
||||
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||
this.placementPolicy =
|
||||
BlockPlacementPolicy.getInstance(conf, null, cluster, null);
|
||||
}
|
||||
|
||||
public DistributedFileSystem getDistributedFileSystem() {
|
||||
|
@ -1106,66 +1110,24 @@ public class Dispatcher {
|
|||
}
|
||||
}
|
||||
|
||||
if (cluster.isNodeGroupAware()
|
||||
&& isOnSameNodeGroupWithReplicas(source, target, block)) {
|
||||
return false;
|
||||
}
|
||||
if (reduceNumOfRacks(source, target, block)) {
|
||||
if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether moving the given block replica from source to target
|
||||
* would reduce the number of racks of the block replicas.
|
||||
*/
|
||||
private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
|
||||
DBlock block) {
|
||||
final DatanodeInfo sourceDn = source.getDatanodeInfo();
|
||||
if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
|
||||
// source and target are on the same rack
|
||||
return false;
|
||||
}
|
||||
boolean notOnSameRack = true;
|
||||
// Check if the move will violate the block placement policy.
|
||||
private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source,
|
||||
StorageGroup target, DBlock block) {
|
||||
List<DatanodeInfo> datanodeInfos = new ArrayList<>();
|
||||
synchronized (block) {
|
||||
for (StorageGroup loc : block.getLocations()) {
|
||||
if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
|
||||
notOnSameRack = false;
|
||||
break;
|
||||
}
|
||||
for (StorageGroup loc : block.locations) {
|
||||
datanodeInfos.add(loc.getDatanodeInfo());
|
||||
}
|
||||
datanodeInfos.add(target.getDatanodeInfo());
|
||||
}
|
||||
if (notOnSameRack) {
|
||||
// target is not on the same rack as any replica
|
||||
return false;
|
||||
}
|
||||
for (StorageGroup g : block.getLocations()) {
|
||||
if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
|
||||
// source is on the same rack of another replica
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if there are any replica (other than source) on the same node group
|
||||
* with target. If true, then target is not a good candidate for placing
|
||||
* specific replica as we don't want 2 replicas under the same nodegroup.
|
||||
*
|
||||
* @return true if there are any replica (other than source) on the same node
|
||||
* group with target
|
||||
*/
|
||||
private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
|
||||
StorageGroup target, DBlock block) {
|
||||
final DatanodeInfo targetDn = target.getDatanodeInfo();
|
||||
for (StorageGroup g : block.getLocations()) {
|
||||
if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return placementPolicy.isMovable(
|
||||
datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo());
|
||||
}
|
||||
|
||||
/** Reset all fields in order to prepare for the next iteration */
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
|
@ -166,6 +167,17 @@ public abstract class BlockPlacementPolicy {
|
|||
return replicator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the move is allowed. Used by balancer and other tools.
|
||||
* @
|
||||
*
|
||||
* @param candidates all replicas including source and target
|
||||
* @param source source replica of the move
|
||||
* @param target target replica of the move
|
||||
*/
|
||||
abstract public boolean isMovable(Collection<DatanodeInfo> candidates,
|
||||
DatanodeInfo source, DatanodeInfo target);
|
||||
|
||||
/**
|
||||
* Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
|
||||
*
|
||||
|
@ -198,6 +210,20 @@ public abstract class BlockPlacementPolicy {
|
|||
}
|
||||
}
|
||||
|
||||
protected <T> DatanodeInfo getDatanodeInfo(T datanode) {
|
||||
Preconditions.checkArgument(
|
||||
datanode instanceof DatanodeInfo ||
|
||||
datanode instanceof DatanodeStorageInfo,
|
||||
"class " + datanode.getClass().getName() + " not allowed");
|
||||
if (datanode instanceof DatanodeInfo) {
|
||||
return ((DatanodeInfo)datanode);
|
||||
} else if (datanode instanceof DatanodeStorageInfo) {
|
||||
return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get rack string from a data node
|
||||
* @return rack of data node
|
||||
|
@ -205,33 +231,33 @@ public abstract class BlockPlacementPolicy {
|
|||
protected String getRack(final DatanodeInfo datanode) {
|
||||
return datanode.getNetworkLocation();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Split data nodes into two sets, one set includes nodes on rack with
|
||||
* more than one replica, the other set contains the remaining nodes.
|
||||
*
|
||||
* @param dataNodes datanodes to be split into two sets
|
||||
* @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split
|
||||
* into two sets
|
||||
* @param rackMap a map from rack to datanodes
|
||||
* @param moreThanOne contains nodes on rack with more than one replica
|
||||
* @param exactlyOne remains contains the remaining nodes
|
||||
*/
|
||||
public void splitNodesWithRack(
|
||||
final Iterable<DatanodeStorageInfo> storages,
|
||||
final Map<String, List<DatanodeStorageInfo>> rackMap,
|
||||
final List<DatanodeStorageInfo> moreThanOne,
|
||||
final List<DatanodeStorageInfo> exactlyOne) {
|
||||
for(DatanodeStorageInfo s: storages) {
|
||||
final String rackName = getRack(s.getDatanodeDescriptor());
|
||||
List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
|
||||
public <T> void splitNodesWithRack(
|
||||
final Iterable<T> storagesOrDataNodes,
|
||||
final Map<String, List<T>> rackMap,
|
||||
final List<T> moreThanOne,
|
||||
final List<T> exactlyOne) {
|
||||
for(T s: storagesOrDataNodes) {
|
||||
final String rackName = getRack(getDatanodeInfo(s));
|
||||
List<T> storageList = rackMap.get(rackName);
|
||||
if (storageList == null) {
|
||||
storageList = new ArrayList<DatanodeStorageInfo>();
|
||||
storageList = new ArrayList<T>();
|
||||
rackMap.put(rackName, storageList);
|
||||
}
|
||||
storageList.add(s);
|
||||
}
|
||||
|
||||
// split nodes into two sets
|
||||
for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
|
||||
for(List<T> storageList : rackMap.values()) {
|
||||
if (storageList.size() == 1) {
|
||||
// exactlyOne contains nodes on rack with only one replica
|
||||
exactlyOne.add(storageList.get(0));
|
||||
|
@ -241,5 +267,4 @@ public abstract class BlockPlacementPolicy {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -881,7 +881,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
minRacks = Math.min(minRacks, numberOfReplicas);
|
||||
// 1. Check that all locations are different.
|
||||
// 2. Count locations on different racks.
|
||||
Set<String> racks = new TreeSet<String>();
|
||||
Set<String> racks = new TreeSet<>();
|
||||
for (DatanodeInfo dn : locs)
|
||||
racks.add(dn.getNetworkLocation());
|
||||
return new BlockPlacementStatusDefault(racks.size(), minRacks);
|
||||
|
@ -889,8 +889,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
/**
|
||||
* Decide whether deleting the specified replica of the block still makes
|
||||
* the block conform to the configured block placement policy.
|
||||
* @param replicationFactor The required number of replicas for this block
|
||||
* @param moreThanone The replica locations of this block that are present
|
||||
* @param moreThanOne The replica locations of this block that are present
|
||||
* on more than one unique racks.
|
||||
* @param exactlyOne Replica locations of this block that are present
|
||||
* on exactly one unique racks.
|
||||
|
@ -900,8 +899,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
* @return the replica that is the best candidate for deletion
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
|
||||
Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne,
|
||||
public DatanodeStorageInfo chooseReplicaToDelete(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
final List<StorageType> excessTypes) {
|
||||
long oldestHeartbeat =
|
||||
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
|
||||
|
@ -911,7 +911,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
|
||||
// Pick the node with the oldest heartbeat or with the least free space,
|
||||
// if all hearbeats are within the tolerable heartbeat interval
|
||||
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) {
|
||||
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
|
||||
exactlyOne)) {
|
||||
if (!excessTypes.contains(storage.getStorageType())) {
|
||||
continue;
|
||||
}
|
||||
|
@ -972,13 +973,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
|
||||
while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
|
||||
final DatanodeStorageInfo cur;
|
||||
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
|
||||
moreThanOne, excessTypes)) {
|
||||
if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage,
|
||||
moreThanOne, exactlyOne, excessTypes)) {
|
||||
cur = delNodeHintStorage;
|
||||
} else { // regular excessive replica removal
|
||||
cur =
|
||||
chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne,
|
||||
excessTypes);
|
||||
cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes);
|
||||
}
|
||||
firstOne = false;
|
||||
if (cur == null) {
|
||||
|
@ -997,26 +996,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
|
||||
/** Check if we can use delHint. */
|
||||
@VisibleForTesting
|
||||
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
|
||||
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
|
||||
boolean useDelHint(DatanodeStorageInfo delHint,
|
||||
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
List<StorageType> excessTypes) {
|
||||
if (!isFirst) {
|
||||
return false; // only consider delHint for the first case
|
||||
} else if (delHint == null) {
|
||||
if (delHint == null) {
|
||||
return false; // no delHint
|
||||
} else if (!excessTypes.contains(delHint.getStorageType())) {
|
||||
return false; // delHint storage type is not an excess type
|
||||
} else {
|
||||
// check if removing delHint reduces the number of racks
|
||||
if (moreThan1Racks.contains(delHint)) {
|
||||
return true; // delHint and some other nodes are under the same rack
|
||||
} else if (added != null && !moreThan1Racks.contains(added)) {
|
||||
return true; // the added node adds a new rack
|
||||
}
|
||||
return false; // removing delHint reduces the number of racks;
|
||||
return notReduceNumOfGroups(moreThanOne, delHint, added);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if moving from source to target will reduce the number of
|
||||
// groups. The groups could be based on racks or upgrade domains.
|
||||
<T> boolean notReduceNumOfGroups(List<T> moreThanOne, T source, T target) {
|
||||
if (moreThanOne.contains(source)) {
|
||||
return true; // source and some other nodes are under the same group.
|
||||
} else if (target != null && !moreThanOne.contains(target)) {
|
||||
return true; // the added node adds a new group.
|
||||
}
|
||||
return false; // removing delHint reduces the number of groups.
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMovable(Collection<DatanodeInfo> locs,
|
||||
DatanodeInfo source, DatanodeInfo target) {
|
||||
final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>();
|
||||
final List<DatanodeInfo> moreThanOne = new ArrayList<>();
|
||||
final List<DatanodeInfo> exactlyOne = new ArrayList<>();
|
||||
splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne);
|
||||
return notReduceNumOfGroups(moreThanOne, source, target);
|
||||
}
|
||||
/**
|
||||
* Pick up replica node set for deleting replica as over-replicated.
|
||||
* First set contains replica nodes on rack with more than one
|
||||
|
|
|
@ -39,11 +39,6 @@ import org.apache.hadoop.net.NodeBase;
|
|||
*/
|
||||
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
|
||||
|
||||
protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
|
||||
NetworkTopology clusterMap, DatanodeManager datanodeManager) {
|
||||
initialize(conf, stats, clusterMap, host2datanodeMap);
|
||||
}
|
||||
|
||||
protected BlockPlacementPolicyWithNodeGroup() {
|
||||
}
|
||||
|
||||
|
@ -347,22 +342,21 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|||
// Split data nodes in the first set into two sets,
|
||||
// moreThanOne contains nodes on nodegroup with more than one replica
|
||||
// exactlyOne contains the remaining nodes
|
||||
Map<String, List<DatanodeStorageInfo>> nodeGroupMap =
|
||||
new HashMap<String, List<DatanodeStorageInfo>>();
|
||||
Map<String, List<DatanodeStorageInfo>> nodeGroupMap = new HashMap<>();
|
||||
|
||||
for(DatanodeStorageInfo storage : first) {
|
||||
final String nodeGroupName = NetworkTopology.getLastHalf(
|
||||
storage.getDatanodeDescriptor().getNetworkLocation());
|
||||
List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
|
||||
if (storageList == null) {
|
||||
storageList = new ArrayList<DatanodeStorageInfo>();
|
||||
storageList = new ArrayList<>();
|
||||
nodeGroupMap.put(nodeGroupName, storageList);
|
||||
}
|
||||
storageList.add(storage);
|
||||
}
|
||||
|
||||
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
|
||||
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
|
||||
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
|
||||
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
|
||||
// split nodes into two sets
|
||||
for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
|
||||
if (datanodeList.size() == 1 ) {
|
||||
|
@ -376,5 +370,24 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|||
|
||||
return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check if there are any replica (other than source) on the same node group
|
||||
* with target. If true, then target is not a good candidate for placing
|
||||
* specific replica as we don't want 2 replicas under the same nodegroup.
|
||||
*
|
||||
* @return true if there are any replica (other than source) on the same node
|
||||
* group with target
|
||||
*/
|
||||
@Override
|
||||
public boolean isMovable(Collection<DatanodeInfo> locs,
|
||||
DatanodeInfo source, DatanodeInfo target) {
|
||||
for (DatanodeInfo dn : locs) {
|
||||
if (dn != source && dn != target &&
|
||||
clusterMap.isOnSameNodeGroup(dn, target)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Set;
|
|||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
|
@ -117,13 +118,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
return upgradeDomains;
|
||||
}
|
||||
|
||||
private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
|
||||
DatanodeStorageInfo[] storageInfos) {
|
||||
Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
|
||||
for(DatanodeStorageInfo storage : storageInfos) {
|
||||
private <T> Map<String, List<T>> getUpgradeDomainMap(
|
||||
Collection<T> storagesOrDataNodes) {
|
||||
Map<String, List<T>> upgradeDomainMap = new HashMap<>();
|
||||
for(T storage : storagesOrDataNodes) {
|
||||
String upgradeDomain = getUpgradeDomainWithDefaultValue(
|
||||
storage.getDatanodeDescriptor());
|
||||
List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
|
||||
getDatanodeInfo(storage));
|
||||
List<T> storages = upgradeDomainMap.get(upgradeDomain);
|
||||
if (storages == null) {
|
||||
storages = new ArrayList<>();
|
||||
upgradeDomainMap.put(upgradeDomain, storages);
|
||||
|
@ -156,6 +157,19 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
return getShareUDSet;
|
||||
}
|
||||
|
||||
private Collection<DatanodeStorageInfo> combine(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne) {
|
||||
List<DatanodeStorageInfo> all = new ArrayList<>();
|
||||
if (moreThanOne != null) {
|
||||
all.addAll(moreThanOne);
|
||||
}
|
||||
if (exactlyOne != null) {
|
||||
all.addAll(exactlyOne);
|
||||
}
|
||||
return all;
|
||||
}
|
||||
|
||||
/*
|
||||
* The policy to pick the replica set for deleting the over-replicated
|
||||
* replica which meet the rack and upgrade domain requirements.
|
||||
|
@ -231,20 +245,11 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne) {
|
||||
List<DatanodeStorageInfo> all = new ArrayList<>();
|
||||
if (moreThanOne != null) {
|
||||
all.addAll(moreThanOne);
|
||||
}
|
||||
if (exactlyOne != null) {
|
||||
all.addAll(exactlyOne);
|
||||
}
|
||||
|
||||
Map<String, List<DatanodeStorageInfo>> upgradeDomains =
|
||||
getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
|
||||
|
||||
// shareUDSet includes DatanodeStorageInfo that share same upgrade
|
||||
// domain with another DatanodeStorageInfo.
|
||||
List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
|
||||
Collection<DatanodeStorageInfo> all = combine(moreThanOne, exactlyOne);
|
||||
List<DatanodeStorageInfo> shareUDSet = getShareUDSet(
|
||||
getUpgradeDomainMap(all));
|
||||
// shareRackAndUDSet contains those DatanodeStorageInfo that
|
||||
// share rack and upgrade domain with another DatanodeStorageInfo.
|
||||
List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
|
||||
|
@ -260,4 +265,47 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
}
|
||||
return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean useDelHint(DatanodeStorageInfo delHint,
|
||||
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
List<StorageType> excessTypes) {
|
||||
if (!super.useDelHint(delHint, added, moreThanOne, exactlyOne,
|
||||
excessTypes)) {
|
||||
// If BlockPlacementPolicyDefault doesn't allow useDelHint, there is no
|
||||
// point checking with upgrade domain policy.
|
||||
return false;
|
||||
}
|
||||
return isMovableBasedOnUpgradeDomain(combine(moreThanOne, exactlyOne),
|
||||
delHint, added);
|
||||
}
|
||||
|
||||
// Check if moving from source to target will preserve the upgrade domain
|
||||
// policy.
|
||||
private <T> boolean isMovableBasedOnUpgradeDomain(Collection<T> all,
|
||||
T source, T target) {
|
||||
Map<String, List<T>> udMap = getUpgradeDomainMap(all);
|
||||
// shareUDSet includes datanodes that share same upgrade
|
||||
// domain with another datanode.
|
||||
List<T> shareUDSet = getShareUDSet(udMap);
|
||||
// check if removing source reduces the number of upgrade domains
|
||||
if (notReduceNumOfGroups(shareUDSet, source, target)) {
|
||||
return true;
|
||||
} else if (udMap.size() > upgradeDomainFactor) {
|
||||
return true; // existing number of upgrade domain exceeds the limit.
|
||||
} else {
|
||||
return false; // removing source reduces the number of UDs.
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMovable(Collection<DatanodeInfo> locs,
|
||||
DatanodeInfo source, DatanodeInfo target) {
|
||||
if (super.isMovable(locs, source, target)) {
|
||||
return isMovableBasedOnUpgradeDomain(locs, source, target);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1154,7 +1154,7 @@ public class DFSTestUtil {
|
|||
final StorageType type = (types != null && i < types.length) ? types[i]
|
||||
: StorageType.DEFAULT;
|
||||
storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname,
|
||||
type);
|
||||
type, null);
|
||||
}
|
||||
return storages;
|
||||
}
|
||||
|
@ -1162,16 +1162,19 @@ public class DFSTestUtil {
|
|||
public static DatanodeStorageInfo createDatanodeStorageInfo(
|
||||
String storageID, String ip, String rack, String hostname) {
|
||||
return createDatanodeStorageInfo(storageID, ip, rack, hostname,
|
||||
StorageType.DEFAULT);
|
||||
StorageType.DEFAULT, null);
|
||||
}
|
||||
|
||||
public static DatanodeStorageInfo createDatanodeStorageInfo(
|
||||
String storageID, String ip, String rack, String hostname,
|
||||
StorageType type) {
|
||||
StorageType type, String upgradeDomain) {
|
||||
final DatanodeStorage storage = new DatanodeStorage(storageID,
|
||||
DatanodeStorage.State.NORMAL, type);
|
||||
final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
|
||||
ip, rack, storage, hostname);
|
||||
if (upgradeDomain != null) {
|
||||
dn.setUpgradeDomain(upgradeDomain);
|
||||
}
|
||||
return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
|
||||
}
|
||||
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -75,7 +76,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
|
||||
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
|
||||
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
||||
|
@ -390,7 +394,102 @@ public class TestBalancer {
|
|||
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Verify balancer won't violate the default block placement policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testRackPolicyAfterBalance() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
long[] capacities = new long[] { CAPACITY, CAPACITY };
|
||||
String[] hosts = {"host0", "host1"};
|
||||
String[] racks = { RACK0, RACK1 };
|
||||
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
|
||||
null, CAPACITY, "host2", RACK1, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify balancer won't violate upgrade domain block placement policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testUpgradeDomainPolicyAfterBalance() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||
BlockPlacementPolicyWithUpgradeDomain.class,
|
||||
BlockPlacementPolicy.class);
|
||||
long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY };
|
||||
String[] hosts = {"host0", "host1", "host2"};
|
||||
String[] racks = { RACK0, RACK1, RACK1 };
|
||||
String[] UDs = { "ud0", "ud1", "ud2" };
|
||||
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
|
||||
UDs, CAPACITY, "host3", RACK2, "ud2");
|
||||
}
|
||||
|
||||
private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
|
||||
long[] capacities, String[] hosts, String[] racks, String[] UDs,
|
||||
long newCapacity, String newHost, String newRack, String newUD)
|
||||
throws Exception {
|
||||
int numOfDatanodes = capacities.length;
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
|
||||
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
|
||||
DatanodeManager dm = cluster.getNamesystem().getBlockManager().
|
||||
getDatanodeManager();
|
||||
if (UDs != null) {
|
||||
for(int i = 0; i < UDs.length; i++) {
|
||||
DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
|
||||
dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
cluster.waitActive();
|
||||
client = NameNodeProxies.createProxy(conf,
|
||||
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
|
||||
|
||||
// fill up the cluster to be 80% full
|
||||
long totalCapacity = sum(capacities);
|
||||
long totalUsedSpace = totalCapacity * 8 / 10;
|
||||
|
||||
final long fileSize = totalUsedSpace / numOfDatanodes;
|
||||
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
|
||||
fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
|
||||
|
||||
// start up an empty node with the same capacity on the same rack as the
|
||||
// pinned host.
|
||||
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
|
||||
new String[] { newHost }, new long[] { newCapacity });
|
||||
if (newUD != null) {
|
||||
DatanodeID newId = cluster.getDataNodes().get(
|
||||
numOfDatanodes).getDatanodeId();
|
||||
dm.getDatanode(newId).setUpgradeDomain(newUD);
|
||||
}
|
||||
totalCapacity += newCapacity;
|
||||
|
||||
// run balancer and validate results
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||
|
||||
// start rebalancing
|
||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||
BlockPlacementPolicy placementPolicy =
|
||||
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
|
||||
List<LocatedBlock> locatedBlocks = client.
|
||||
getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
|
||||
for (LocatedBlock locatedBlock : locatedBlocks) {
|
||||
BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
|
||||
locatedBlock.getLocations(), numOfDatanodes);
|
||||
assertTrue(status.isPlacementPolicySatisfied());
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until balanced: each datanode gives utilization within
|
||||
* BALANCE_ALLOWED_VARIANCE of average
|
||||
|
|
|
@ -820,14 +820,15 @@ public class TestBlockManager {
|
|||
DatanodeStorageInfo delHint = new DatanodeStorageInfo(
|
||||
DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("id"));
|
||||
List<DatanodeStorageInfo> moreThan1Racks = Arrays.asList(delHint);
|
||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||
|
||||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
BlockPlacementPolicyDefault policyDefault =
|
||||
(BlockPlacementPolicyDefault) bm.getBlockPlacementPolicy();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint,
|
||||
null, moreThan1Racks, excessTypes));
|
||||
Assert.assertTrue(policyDefault.useDelHint(delHint, null, moreThan1Racks,
|
||||
null, excessTypes));
|
||||
excessTypes.remove(0);
|
||||
excessTypes.add(StorageType.SSD);
|
||||
Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint,
|
||||
null, moreThan1Racks, excessTypes));
|
||||
Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
|
||||
null, excessTypes));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
|
@ -971,11 +972,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
// test returning null
|
||||
excessTypes.add(StorageType.SSD);
|
||||
assertNull(((BlockPlacementPolicyDefault) replicator)
|
||||
.chooseReplicaToDelete((short) 3, first, second, excessTypes));
|
||||
.chooseReplicaToDelete(first, second, excessTypes));
|
||||
}
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
|
||||
.chooseReplicaToDelete((short) 3, first, second, excessTypes);
|
||||
.chooseReplicaToDelete(first, second, excessTypes);
|
||||
// Within first set, storages[1] with less free space
|
||||
assertEquals(chosen, storages[1]);
|
||||
|
||||
|
@ -985,25 +986,25 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
// Within second set, storages[5] with less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||
(short)2, first, second, excessTypes);
|
||||
first, second, excessTypes);
|
||||
assertEquals(chosen, storages[5]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChooseReplicasToDelete() throws Exception {
|
||||
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
|
||||
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
|
||||
nonExcess.add(storages[0]);
|
||||
nonExcess.add(storages[1]);
|
||||
nonExcess.add(storages[2]);
|
||||
nonExcess.add(storages[3]);
|
||||
List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
|
||||
List<DatanodeStorageInfo> excessReplicas;
|
||||
BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
|
||||
.createDefaultSuite();
|
||||
BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
|
||||
DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
|
||||
"Storage-excess-SSD-ID", "localhost",
|
||||
storages[0].getDatanodeDescriptor().getNetworkLocation(),
|
||||
"foo.com", StorageType.SSD);
|
||||
"foo.com", StorageType.SSD, null);
|
||||
updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(),
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0,
|
||||
|
@ -1016,14 +1017,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
|
||||
assertTrue(excessReplicas.size() > 0);
|
||||
assertTrue(excessReplicas.size() == 1);
|
||||
assertTrue(excessReplicas.contains(storages[0]));
|
||||
|
||||
// Excess type deletion
|
||||
|
||||
DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
|
||||
"Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
|
||||
"foo.com", StorageType.ARCHIVE);
|
||||
"foo.com", StorageType.ARCHIVE, null);
|
||||
nonExcess.add(excessStorage);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
|
@ -1057,32 +1058,70 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
|
||||
@Test
|
||||
public void testUseDelHint() throws Exception {
|
||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
excessTypes.add(StorageType.ARCHIVE);
|
||||
// only consider delHint for the first case
|
||||
assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null,
|
||||
null));
|
||||
BlockPlacementPolicyDefault policyDefault =
|
||||
(BlockPlacementPolicyDefault) replicator;
|
||||
// no delHint
|
||||
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null,
|
||||
null));
|
||||
assertFalse(policyDefault.useDelHint(null, null, null, null, null));
|
||||
// delHint storage type is not an excess type
|
||||
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
|
||||
null, excessTypes));
|
||||
assertFalse(policyDefault.useDelHint(storages[0], null, null, null,
|
||||
excessTypes));
|
||||
// check if removing delHint reduces the number of racks
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
chosenNodes.add(storages[0]);
|
||||
chosenNodes.add(storages[2]);
|
||||
List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
|
||||
moreThanOne.add(storages[0]);
|
||||
moreThanOne.add(storages[1]);
|
||||
List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
|
||||
exactlyOne.add(storages[3]);
|
||||
exactlyOne.add(storages[5]);
|
||||
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
|
||||
chosenNodes, excessTypes));
|
||||
assertTrue(policyDefault.useDelHint(storages[0], null, moreThanOne,
|
||||
exactlyOne, excessTypes));
|
||||
// the added node adds a new rack
|
||||
assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
|
||||
storages[5], chosenNodes, excessTypes));
|
||||
assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne,
|
||||
exactlyOne, excessTypes));
|
||||
// removing delHint reduces the number of racks;
|
||||
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
|
||||
storages[0], chosenNodes, excessTypes));
|
||||
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null,
|
||||
chosenNodes, excessTypes));
|
||||
assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne,
|
||||
exactlyOne, excessTypes));
|
||||
assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne,
|
||||
exactlyOne, excessTypes));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsMovable() throws Exception {
|
||||
List<DatanodeInfo> candidates = new ArrayList<>();
|
||||
|
||||
// after the move, the number of racks remains 2.
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[1]);
|
||||
candidates.add(dataNodes[2]);
|
||||
candidates.add(dataNodes[3]);
|
||||
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
|
||||
|
||||
// after the move, the number of racks remains 3.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[1]);
|
||||
candidates.add(dataNodes[2]);
|
||||
candidates.add(dataNodes[4]);
|
||||
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1]));
|
||||
|
||||
// after the move, the number of racks changes from 2 to 3.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[1]);
|
||||
candidates.add(dataNodes[2]);
|
||||
candidates.add(dataNodes[4]);
|
||||
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
|
||||
|
||||
// the move would have reduced the number of racks from 3 to 2.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[2]);
|
||||
candidates.add(dataNodes[3]);
|
||||
candidates.add(dataNodes[4]);
|
||||
assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
|
||||
.chooseReplicaToDelete((short) 3, first, second, excessTypes);
|
||||
.chooseReplicaToDelete(first, second, excessTypes);
|
||||
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
|
||||
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
|
||||
// but dataNodes[1] is chosen as less free space
|
||||
|
@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
// as less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||
(short) 2, first, second, excessTypes);
|
||||
first, second, excessTypes);
|
||||
assertEquals(chosen, storages[2]);
|
||||
|
||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
|
||||
|
@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
// Within second set, dataNodes[5] with less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||
(short) 1, first, second, excessTypes);
|
||||
first, second, excessTypes);
|
||||
assertEquals(chosen, storages[5]);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageType;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
|
@ -189,41 +192,6 @@ public class TestReplicationPolicyWithUpgradeDomain
|
|||
assertEquals(2, targets.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the correct replica is chosen to satisfy both rack and upgrade
|
||||
* domain policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testChooseReplicaToDelete() throws Exception {
|
||||
BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
|
||||
(BlockPlacementPolicyWithUpgradeDomain)replicator;
|
||||
List<DatanodeStorageInfo> first = new ArrayList<>();
|
||||
List<DatanodeStorageInfo> second = new ArrayList<>();
|
||||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
first.add(storages[0]);
|
||||
first.add(storages[1]);
|
||||
second.add(storages[4]);
|
||||
second.add(storages[8]);
|
||||
DatanodeStorageInfo chosenStorage =
|
||||
upgradeDomainPolicy.chooseReplicaToDelete(
|
||||
(short)3, first, second, excessTypes);
|
||||
assertEquals(chosenStorage, storages[1]);
|
||||
first.clear();
|
||||
second.clear();
|
||||
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
first.add(storages[0]);
|
||||
first.add(storages[1]);
|
||||
first.add(storages[4]);
|
||||
first.add(storages[5]);
|
||||
chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
|
||||
(short)3, first, second, excessTypes);
|
||||
assertTrue(chosenStorage.equals(storages[1]) ||
|
||||
chosenStorage.equals(storages[4]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the scenario where not enough replicas can't satisfy the policy.
|
||||
* @throws Exception
|
||||
|
@ -248,7 +216,7 @@ public class TestReplicationPolicyWithUpgradeDomain
|
|||
}
|
||||
|
||||
/**
|
||||
* Test the scenario where not enough replicas can't satisfy the policy.
|
||||
* Test block placement verification.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
|
@ -341,6 +309,137 @@ public class TestReplicationPolicyWithUpgradeDomain
|
|||
assertFalse(status.isPlacementPolicySatisfied());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the correct replica is chosen to satisfy both rack and upgrade
|
||||
* domain policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testChooseReplicasToDelete() throws Exception {
|
||||
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
|
||||
nonExcess.add(storages[0]);
|
||||
nonExcess.add(storages[1]);
|
||||
nonExcess.add(storages[2]);
|
||||
nonExcess.add(storages[3]);
|
||||
List<DatanodeStorageInfo> excessReplicas;
|
||||
BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
|
||||
.createDefaultSuite();
|
||||
BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
|
||||
|
||||
// delete hint accepted.
|
||||
DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
|
||||
List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
|
||||
assertTrue(excessReplicas.size() == 1);
|
||||
assertTrue(excessReplicas.contains(storages[0]));
|
||||
|
||||
// delete hint rejected because deleting storages[1] would have
|
||||
// cause only two upgrade domains left.
|
||||
delHintNode = storages[1].getDatanodeDescriptor();
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
|
||||
assertTrue(excessReplicas.size() == 1);
|
||||
assertTrue(excessReplicas.contains(storages[0]));
|
||||
|
||||
// no delete hint, case 1
|
||||
nonExcess.clear();
|
||||
nonExcess.add(storages[0]);
|
||||
nonExcess.add(storages[1]);
|
||||
nonExcess.add(storages[4]);
|
||||
nonExcess.add(storages[8]);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[8].getDatanodeDescriptor(), null);
|
||||
assertTrue(excessReplicas.size() == 1);
|
||||
assertTrue(excessReplicas.contains(storages[1]));
|
||||
|
||||
// no delete hint, case 2
|
||||
nonExcess.clear();
|
||||
nonExcess.add(storages[0]);
|
||||
nonExcess.add(storages[1]);
|
||||
nonExcess.add(storages[4]);
|
||||
nonExcess.add(storages[5]);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[8].getDatanodeDescriptor(), null);
|
||||
assertTrue(excessReplicas.size() == 1);
|
||||
assertTrue(excessReplicas.contains(storages[1]) ||
|
||||
excessReplicas.contains(storages[4]));
|
||||
|
||||
// No delete hint, different excess type deletion
|
||||
nonExcess.clear();
|
||||
nonExcess.add(storages[0]);
|
||||
nonExcess.add(storages[1]);
|
||||
nonExcess.add(storages[2]);
|
||||
nonExcess.add(storages[3]);
|
||||
DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
|
||||
"Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
|
||||
"foo.com", StorageType.ARCHIVE, delHintNode.getUpgradeDomain());
|
||||
nonExcess.add(excessStorage);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[3].getDatanodeDescriptor(), null);
|
||||
assertTrue(excessReplicas.size() == 2);
|
||||
assertTrue(excessReplicas.contains(storages[0]));
|
||||
assertTrue(excessReplicas.contains(excessStorage));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsMovable() throws Exception {
|
||||
List<DatanodeInfo> candidates = new ArrayList<>();
|
||||
|
||||
// after the move, the number of racks changes from 1 to 2.
|
||||
// and number of upgrade domains remains 3.
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[1]);
|
||||
candidates.add(dataNodes[2]);
|
||||
candidates.add(dataNodes[3]);
|
||||
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
|
||||
|
||||
// the move would have changed the number of racks from 1 to 2.
|
||||
// and the number of UDs from 3 to 2.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[1]);
|
||||
candidates.add(dataNodes[2]);
|
||||
candidates.add(dataNodes[4]);
|
||||
assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
|
||||
|
||||
// after the move, the number of racks remains 2.
|
||||
// the number of UDs remains 3.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[4]);
|
||||
candidates.add(dataNodes[5]);
|
||||
candidates.add(dataNodes[6]);
|
||||
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[6]));
|
||||
|
||||
// after the move, the number of racks remains 2.
|
||||
// the number of UDs remains 2.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[1]);
|
||||
candidates.add(dataNodes[3]);
|
||||
candidates.add(dataNodes[4]);
|
||||
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
|
||||
|
||||
// the move would have changed the number of racks from 2 to 3.
|
||||
// and the number of UDs from 2 to 1.
|
||||
candidates.clear();
|
||||
candidates.add(dataNodes[0]);
|
||||
candidates.add(dataNodes[3]);
|
||||
candidates.add(dataNodes[4]);
|
||||
candidates.add(dataNodes[6]);
|
||||
assertFalse(replicator.isMovable(candidates, dataNodes[4], dataNodes[6]));
|
||||
}
|
||||
|
||||
private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
|
||||
HashSet<String> upgradeDomains = new HashSet<>();
|
||||
for (DatanodeStorageInfo node : nodes) {
|
||||
|
|
|
@ -629,11 +629,13 @@ public class TestDNFencing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
|
||||
Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second,
|
||||
public DatanodeStorageInfo chooseReplicaToDelete(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
List<StorageType> excessTypes) {
|
||||
|
||||
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
|
||||
|
||||
Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
|
||||
moreThanOne : exactlyOne;
|
||||
|
||||
List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
|
||||
return l.get(ThreadLocalRandom.current().nextInt(l.size()));
|
||||
|
|
Loading…
Reference in New Issue