diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b7f24ffec02..dfea3f31340 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -771,6 +771,8 @@ Release 2.8.0 - UNRELEASED HDFS-9339. Extend full test of KMS ACLs. (Daniel Templeton via zhz) + HDFS-9007. Fix HDFS Balancer to honor upgrade domain policy. (Ming Ma via lei) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 509b652e2a5..151ab09a304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; @@ -124,6 +125,7 @@ public class Dispatcher { private final int ioFileBufferSize; private final boolean connectToDnViaHostname; + private BlockPlacementPolicy placementPolicy; static class Allocator { private final int max; @@ -888,6 +890,8 @@ public class Dispatcher { this.connectToDnViaHostname = conf.getBoolean( HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + this.placementPolicy = + BlockPlacementPolicy.getInstance(conf, null, cluster, null); } public DistributedFileSystem getDistributedFileSystem() { @@ -1106,66 +1110,24 @@ public class Dispatcher { } } - if (cluster.isNodeGroupAware() - && isOnSameNodeGroupWithReplicas(source, target, block)) { - return false; - } - if (reduceNumOfRacks(source, target, block)) { + if (!isGoodBlockCandidateForPlacementPolicy(source, target, block)) { return false; } return true; } - /** - * Determine whether moving the given block replica from source to target - * would reduce the number of racks of the block replicas. - */ - private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target, - DBlock block) { - final DatanodeInfo sourceDn = source.getDatanodeInfo(); - if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) { - // source and target are on the same rack - return false; - } - boolean notOnSameRack = true; + // Check if the move will violate the block placement policy. 
+ private boolean isGoodBlockCandidateForPlacementPolicy(StorageGroup source, + StorageGroup target, DBlock block) { + List datanodeInfos = new ArrayList<>(); synchronized (block) { - for (StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) { - notOnSameRack = false; - break; - } + for (StorageGroup loc : block.locations) { + datanodeInfos.add(loc.getDatanodeInfo()); } + datanodeInfos.add(target.getDatanodeInfo()); } - if (notOnSameRack) { - // target is not on the same rack as any replica - return false; - } - for (StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) { - // source is on the same rack of another replica - return false; - } - } - return true; - } - - /** - * Check if there are any replica (other than source) on the same node group - * with target. If true, then target is not a good candidate for placing - * specific replica as we don't want 2 replicas under the same nodegroup. - * - * @return true if there are any replica (other than source) on the same node - * group with target - */ - private boolean isOnSameNodeGroupWithReplicas(StorageGroup source, - StorageGroup target, DBlock block) { - final DatanodeInfo targetDn = target.getDatanodeInfo(); - for (StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) { - return true; - } - } - return false; + return placementPolicy.isMovable( + datanodeInfos, source.getDatanodeInfo(), target.getDatanodeInfo()); } /** Reset all fields in order to prepare for the next iteration */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index e75efa0a6ee..d35b24623cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; @@ -166,6 +167,17 @@ public abstract class BlockPlacementPolicy { return replicator; } + /** + * Check if the move is allowed. Used by balancer and other tools. + * @ + * + * @param candidates all replicas including source and target + * @param source source replica of the move + * @param target target replica of the move + */ + abstract public boolean isMovable(Collection candidates, + DatanodeInfo source, DatanodeInfo target); + /** * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur. 
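For context on how the Balancer consumes this new API: the Dispatcher builds the candidate set from a block's current replica locations plus the proposed target, then delegates the decision to whatever BlockPlacementPolicy the cluster is configured with (default, node group, or upgrade domain), obtained via BlockPlacementPolicy.getInstance(conf, null, cluster, null). A minimal sketch of that call pattern, with simplified names and the block-level synchronization omitted:

    // Sketch only; mirrors isGoodBlockCandidateForPlacementPolicy above.
    // "replicaLocations" stands in for block.locations.
    boolean movePreservesPlacement(BlockPlacementPolicy policy,
        List<DatanodeInfo> replicaLocations,
        DatanodeInfo source, DatanodeInfo target) {
      List<DatanodeInfo> candidates = new ArrayList<>(replicaLocations);
      candidates.add(target);   // candidates = every current replica plus the proposed target
      return policy.isMovable(candidates, source, target);
    }

The source is already among the replica locations, so only the target needs to be appended before the policy is consulted.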
* @@ -198,6 +210,20 @@ public abstract class BlockPlacementPolicy { } } + protected DatanodeInfo getDatanodeInfo(T datanode) { + Preconditions.checkArgument( + datanode instanceof DatanodeInfo || + datanode instanceof DatanodeStorageInfo, + "class " + datanode.getClass().getName() + " not allowed"); + if (datanode instanceof DatanodeInfo) { + return ((DatanodeInfo)datanode); + } else if (datanode instanceof DatanodeStorageInfo) { + return ((DatanodeStorageInfo)datanode).getDatanodeDescriptor(); + } else { + return null; + } + } + /** * Get rack string from a data node * @return rack of data node @@ -205,33 +231,33 @@ public abstract class BlockPlacementPolicy { protected String getRack(final DatanodeInfo datanode) { return datanode.getNetworkLocation(); } - + /** * Split data nodes into two sets, one set includes nodes on rack with * more than one replica, the other set contains the remaining nodes. * - * @param dataNodes datanodes to be split into two sets + * @param storagesOrDataNodes DatanodeStorageInfo/DatanodeInfo to be split + * into two sets * @param rackMap a map from rack to datanodes * @param moreThanOne contains nodes on rack with more than one replica * @param exactlyOne remains contains the remaining nodes */ - public void splitNodesWithRack( - final Iterable storages, - final Map> rackMap, - final List moreThanOne, - final List exactlyOne) { - for(DatanodeStorageInfo s: storages) { - final String rackName = getRack(s.getDatanodeDescriptor()); - List storageList = rackMap.get(rackName); + public void splitNodesWithRack( + final Iterable storagesOrDataNodes, + final Map> rackMap, + final List moreThanOne, + final List exactlyOne) { + for(T s: storagesOrDataNodes) { + final String rackName = getRack(getDatanodeInfo(s)); + List storageList = rackMap.get(rackName); if (storageList == null) { - storageList = new ArrayList(); + storageList = new ArrayList(); rackMap.put(rackName, storageList); } storageList.add(s); } - // split nodes into two sets - for(List storageList : rackMap.values()) { + for(List storageList : rackMap.values()) { if (storageList.size() == 1) { // exactlyOne contains nodes on rack with only one replica exactlyOne.add(storageList.get(0)); @@ -241,5 +267,4 @@ public abstract class BlockPlacementPolicy { } } } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 2723ed95f91..56ebc35998a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -881,7 +881,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { minRacks = Math.min(minRacks, numberOfReplicas); // 1. Check that all locations are different. // 2. Count locations on different racks. - Set racks = new TreeSet(); + Set racks = new TreeSet<>(); for (DatanodeInfo dn : locs) racks.add(dn.getNetworkLocation()); return new BlockPlacementStatusDefault(racks.size(), minRacks); @@ -889,8 +889,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { /** * Decide whether deleting the specified replica of the block still makes * the block conform to the configured block placement policy. 
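Two signature changes ripple through the rest of this patch: chooseReplicaToDelete loses the replicationFactor argument it never consulted, and useDelHint loses its isFirst flag, with the caller now guarding the hint itself. A condensed sketch of the resulting caller pattern in chooseReplicasToDelete (variable names as in that method):

    if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage,
        moreThanOne, exactlyOne, excessTypes)) {
      cur = delNodeHintStorage;   // honor the delete hint only for the first removal
    } else {
      cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes);
    }
    firstOne = false;

Making useDelHint an instance method rather than static is what allows BlockPlacementPolicyWithUpgradeDomain, further down in this patch, to override it and layer the upgrade-domain check on top of the rack check.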
- * @param replicationFactor The required number of replicas for this block - * @param moreThanone The replica locations of this block that are present + * @param moreThanOne The replica locations of this block that are present * on more than one unique racks. * @param exactlyOne Replica locations of this block that are present * on exactly one unique racks. @@ -900,8 +899,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * @return the replica that is the best candidate for deletion */ @VisibleForTesting - public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, - Collection moreThanone, Collection exactlyOne, + public DatanodeStorageInfo chooseReplicaToDelete( + Collection moreThanOne, + Collection exactlyOne, final List excessTypes) { long oldestHeartbeat = monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier; @@ -911,7 +911,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) { + for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne, + exactlyOne)) { if (!excessTypes.contains(storage.getStorageType())) { continue; } @@ -972,13 +973,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) { final DatanodeStorageInfo cur; - if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, - moreThanOne, excessTypes)) { + if (firstOne && useDelHint(delNodeHintStorage, addedNodeStorage, + moreThanOne, exactlyOne, excessTypes)) { cur = delNodeHintStorage; } else { // regular excessive replica removal - cur = - chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne, - excessTypes); + cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes); } firstOne = false; if (cur == null) { @@ -997,26 +996,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { /** Check if we can use delHint. */ @VisibleForTesting - static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, - DatanodeStorageInfo added, List moreThan1Racks, + boolean useDelHint(DatanodeStorageInfo delHint, + DatanodeStorageInfo added, List moreThanOne, + Collection exactlyOne, List excessTypes) { - if (!isFirst) { - return false; // only consider delHint for the first case - } else if (delHint == null) { + if (delHint == null) { return false; // no delHint } else if (!excessTypes.contains(delHint.getStorageType())) { return false; // delHint storage type is not an excess type } else { // check if removing delHint reduces the number of racks - if (moreThan1Racks.contains(delHint)) { - return true; // delHint and some other nodes are under the same rack - } else if (added != null && !moreThan1Racks.contains(added)) { - return true; // the added node adds a new rack - } - return false; // removing delHint reduces the number of racks; + return notReduceNumOfGroups(moreThanOne, delHint, added); } } + // Check if moving from source to target will reduce the number of + // groups. The groups could be based on racks or upgrade domains. + boolean notReduceNumOfGroups(List moreThanOne, T source, T target) { + if (moreThanOne.contains(source)) { + return true; // source and some other nodes are under the same group. 
+ } else if (target != null && !moreThanOne.contains(target)) { + return true; // the added node adds a new group. + } + return false; // removing delHint reduces the number of groups. + } + + @Override + public boolean isMovable(Collection locs, + DatanodeInfo source, DatanodeInfo target) { + final Map> rackMap = new HashMap<>(); + final List moreThanOne = new ArrayList<>(); + final List exactlyOne = new ArrayList<>(); + splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne); + return notReduceNumOfGroups(moreThanOne, source, target); + } /** * Pick up replica node set for deleting replica as over-replicated. * First set contains replica nodes on rack with more than one diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index f26c4d352cc..0481e07a9ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -39,11 +39,6 @@ import org.apache.hadoop.net.NodeBase; */ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault { - protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap, DatanodeManager datanodeManager) { - initialize(conf, stats, clusterMap, host2datanodeMap); - } - protected BlockPlacementPolicyWithNodeGroup() { } @@ -347,22 +342,21 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau // Split data nodes in the first set into two sets, // moreThanOne contains nodes on nodegroup with more than one replica // exactlyOne contains the remaining nodes - Map> nodeGroupMap = - new HashMap>(); + Map> nodeGroupMap = new HashMap<>(); for(DatanodeStorageInfo storage : first) { final String nodeGroupName = NetworkTopology.getLastHalf( storage.getDatanodeDescriptor().getNetworkLocation()); List storageList = nodeGroupMap.get(nodeGroupName); if (storageList == null) { - storageList = new ArrayList(); + storageList = new ArrayList<>(); nodeGroupMap.put(nodeGroupName, storageList); } storageList.add(storage); } - final List moreThanOne = new ArrayList(); - final List exactlyOne = new ArrayList(); + final List moreThanOne = new ArrayList<>(); + final List exactlyOne = new ArrayList<>(); // split nodes into two sets for(List datanodeList : nodeGroupMap.values()) { if (datanodeList.size() == 1 ) { @@ -376,5 +370,24 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau return moreThanOne.isEmpty()? exactlyOne : moreThanOne; } - + + /** + * Check if there are any replica (other than source) on the same node group + * with target. If true, then target is not a good candidate for placing + * specific replica as we don't want 2 replicas under the same nodegroup. 
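The default policy's rack check reduces to notReduceNumOfGroups: a move is safe if the source shares its rack with another candidate (removing it cannot lose a rack), or if the target opens a rack that no candidate covers yet. A hedged illustration with hypothetical nodes (dn1, dn2, dn4 on /rackA, dn3 on /rackB) and a configured BlockPlacementPolicyDefault instance called defaultPolicy, none of which appear in this patch:

    // Candidates are the current replicas (dn1, dn2, dn3) plus the proposed target dn4.
    List<DatanodeInfo> candidates = Arrays.asList(dn1, dn2, dn3, dn4);
    // dn1 -> dn4: dn1 shares /rackA with dn2, so the rack count cannot drop.
    boolean ok1 = defaultPolicy.isMovable(candidates, dn1, dn4);   // true
    // dn3 -> dn4: dn3 is the only replica on /rackB and dn4 adds no new rack, so the
    // move would collapse the block onto a single rack and is rejected.
    boolean ok2 = defaultPolicy.isMovable(candidates, dn3, dn4);   // false

The node-group override that follows applies a stricter test instead: it rejects any target that already shares a node group with a replica other than the source.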
+ * + * @return true if there are any replica (other than source) on the same node + * group with target + */ + @Override + public boolean isMovable(Collection locs, + DatanodeInfo source, DatanodeInfo target) { + for (DatanodeInfo dn : locs) { + if (dn != source && dn != target && + clusterMap.isOnSameNodeGroup(dn, target)) { + return false; + } + } + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java index 32419080aeb..8d6b13c8cbd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.net.NetworkTopology; @@ -117,13 +118,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends return upgradeDomains; } - private Map> getUpgradeDomainMap( - DatanodeStorageInfo[] storageInfos) { - Map> upgradeDomainMap = new HashMap<>(); - for(DatanodeStorageInfo storage : storageInfos) { + private Map> getUpgradeDomainMap( + Collection storagesOrDataNodes) { + Map> upgradeDomainMap = new HashMap<>(); + for(T storage : storagesOrDataNodes) { String upgradeDomain = getUpgradeDomainWithDefaultValue( - storage.getDatanodeDescriptor()); - List storages = upgradeDomainMap.get(upgradeDomain); + getDatanodeInfo(storage)); + List storages = upgradeDomainMap.get(upgradeDomain); if (storages == null) { storages = new ArrayList<>(); upgradeDomainMap.put(upgradeDomain, storages); @@ -156,6 +157,19 @@ public class BlockPlacementPolicyWithUpgradeDomain extends return getShareUDSet; } + private Collection combine( + Collection moreThanOne, + Collection exactlyOne) { + List all = new ArrayList<>(); + if (moreThanOne != null) { + all.addAll(moreThanOne); + } + if (exactlyOne != null) { + all.addAll(exactlyOne); + } + return all; + } + /* * The policy to pick the replica set for deleting the over-replicated * replica which meet the rack and upgrade domain requirements. @@ -231,20 +245,11 @@ public class BlockPlacementPolicyWithUpgradeDomain extends protected Collection pickupReplicaSet( Collection moreThanOne, Collection exactlyOne) { - List all = new ArrayList<>(); - if (moreThanOne != null) { - all.addAll(moreThanOne); - } - if (exactlyOne != null) { - all.addAll(exactlyOne); - } - - Map> upgradeDomains = - getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()])); - // shareUDSet includes DatanodeStorageInfo that share same upgrade // domain with another DatanodeStorageInfo. - List shareUDSet = getShareUDSet(upgradeDomains); + Collection all = combine(moreThanOne, exactlyOne); + List shareUDSet = getShareUDSet( + getUpgradeDomainMap(all)); // shareRackAndUDSet contains those DatanodeStorageInfo that // share rack and upgrade domain with another DatanodeStorageInfo. List shareRackAndUDSet = new ArrayList<>(); @@ -260,4 +265,47 @@ public class BlockPlacementPolicyWithUpgradeDomain extends } return (shareRackAndUDSet.size() > 0) ? 
shareRackAndUDSet : shareUDSet; } + + @Override + boolean useDelHint(DatanodeStorageInfo delHint, + DatanodeStorageInfo added, List moreThanOne, + Collection exactlyOne, + List excessTypes) { + if (!super.useDelHint(delHint, added, moreThanOne, exactlyOne, + excessTypes)) { + // If BlockPlacementPolicyDefault doesn't allow useDelHint, there is no + // point checking with upgrade domain policy. + return false; + } + return isMovableBasedOnUpgradeDomain(combine(moreThanOne, exactlyOne), + delHint, added); + } + + // Check if moving from source to target will preserve the upgrade domain + // policy. + private boolean isMovableBasedOnUpgradeDomain(Collection all, + T source, T target) { + Map> udMap = getUpgradeDomainMap(all); + // shareUDSet includes datanodes that share same upgrade + // domain with another datanode. + List shareUDSet = getShareUDSet(udMap); + // check if removing source reduces the number of upgrade domains + if (notReduceNumOfGroups(shareUDSet, source, target)) { + return true; + } else if (udMap.size() > upgradeDomainFactor) { + return true; // existing number of upgrade domain exceeds the limit. + } else { + return false; // removing source reduces the number of UDs. + } + } + + @Override + public boolean isMovable(Collection locs, + DatanodeInfo source, DatanodeInfo target) { + if (super.isMovable(locs, source, target)) { + return isMovableBasedOnUpgradeDomain(locs, source, target); + } else { + return false; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index ebbc9b5fd99..50c6d058b15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1154,7 +1154,7 @@ public class DFSTestUtil { final StorageType type = (types != null && i < types.length) ? 
types[i] : StorageType.DEFAULT; storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname, - type); + type, null); } return storages; } @@ -1162,16 +1162,19 @@ public class DFSTestUtil { public static DatanodeStorageInfo createDatanodeStorageInfo( String storageID, String ip, String rack, String hostname) { return createDatanodeStorageInfo(storageID, ip, rack, hostname, - StorageType.DEFAULT); + StorageType.DEFAULT, null); } public static DatanodeStorageInfo createDatanodeStorageInfo( String storageID, String ip, String rack, String hostname, - StorageType type) { + StorageType type, String upgradeDomain) { final DatanodeStorage storage = new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, type); final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor( ip, rack, storage, hostname); + if (upgradeDomain != null) { + dn.setUpgradeDomain(upgradeDomain); + } return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index e33e5862291..a24cf9539e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -75,7 +76,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; -import org.apache.hadoop.hdfs.server.balancer.BalancerParameters; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; @@ -390,7 +394,102 @@ public class TestBalancer { int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); } - + + /** + * Verify balancer won't violate the default block placement policy. + * @throws Exception + */ + @Test(timeout=100000) + public void testRackPolicyAfterBalance() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + long[] capacities = new long[] { CAPACITY, CAPACITY }; + String[] hosts = {"host0", "host1"}; + String[] racks = { RACK0, RACK1 }; + runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks, + null, CAPACITY, "host2", RACK1, null); + } + + /** + * Verify balancer won't violate upgrade domain block placement policy. 
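The upgrade-domain variant is selected purely through configuration, so the balancer needs no new flags; the Dispatcher simply instantiates whatever policy the settings name. A condensed sketch of the wiring the test below performs (upgrade domains are assigned through DatanodeManager here only because the MiniDFSCluster datanodes carry no such attribute by default):

    Configuration conf = new HdfsConfiguration();
    // Make both the NameNode and the balancer use the upgrade-domain-aware policy.
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyWithUpgradeDomain.class,
        BlockPlacementPolicy.class);
    // After the cluster is up, tag each datanode with its upgrade domain, e.g.:
    //   DatanodeManager dm = cluster.getNamesystem().getBlockManager().getDatanodeManager();
    //   dm.getDatanode(datanodeId).setUpgradeDomain("ud0");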
+ * @throws Exception + */ + @Test(timeout=100000) + public void testUpgradeDomainPolicyAfterBalance() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyWithUpgradeDomain.class, + BlockPlacementPolicy.class); + long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY }; + String[] hosts = {"host0", "host1", "host2"}; + String[] racks = { RACK0, RACK1, RACK1 }; + String[] UDs = { "ud0", "ud1", "ud2" }; + runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks, + UDs, CAPACITY, "host3", RACK2, "ud2"); + } + + private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf, + long[] capacities, String[] hosts, String[] racks, String[] UDs, + long newCapacity, String newHost, String newRack, String newUD) + throws Exception { + int numOfDatanodes = capacities.length; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) + .hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); + DatanodeManager dm = cluster.getNamesystem().getBlockManager(). + getDatanodeManager(); + if (UDs != null) { + for(int i = 0; i < UDs.length; i++) { + DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId(); + dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]); + } + } + + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + + // fill up the cluster to be 80% full + long totalCapacity = sum(capacities); + long totalUsedSpace = totalCapacity * 8 / 10; + + final long fileSize = totalUsedSpace / numOfDatanodes; + DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, + fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false); + + // start up an empty node with the same capacity on the same rack as the + // pinned host. + cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, + new String[] { newHost }, new long[] { newCapacity }); + if (newUD != null) { + DatanodeID newId = cluster.getDataNodes().get( + numOfDatanodes).getDatanodeId(); + dm.getDatanode(newId).setUpgradeDomain(newUD); + } + totalCapacity += newCapacity; + + // run balancer and validate results + waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + + // start rebalancing + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); + BlockPlacementPolicy placementPolicy = + cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy(); + List locatedBlocks = client. 
+ getBlockLocations(fileName, 0, fileSize).getLocatedBlocks(); + for (LocatedBlock locatedBlock : locatedBlocks) { + BlockPlacementStatus status = placementPolicy.verifyBlockPlacement( + locatedBlock.getLocations(), numOfDatanodes); + assertTrue(status.isPlacementPolicySatisfied()); + } + } finally { + cluster.shutdown(); + } + } + /** * Wait until balanced: each datanode gives utilization within * BALANCE_ALLOWED_VARIANCE of average diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 3a55cc4dda1..51b0f216c1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -820,14 +820,15 @@ public class TestBlockManager { DatanodeStorageInfo delHint = new DatanodeStorageInfo( DFSTestUtil.getLocalDatanodeDescriptor(), new DatanodeStorage("id")); List moreThan1Racks = Arrays.asList(delHint); - List excessTypes = new ArrayList(); - + List excessTypes = new ArrayList<>(); + BlockPlacementPolicyDefault policyDefault = + (BlockPlacementPolicyDefault) bm.getBlockPlacementPolicy(); excessTypes.add(StorageType.DEFAULT); - Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint, - null, moreThan1Racks, excessTypes)); + Assert.assertTrue(policyDefault.useDelHint(delHint, null, moreThan1Racks, + null, excessTypes)); excessTypes.remove(0); excessTypes.add(StorageType.SSD); - Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint, - null, moreThan1Racks, excessTypes)); + Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks, + null, excessTypes)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 60aa9ef042c..3723a6c9722 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; @@ -971,11 +972,11 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // test returning null excessTypes.add(StorageType.SSD); assertNull(((BlockPlacementPolicyDefault) replicator) - .chooseReplicaToDelete((short) 3, first, second, excessTypes)); + .chooseReplicaToDelete(first, second, excessTypes)); } excessTypes.add(StorageType.DEFAULT); DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) - .chooseReplicaToDelete((short) 3, first, second, excessTypes); + .chooseReplicaToDelete(first, second, excessTypes); // Within first set, storages[1] with less free space 
assertEquals(chosen, storages[1]); @@ -985,25 +986,25 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Within second set, storages[5] with less free space excessTypes.add(StorageType.DEFAULT); chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( - (short)2, first, second, excessTypes); + first, second, excessTypes); assertEquals(chosen, storages[5]); } @Test public void testChooseReplicasToDelete() throws Exception { - Collection nonExcess = new ArrayList(); + Collection nonExcess = new ArrayList<>(); nonExcess.add(storages[0]); nonExcess.add(storages[1]); nonExcess.add(storages[2]); nonExcess.add(storages[3]); - List excessReplicas = new ArrayList<>(); + List excessReplicas; BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite .createDefaultSuite(); BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy(); DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo( "Storage-excess-SSD-ID", "localhost", storages[0].getDatanodeDescriptor().getNetworkLocation(), - "foo.com", StorageType.SSD); + "foo.com", StorageType.SSD, null); updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(), 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, @@ -1016,14 +1017,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { DatanodeStorageInfo.toStorageTypes(nonExcess)); excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); - assertTrue(excessReplicas.size() > 0); + assertTrue(excessReplicas.size() == 1); assertTrue(excessReplicas.contains(storages[0])); // Excess type deletion DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo( "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(), - "foo.com", StorageType.ARCHIVE); + "foo.com", StorageType.ARCHIVE, null); nonExcess.add(excessStorage); excessTypes = storagePolicy.chooseExcess((short) 3, DatanodeStorageInfo.toStorageTypes(nonExcess)); @@ -1057,32 +1058,70 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { @Test public void testUseDelHint() throws Exception { - List excessTypes = new ArrayList(); + List excessTypes = new ArrayList<>(); excessTypes.add(StorageType.ARCHIVE); - // only consider delHint for the first case - assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null, - null)); + BlockPlacementPolicyDefault policyDefault = + (BlockPlacementPolicyDefault) replicator; // no delHint - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null, - null)); + assertFalse(policyDefault.useDelHint(null, null, null, null, null)); // delHint storage type is not an excess type - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, - null, excessTypes)); + assertFalse(policyDefault.useDelHint(storages[0], null, null, null, + excessTypes)); // check if removing delHint reduces the number of racks - List chosenNodes = new ArrayList(); - chosenNodes.add(storages[0]); - chosenNodes.add(storages[2]); + List moreThanOne = new ArrayList<>(); + moreThanOne.add(storages[0]); + moreThanOne.add(storages[1]); + List exactlyOne = new ArrayList<>(); + exactlyOne.add(storages[3]); + exactlyOne.add(storages[5]); + excessTypes.add(StorageType.DEFAULT); - assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null, - chosenNodes, excessTypes)); + assertTrue(policyDefault.useDelHint(storages[0], 
null, moreThanOne, + exactlyOne, excessTypes)); // the added node adds a new rack - assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3], - storages[5], chosenNodes, excessTypes)); + assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne, + exactlyOne, excessTypes)); // removing delHint reduces the number of racks; - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], - storages[0], chosenNodes, excessTypes)); - assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null, - chosenNodes, excessTypes)); + assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne, + exactlyOne, excessTypes)); + assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne, + exactlyOne, excessTypes)); + } + + @Test + public void testIsMovable() throws Exception { + List candidates = new ArrayList<>(); + + // after the move, the number of racks remains 2. + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[3]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3])); + + // after the move, the number of racks remains 3. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[4]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1])); + + // after the move, the number of racks changes from 2 to 3. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[4]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4])); + + // the move would have reduced the number of racks from 3 to 2. 
+ candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[3]); + candidates.add(dataNodes[4]); + assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3])); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index 0ff77702522..367faea0e37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes List excessTypes = new ArrayList<>(); excessTypes.add(StorageType.DEFAULT); DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) - .chooseReplicaToDelete((short) 3, first, second, excessTypes); + .chooseReplicaToDelete(first, second, excessTypes); // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, // dataNodes[0] and dataNodes[1] are in the same nodegroup, // but dataNodes[1] is chosen as less free space @@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes // as less free space excessTypes.add(StorageType.DEFAULT); chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( - (short) 2, first, second, excessTypes); + first, second, excessTypes); assertEquals(chosen, storages[2]); replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); @@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes // Within second set, dataNodes[5] with less free space excessTypes.add(StorageType.DEFAULT); chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( - (short) 1, first, second, excessTypes); + first, second, excessTypes); assertEquals(chosen, storages[5]); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java index b5caebfe6de..608817f302b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -189,41 +192,6 @@ public class TestReplicationPolicyWithUpgradeDomain 
assertEquals(2, targets.length); } - /** - * Verify the correct replica is chosen to satisfy both rack and upgrade - * domain policy. - * @throws Exception - */ - @Test - public void testChooseReplicaToDelete() throws Exception { - BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy = - (BlockPlacementPolicyWithUpgradeDomain)replicator; - List first = new ArrayList<>(); - List second = new ArrayList<>(); - List excessTypes = new ArrayList<>(); - excessTypes.add(StorageType.DEFAULT); - first.add(storages[0]); - first.add(storages[1]); - second.add(storages[4]); - second.add(storages[8]); - DatanodeStorageInfo chosenStorage = - upgradeDomainPolicy.chooseReplicaToDelete( - (short)3, first, second, excessTypes); - assertEquals(chosenStorage, storages[1]); - first.clear(); - second.clear(); - - excessTypes.add(StorageType.DEFAULT); - first.add(storages[0]); - first.add(storages[1]); - first.add(storages[4]); - first.add(storages[5]); - chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete( - (short)3, first, second, excessTypes); - assertTrue(chosenStorage.equals(storages[1]) || - chosenStorage.equals(storages[4])); - } - /** * Test the scenario where not enough replicas can't satisfy the policy. * @throws Exception @@ -248,7 +216,7 @@ public class TestReplicationPolicyWithUpgradeDomain } /** - * Test the scenario where not enough replicas can't satisfy the policy. + * Test block placement verification. * @throws Exception */ @Test @@ -341,6 +309,137 @@ public class TestReplicationPolicyWithUpgradeDomain assertFalse(status.isPlacementPolicySatisfied()); } + /** + * Verify the correct replica is chosen to satisfy both rack and upgrade + * domain policy. + * @throws Exception + */ + @Test + public void testChooseReplicasToDelete() throws Exception { + Collection nonExcess = new ArrayList<>(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[2]); + nonExcess.add(storages[3]); + List excessReplicas; + BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite + .createDefaultSuite(); + BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy(); + + // delete hint accepted. + DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor(); + List excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[0])); + + // delete hint rejected because deleting storages[1] would have + // cause only two upgrade domains left. 
+ delHintNode = storages[1].getDatanodeDescriptor(); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), delHintNode); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[0])); + + // no delete hint, case 1 + nonExcess.clear(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[4]); + nonExcess.add(storages[8]); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[8].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[1])); + + // no delete hint, case 2 + nonExcess.clear(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[4]); + nonExcess.add(storages[5]); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[8].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.size() == 1); + assertTrue(excessReplicas.contains(storages[1]) || + excessReplicas.contains(storages[4])); + + // No delete hint, different excess type deletion + nonExcess.clear(); + nonExcess.add(storages[0]); + nonExcess.add(storages[1]); + nonExcess.add(storages[2]); + nonExcess.add(storages[3]); + DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo( + "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(), + "foo.com", StorageType.ARCHIVE, delHintNode.getUpgradeDomain()); + nonExcess.add(excessStorage); + excessTypes = storagePolicy.chooseExcess((short) 3, + DatanodeStorageInfo.toStorageTypes(nonExcess)); + excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, + excessTypes, storages[3].getDatanodeDescriptor(), null); + assertTrue(excessReplicas.size() == 2); + assertTrue(excessReplicas.contains(storages[0])); + assertTrue(excessReplicas.contains(excessStorage)); + } + + @Test + public void testIsMovable() throws Exception { + List candidates = new ArrayList<>(); + + // after the move, the number of racks changes from 1 to 2. + // and number of upgrade domains remains 3. + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[3]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3])); + + // the move would have changed the number of racks from 1 to 2. + // and the number of UDs from 3 to 2. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[2]); + candidates.add(dataNodes[4]); + assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[4])); + + // after the move, the number of racks remains 2. + // the number of UDs remains 3. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[4]); + candidates.add(dataNodes[5]); + candidates.add(dataNodes[6]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[6])); + + // after the move, the number of racks remains 2. + // the number of UDs remains 2. 
+ candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[1]); + candidates.add(dataNodes[3]); + candidates.add(dataNodes[4]); + assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4])); + + // the move would have changed the number of racks from 2 to 3. + // and the number of UDs from 2 to 1. + candidates.clear(); + candidates.add(dataNodes[0]); + candidates.add(dataNodes[3]); + candidates.add(dataNodes[4]); + candidates.add(dataNodes[6]); + assertFalse(replicator.isMovable(candidates, dataNodes[4], dataNodes[6])); + } + private Set getUpgradeDomains(DatanodeStorageInfo[] nodes) { HashSet upgradeDomains = new HashSet<>(); for (DatanodeStorageInfo node : nodes) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 143665a4491..9df43990097 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -629,11 +629,13 @@ public class TestDNFencing { } @Override - public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, - Collection first, Collection second, + public DatanodeStorageInfo chooseReplicaToDelete( + Collection moreThanOne, + Collection exactlyOne, List excessTypes) { - - Collection chooseFrom = !first.isEmpty() ? first : second; + + Collection chooseFrom = !moreThanOne.isEmpty() ? + moreThanOne : exactlyOne; List l = Lists.newArrayList(chooseFrom); return l.get(ThreadLocalRandom.current().nextInt(l.size()));