HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei)

This commit is contained in:
Lei Xu 2015-11-03 14:17:11 -08:00
parent 88beb46cf6
commit ec414600ed
13 changed files with 503 additions and 198 deletions

View File

@ -1618,6 +1618,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9339. Extend full test of KMS ACLs. (Daniel Templeton via zhz)
HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@ -124,6 +125,7 @@ public class Dispatcher {
private final int ioFileBufferSize;
private final boolean connectToDnViaHostname;
private BlockPlacementPolicies placementPolicies;
static class Allocator {
private final int max;
@ -949,6 +951,7 @@ public class Dispatcher {
this.connectToDnViaHostname = conf.getBoolean(
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
}
public DistributedFileSystem getDistributedFileSystem() {
@ -1166,66 +1169,24 @@ public class Dispatcher {
}
}
if (cluster.isNodeGroupAware()
&& isOnSameNodeGroupWithReplicas(source, target, block)) {
return false;
}
if (reduceNumOfRacks(source, target, block)) {
if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) {
return false;
}
return true;
}
/**
* Determine whether moving the given block replica from source to target
* would reduce the number of racks of the block replicas.
*/
private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
DBlock block) {
final DatanodeInfo sourceDn = source.getDatanodeInfo();
if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
// source and target are on the same rack
return false;
}
boolean notOnSameRack = true;
// Check if the move will violate the block placement policy.
private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source,
StorageGroup target, DBlock block) {
List<DatanodeInfo> datanodeInfos = new ArrayList<>();
synchronized (block) {
for (StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
notOnSameRack = false;
break;
}
for (StorageGroup loc : block.locations) {
datanodeInfos.add(loc.getDatanodeInfo());
}
datanodeInfos.add(target.getDatanodeInfo());
}
if (notOnSameRack) {
// target is not on the same rack as any replica
return false;
}
for (StorageGroup g : block.getLocations()) {
if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
// source is on the same rack of another replica
return false;
}
}
return true;
}
/**
* Check if there are any replica (other than source) on the same node group
* with target. If true, then target is not a good candidate for placing
* specific replica as we don't want 2 replicas under the same nodegroup.
*
* @return true if there are any replica (other than source) on the same node
* group with target
*/
private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
StorageGroup target, DBlock block) {
final DatanodeInfo targetDn = target.getDatanodeInfo();
for (StorageGroup g : block.getLocations()) {
if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
return true;
}
}
return false;
return placementPolicies.getPolicy(false).isMovable(
datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo());
}
/** Reset all fields in order to prepare for the next iteration */

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@ -140,6 +141,17 @@ public abstract class BlockPlacementPolicy {
NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap);
/**
* Check if the move is allowed. Used by balancer and other tools.
* @
*
* @param candidates all replicas including source and target
* @param source source replica of the move
* @param target target replica of the move
*/
abstract public boolean isMovable(Collection<DatanodeInfo> candidates,
DatanodeInfo source, DatanodeInfo target);
/**
* Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
*
@ -172,6 +184,20 @@ public abstract class BlockPlacementPolicy {
}
}
protected <T> DatanodeInfo getDatanodeInfo(T datanode) {
Preconditions.checkArgument(
datanode instanceof DatanodeInfo ||
datanode instanceof DatanodeStorageInfo,
"class " + datanode.getClass().getName() + " not allowed");
if (datanode instanceof DatanodeInfo) {
return ((DatanodeInfo)datanode);
} else if (datanode instanceof DatanodeStorageInfo) {
return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor();
} else {
return null;
}
}
/**
* Get rack string from a data node
* @return rack of data node
@ -184,28 +210,28 @@ public abstract class BlockPlacementPolicy {
* Split data nodes into two sets, one set includes nodes on rack with
* more than one replica, the other set contains the remaining nodes.
*
* @param dataNodes datanodes to be split into two sets
* @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split
* into two sets
* @param rackMap a map from rack to datanodes
* @param moreThanOne contains nodes on rack with more than one replica
* @param exactlyOne remains contains the remaining nodes
*/
public void splitNodesWithRack(
final Iterable<DatanodeStorageInfo> storages,
final Map<String, List<DatanodeStorageInfo>> rackMap,
final List<DatanodeStorageInfo> moreThanOne,
final List<DatanodeStorageInfo> exactlyOne) {
for(DatanodeStorageInfo s: storages) {
final String rackName = getRack(s.getDatanodeDescriptor());
List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
public <T> void splitNodesWithRack(
final Iterable<T> storagesOrDataNodes,
final Map<String, List<T>> rackMap,
final List<T> moreThanOne,
final List<T> exactlyOne) {
for(T s: storagesOrDataNodes) {
final String rackName = getRack(getDatanodeInfo(s));
List<T> storageList = rackMap.get(rackName);
if (storageList == null) {
storageList = new ArrayList<DatanodeStorageInfo>();
storageList = new ArrayList<T>();
rackMap.put(rackName, storageList);
}
storageList.add(s);
}
// split nodes into two sets
for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
for(List<T> storageList : rackMap.values()) {
if (storageList.size() == 1) {
// exactlyOne contains nodes on rack with only one replica
exactlyOne.add(storageList.get(0));
@ -215,5 +241,4 @@ public abstract class BlockPlacementPolicy {
}
}
}
}

View File

@ -881,7 +881,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
minRacks = Math.min(minRacks, numberOfReplicas);
// 1. Check that all locations are different.
// 2. Count locations on different racks.
Set<String> racks = new TreeSet<String>();
Set<String> racks = new TreeSet<>();
for (DatanodeInfo dn : locs)
racks.add(dn.getNetworkLocation());
return new BlockPlacementStatusDefault(racks.size(), minRacks);
@ -889,8 +889,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/**
* Decide whether deleting the specified replica of the block still makes
* the block conform to the configured block placement policy.
* @param replicationFactor The required number of replicas for this block
* @param moreThanone The replica locations of this block that are present
* @param moreThanOne The replica locations of this block that are present
* on more than one unique racks.
* @param exactlyOne Replica locations of this block that are present
* on exactly one unique racks.
@ -900,8 +899,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* @return the replica that is the best candidate for deletion
*/
@VisibleForTesting
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne,
public DatanodeStorageInfo chooseReplicaToDelete(
Collection<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne,
final List<StorageType> excessTypes) {
long oldestHeartbeat =
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
@ -911,7 +911,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) {
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
exactlyOne)) {
if (!excessTypes.contains(storage.getStorageType())) {
continue;
}
@ -972,13 +973,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
final DatanodeStorageInfo cur;
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
moreThanOne, excessTypes)) {
if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage,
moreThanOne, exactlyOne, excessTypes)) {
cur = delNodeHintStorage;
} else { // regular excessive replica removal
cur =
chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne,
excessTypes);
cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes);
}
firstOne = false;
if (cur == null) {
@ -997,26 +996,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/** Check if we can use delHint. */
@VisibleForTesting
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
boolean useDelHint(DatanodeStorageInfo delHint,
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne,
List<StorageType> excessTypes) {
if (!isFirst) {
return false; // only consider delHint for the first case
} else if (delHint == null) {
if (delHint == null) {
return false; // no delHint
} else if (!excessTypes.contains(delHint.getStorageType())) {
return false; // delHint storage type is not an excess type
} else {
// check if removing delHint reduces the number of racks
if (moreThan1Racks.contains(delHint)) {
return true; // delHint and some other nodes are under the same rack
} else if (added != null && !moreThan1Racks.contains(added)) {
return true; // the added node adds a new rack
}
return false; // removing delHint reduces the number of racks;
return notReduceNumOfGroups(moreThanOne, delHint, added);
}
}
// Check if moving from source to target will reduce the number of
// groups. The groups could be based on racks or upgrade domains.
<T> boolean notReduceNumOfGroups(List<T> moreThanOne, T source, T target) {
if (moreThanOne.contains(source)) {
return true; // source and some other nodes are under the same group.
} else if (target != null && !moreThanOne.contains(target)) {
return true; // the added node adds a new group.
}
return false; // removing delHint reduces the number of groups.
}
@Override
public boolean isMovable(Collection<DatanodeInfo> locs,
DatanodeInfo source, DatanodeInfo target) {
final Map<String, List<DatanodeInfo>> rackMap = new HashMap<>();
final List<DatanodeInfo> moreThanOne = new ArrayList<>();
final List<DatanodeInfo> exactlyOne = new ArrayList<>();
splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne);
return notReduceNumOfGroups(moreThanOne, source, target);
}
/**
* Pick up replica node set for deleting replica as over-replicated.
* First set contains replica nodes on rack with more than one

View File

@ -39,11 +39,6 @@ import org.apache.hadoop.net.NodeBase;
*/
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, DatanodeManager datanodeManager) {
initialize(conf, stats, clusterMap, host2datanodeMap);
}
protected BlockPlacementPolicyWithNodeGroup() {
}
@ -345,22 +340,21 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
// exactlyOne contains the remaining nodes
Map<String, List<DatanodeStorageInfo>> nodeGroupMap =
new HashMap<String, List<DatanodeStorageInfo>>();
Map<String, List<DatanodeStorageInfo>> nodeGroupMap = new HashMap<>();
for(DatanodeStorageInfo storage : first) {
final String nodeGroupName = NetworkTopology.getLastHalf(
storage.getDatanodeDescriptor().getNetworkLocation());
List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
if (storageList == null) {
storageList = new ArrayList<DatanodeStorageInfo>();
storageList = new ArrayList<>();
nodeGroupMap.put(nodeGroupName, storageList);
}
storageList.add(storage);
}
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
// split nodes into two sets
for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
if (datanodeList.size() == 1 ) {
@ -375,4 +369,23 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
}
/**
* Check if there are any replica (other than source) on the same node group
* with target. If true, then target is not a good candidate for placing
* specific replica as we don't want 2 replicas under the same nodegroup.
*
* @return true if there are any replica (other than source) on the same node
* group with target
*/
@Override
public boolean isMovable(Collection<DatanodeInfo> locs,
DatanodeInfo source, DatanodeInfo target) {
for (DatanodeInfo dn : locs) {
if (dn != source && dn != target &&
clusterMap.isOnSameNodeGroup(dn, target)) {
return false;
}
}
return true;
}
}

View File

@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetworkTopology;
@ -117,13 +118,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
return upgradeDomains;
}
private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
DatanodeStorageInfo[] storageInfos) {
Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
for(DatanodeStorageInfo storage : storageInfos) {
private <T> Map<String, List<T>> getUpgradeDomainMap(
Collection<T> storagesOrDataNodes) {
Map<String, List<T>> upgradeDomainMap = new HashMap<>();
for(T storage : storagesOrDataNodes) {
String upgradeDomain = getUpgradeDomainWithDefaultValue(
storage.getDatanodeDescriptor());
List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
getDatanodeInfo(storage));
List<T> storages = upgradeDomainMap.get(upgradeDomain);
if (storages == null) {
storages = new ArrayList<>();
upgradeDomainMap.put(upgradeDomain, storages);
@ -156,6 +157,19 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
return getShareUDSet;
}
private Collection<DatanodeStorageInfo> combine(
Collection<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne) {
List<DatanodeStorageInfo> all = new ArrayList<>();
if (moreThanOne != null) {
all.addAll(moreThanOne);
}
if (exactlyOne != null) {
all.addAll(exactlyOne);
}
return all;
}
/*
* The policy to pick the replica set for deleting the over-replicated
* replica which meet the rack and upgrade domain requirements.
@ -231,20 +245,11 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
Collection<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne) {
List<DatanodeStorageInfo> all = new ArrayList<>();
if (moreThanOne != null) {
all.addAll(moreThanOne);
}
if (exactlyOne != null) {
all.addAll(exactlyOne);
}
Map<String, List<DatanodeStorageInfo>> upgradeDomains =
getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
// shareUDSet includes DatanodeStorageInfo that share same upgrade
// domain with another DatanodeStorageInfo.
List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
Collection<DatanodeStorageInfo> all = combine(moreThanOne, exactlyOne);
List<DatanodeStorageInfo> shareUDSet = getShareUDSet(
getUpgradeDomainMap(all));
// shareRackAndUDSet contains those DatanodeStorageInfo that
// share rack and upgrade domain with another DatanodeStorageInfo.
List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
@ -260,4 +265,47 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
}
return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
}
@Override
boolean useDelHint(DatanodeStorageInfo delHint,
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne,
List<StorageType> excessTypes) {
if (!super.useDelHint(delHint, added, moreThanOne, exactlyOne,
excessTypes)) {
// If BlockPlacementPolicyDefault doesn't allow useDelHint, there is no
// point checking with upgrade domain policy.
return false;
}
return isMovableBasedOnUpgradeDomain(combine(moreThanOne, exactlyOne),
delHint, added);
}
// Check if moving from source to target will preserve the upgrade domain
// policy.
private <T> boolean isMovableBasedOnUpgradeDomain(Collection<T> all,
T source, T target) {
Map<String, List<T>> udMap = getUpgradeDomainMap(all);
// shareUDSet includes datanodes that share same upgrade
// domain with another datanode.
List<T> shareUDSet = getShareUDSet(udMap);
// check if removing source reduces the number of upgrade domains
if (notReduceNumOfGroups(shareUDSet, source, target)) {
return true;
} else if (udMap.size() > upgradeDomainFactor) {
return true; // existing number of upgrade domain exceeds the limit.
} else {
return false; // removing source reduces the number of UDs.
}
}
@Override
public boolean isMovable(Collection<DatanodeInfo> locs,
DatanodeInfo source, DatanodeInfo target) {
if (super.isMovable(locs, source, target)) {
return isMovableBasedOnUpgradeDomain(locs, source, target);
} else {
return false;
}
}
}

View File

@ -1156,7 +1156,7 @@ public class DFSTestUtil {
final StorageType type = (types != null && i < types.length) ? types[i]
: StorageType.DEFAULT;
storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname,
type);
type, null);
}
return storages;
}
@ -1164,16 +1164,19 @@ public class DFSTestUtil {
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip, String rack, String hostname) {
return createDatanodeStorageInfo(storageID, ip, rack, hostname,
StorageType.DEFAULT);
StorageType.DEFAULT, null);
}
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip, String rack, String hostname,
StorageType type) {
StorageType type, String upgradeDomain) {
final DatanodeStorage storage = new DatanodeStorage(storageID,
DatanodeStorage.State.NORMAL, type);
final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
ip, rack, storage, hostname);
if (upgradeDomain != null) {
dn.setUpgradeDomain(upgradeDomain);
}
return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
}

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -77,7 +78,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@ -410,6 +414,101 @@ public class TestBalancer {
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
/**
* Verify balancer won't violate the default block placement policy.
* @throws Exception
*/
@Test(timeout=100000)
public void testRackPolicyAfterBalance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
long[] capacities = new long[] { CAPACITY, CAPACITY };
String[] hosts = {"host0", "host1"};
String[] racks = { RACK0, RACK1 };
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
null, CAPACITY, "host2", RACK1, null);
}
/**
* Verify balancer won't violate upgrade domain block placement policy.
* @throws Exception
*/
@Test(timeout=100000)
public void testUpgradeDomainPolicyAfterBalance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithUpgradeDomain.class,
BlockPlacementPolicy.class);
long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY };
String[] hosts = {"host0", "host1", "host2"};
String[] racks = { RACK0, RACK1, RACK1 };
String[] UDs = { "ud0", "ud1", "ud2" };
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
UDs, CAPACITY, "host3", RACK2, "ud2");
}
private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
long[] capacities, String[] hosts, String[] racks, String[] UDs,
long newCapacity, String newHost, String newRack, String newUD)
throws Exception {
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
DatanodeManager dm = cluster.getNamesystem().getBlockManager().
getDatanodeManager();
if (UDs != null) {
for(int i = 0; i < UDs.length; i++) {
DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
}
}
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
long totalCapacity = sum(capacities);
long totalUsedSpace = totalCapacity * 8 / 10;
final long fileSize = totalUsedSpace / numOfDatanodes;
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
// start up an empty node with the same capacity on the same rack as the
// pinned host.
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new String[] { newHost }, new long[] { newCapacity });
if (newUD != null) {
DatanodeID newId = cluster.getDataNodes().get(
numOfDatanodes).getDatanodeId();
dm.getDatanode(newId).setUpgradeDomain(newUD);
}
totalCapacity += newCapacity;
// run balancer and validate results
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
BlockPlacementPolicy placementPolicy =
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
List<LocatedBlock> locatedBlocks = client.
getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
for (LocatedBlock locatedBlock : locatedBlocks) {
BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
locatedBlock.getLocations(), numOfDatanodes);
assertTrue(status.isPlacementPolicySatisfied());
}
} finally {
cluster.shutdown();
}
}
/**
* Wait until balanced: each datanode gives utilization within
* BALANCE_ALLOWED_VARIANCE of average

View File

@ -821,14 +821,15 @@ public class TestBlockManager {
DatanodeStorageInfo delHint = new DatanodeStorageInfo(
DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("id"));
List<DatanodeStorageInfo> moreThan1Racks = Arrays.asList(delHint);
List<StorageType> excessTypes = new ArrayList<StorageType>();
List<StorageType> excessTypes = new ArrayList<>();
BlockPlacementPolicyDefault policyDefault =
(BlockPlacementPolicyDefault) bm.getBlockPlacementPolicy();
excessTypes.add(StorageType.DEFAULT);
Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint,
null, moreThan1Racks, excessTypes));
Assert.assertTrue(policyDefault.useDelHint(delHint, null, moreThan1Racks,
null, excessTypes));
excessTypes.remove(0);
excessTypes.add(StorageType.SSD);
Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint,
null, moreThan1Racks, excessTypes));
Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
null, excessTypes));
}
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@ -971,11 +972,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// test returning null
excessTypes.add(StorageType.SSD);
assertNull(((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete((short) 3, first, second, excessTypes));
.chooseReplicaToDelete(first, second, excessTypes));
}
excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete((short) 3, first, second, excessTypes);
.chooseReplicaToDelete(first, second, excessTypes);
// Within first set, storages[1] with less free space
assertEquals(chosen, storages[1]);
@ -985,25 +986,25 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Within second set, storages[5] with less free space
excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
(short)2, first, second, excessTypes);
first, second, excessTypes);
assertEquals(chosen, storages[5]);
}
@Test
public void testChooseReplicasToDelete() throws Exception {
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
nonExcess.add(storages[0]);
nonExcess.add(storages[1]);
nonExcess.add(storages[2]);
nonExcess.add(storages[3]);
List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
List<DatanodeStorageInfo> excessReplicas;
BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
.createDefaultSuite();
BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
"Storage-excess-SSD-ID", "localhost",
storages[0].getDatanodeDescriptor().getNetworkLocation(),
"foo.com", StorageType.SSD);
"foo.com", StorageType.SSD, null);
updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(),
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0,
@ -1016,14 +1017,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() > 0);
assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0]));
// Excess type deletion
DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
"Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
"foo.com", StorageType.ARCHIVE);
"foo.com", StorageType.ARCHIVE, null);
nonExcess.add(excessStorage);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
@ -1057,32 +1058,70 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@Test
public void testUseDelHint() throws Exception {
List<StorageType> excessTypes = new ArrayList<StorageType>();
List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.ARCHIVE);
// only consider delHint for the first case
assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null,
null));
BlockPlacementPolicyDefault policyDefault =
(BlockPlacementPolicyDefault) replicator;
// no delHint
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null,
null));
assertFalse(policyDefault.useDelHint(null, null, null, null, null));
// delHint storage type is not an excess type
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
null, excessTypes));
assertFalse(policyDefault.useDelHint(storages[0], null, null, null,
excessTypes));
// check if removing delHint reduces the number of racks
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[2]);
List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
moreThanOne.add(storages[0]);
moreThanOne.add(storages[1]);
List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
exactlyOne.add(storages[3]);
exactlyOne.add(storages[5]);
excessTypes.add(StorageType.DEFAULT);
assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
chosenNodes, excessTypes));
assertTrue(policyDefault.useDelHint(storages[0], null, moreThanOne,
exactlyOne, excessTypes));
// the added node adds a new rack
assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
storages[5], chosenNodes, excessTypes));
assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne,
exactlyOne, excessTypes));
// removing delHint reduces the number of racks;
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
storages[0], chosenNodes, excessTypes));
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null,
chosenNodes, excessTypes));
assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne,
exactlyOne, excessTypes));
assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne,
exactlyOne, excessTypes));
}
@Test
public void testIsMovable() throws Exception {
List<DatanodeInfo> candidates = new ArrayList<>();
// after the move, the number of racks remains 2.
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[3]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
// after the move, the number of racks remains 3.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[4]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1]));
// after the move, the number of racks changes from 2 to 3.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[4]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
// the move would have reduced the number of racks from 3 to 2.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[3]);
candidates.add(dataNodes[4]);
assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
}
/**

View File

@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete((short) 3, first, second, excessTypes);
.chooseReplicaToDelete(first, second, excessTypes);
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
// but dataNodes[1] is chosen as less free space
@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
// as less free space
excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
(short) 2, first, second, excessTypes);
first, second, excessTypes);
assertEquals(chosen, storages[2]);
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
// Within second set, dataNodes[5] with less free space
excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
(short) 1, first, second, excessTypes);
first, second, excessTypes);
assertEquals(chosen, storages[5]);
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -189,41 +192,6 @@ public class TestReplicationPolicyWithUpgradeDomain
assertEquals(2, targets.length);
}
/**
* Verify the correct replica is chosen to satisfy both rack and upgrade
* domain policy.
* @throws Exception
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
(BlockPlacementPolicyWithUpgradeDomain)replicator;
List<DatanodeStorageInfo> first = new ArrayList<>();
List<DatanodeStorageInfo> second = new ArrayList<>();
List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.DEFAULT);
first.add(storages[0]);
first.add(storages[1]);
second.add(storages[4]);
second.add(storages[8]);
DatanodeStorageInfo chosenStorage =
upgradeDomainPolicy.chooseReplicaToDelete(
(short)3, first, second, excessTypes);
assertEquals(chosenStorage, storages[1]);
first.clear();
second.clear();
excessTypes.add(StorageType.DEFAULT);
first.add(storages[0]);
first.add(storages[1]);
first.add(storages[4]);
first.add(storages[5]);
chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
(short)3, first, second, excessTypes);
assertTrue(chosenStorage.equals(storages[1]) ||
chosenStorage.equals(storages[4]));
}
/**
* Test the scenario where not enough replicas can't satisfy the policy.
* @throws Exception
@ -248,7 +216,7 @@ public class TestReplicationPolicyWithUpgradeDomain
}
/**
* Test the scenario where not enough replicas can't satisfy the policy.
* Test block placement verification.
* @throws Exception
*/
@Test
@ -341,6 +309,137 @@ public class TestReplicationPolicyWithUpgradeDomain
assertFalse(status.isPlacementPolicySatisfied());
}
/**
* Verify the correct replica is chosen to satisfy both rack and upgrade
* domain policy.
* @throws Exception
*/
@Test
public void testChooseReplicasToDelete() throws Exception {
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
nonExcess.add(storages[0]);
nonExcess.add(storages[1]);
nonExcess.add(storages[2]);
nonExcess.add(storages[3]);
List<DatanodeStorageInfo> excessReplicas;
BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
.createDefaultSuite();
BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
// delete hint accepted.
DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0]));
// delete hint rejected because deleting storages[1] would have
// cause only two upgrade domains left.
delHintNode = storages[1].getDatanodeDescriptor();
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[0]));
// no delete hint, case 1
nonExcess.clear();
nonExcess.add(storages[0]);
nonExcess.add(storages[1]);
nonExcess.add(storages[4]);
nonExcess.add(storages[8]);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[8].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[1]));
// no delete hint, case 2
nonExcess.clear();
nonExcess.add(storages[0]);
nonExcess.add(storages[1]);
nonExcess.add(storages[4]);
nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[8].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.size() == 1);
assertTrue(excessReplicas.contains(storages[1]) ||
excessReplicas.contains(storages[4]));
// No delete hint, different excess type deletion
nonExcess.clear();
nonExcess.add(storages[0]);
nonExcess.add(storages[1]);
nonExcess.add(storages[2]);
nonExcess.add(storages[3]);
DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
"Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
"foo.com", StorageType.ARCHIVE, delHintNode.getUpgradeDomain());
nonExcess.add(excessStorage);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.size() == 2);
assertTrue(excessReplicas.contains(storages[0]));
assertTrue(excessReplicas.contains(excessStorage));
}
@Test
public void testIsMovable() throws Exception {
List<DatanodeInfo> candidates = new ArrayList<>();
// after the move, the number of racks changes from 1 to 2.
// and number of upgrade domains remains 3.
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[3]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
// the move would have changed the number of racks from 1 to 2.
// and the number of UDs from 3 to 2.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[2]);
candidates.add(dataNodes[4]);
assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
// after the move, the number of racks remains 2.
// the number of UDs remains 3.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[4]);
candidates.add(dataNodes[5]);
candidates.add(dataNodes[6]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[6]));
// after the move, the number of racks remains 2.
// the number of UDs remains 2.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[1]);
candidates.add(dataNodes[3]);
candidates.add(dataNodes[4]);
assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));
// the move would have changed the number of racks from 2 to 3.
// and the number of UDs from 2 to 1.
candidates.clear();
candidates.add(dataNodes[0]);
candidates.add(dataNodes[3]);
candidates.add(dataNodes[4]);
candidates.add(dataNodes[6]);
assertFalse(replicator.isMovable(candidates, dataNodes[4], dataNodes[6]));
}
private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
HashSet<String> upgradeDomains = new HashSet<>();
for (DatanodeStorageInfo node : nodes) {

View File

@ -629,11 +629,13 @@ public class TestDNFencing {
}
@Override
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second,
public DatanodeStorageInfo chooseReplicaToDelete(
Collection<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne,
List<StorageType> excessTypes) {
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
moreThanOne : exactlyOne;
List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
return l.get(ThreadLocalRandom.current().nextInt(l.size()));