HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. (Ming Ma via lei)
This commit is contained in:
parent
c60a16fcea
commit
0f5f9846ed
|
@ -1515,6 +1515,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8988. Use LightWeightHashSet instead of LightWeightLinkedSet in
|
||||
BlockManager#excessReplicateMap. (yliu)
|
||||
|
||||
HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
|
||||
(Ming Ma via lei)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -333,6 +333,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final int DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384;
|
||||
public static final int DFS_NAMENODE_MAX_XATTR_SIZE_HARD_LIMIT = 32768;
|
||||
|
||||
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
|
||||
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
|
||||
|
||||
//Following keys have no defaults
|
||||
public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
|
||||
|
|
|
@ -125,14 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
}
|
||||
|
||||
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
|
||||
new HashSet<Node>() : new HashSet<Node>(excludedNodes);
|
||||
new HashSet<Node>() : new HashSet<>(excludedNodes);
|
||||
final List<StorageType> requiredStorageTypes = storagePolicy
|
||||
.chooseStorageTypes((short)numOfReplicas);
|
||||
final EnumMap<StorageType, Integer> storageTypes =
|
||||
getRequiredStorageTypes(requiredStorageTypes);
|
||||
|
||||
// Choose favored nodes
|
||||
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> results = new ArrayList<>();
|
||||
boolean avoidStaleNodes = stats != null
|
||||
&& stats.isAvoidingStaleDataNodesForWrite();
|
||||
|
||||
|
@ -192,14 +192,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
}
|
||||
|
||||
if (excludedNodes == null) {
|
||||
excludedNodes = new HashSet<Node>();
|
||||
excludedNodes = new HashSet<>();
|
||||
}
|
||||
|
||||
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
|
||||
numOfReplicas = result[0];
|
||||
int maxNodesPerRack = result[1];
|
||||
|
||||
final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
|
||||
final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
|
||||
for (DatanodeStorageInfo storage : chosenStorage) {
|
||||
// add localMachine and related nodes to excludedNodes
|
||||
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
||||
|
@ -266,8 +266,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
|
||||
private EnumMap<StorageType, Integer> getRequiredStorageTypes(
|
||||
List<StorageType> types) {
|
||||
EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
|
||||
Integer>(StorageType.class);
|
||||
EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class);
|
||||
for (StorageType type : types) {
|
||||
if (!map.containsKey(type)) {
|
||||
map.put(type, 1);
|
||||
|
@ -310,7 +309,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
}
|
||||
|
||||
// Keep a copy of original excludedNodes
|
||||
final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
|
||||
final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);
|
||||
|
||||
// choose storage types; use fallbacks for unavailable storages
|
||||
final List<StorageType> requiredStorageTypes = storagePolicy
|
||||
|
@ -929,9 +928,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
* So pick up first set if not empty. If first is empty, then pick second.
|
||||
*/
|
||||
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> first,
|
||||
Collection<DatanodeStorageInfo> second) {
|
||||
return first.isEmpty() ? second : first;
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne) {
|
||||
return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -0,0 +1,264 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
|
||||
/**
|
||||
* The class is responsible for choosing the desired number of targets
|
||||
* for placing block replicas that honors upgrade domain policy.
|
||||
* Here is the replica placement strategy. If the writer is on a datanode,
|
||||
* the 1st replica is placed on the local machine,
|
||||
* otherwise a random datanode. The 2nd replica is placed on a datanode
|
||||
* that is on a different rack. The 3rd replica is placed on a datanode
|
||||
* which is on a different node of the rack as the second replica.
|
||||
* All 3 replicas have unique upgrade domains.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BlockPlacementPolicyWithUpgradeDomain extends
|
||||
BlockPlacementPolicyDefault {
|
||||
|
||||
private int upgradeDomainFactor;
|
||||
|
||||
@Override
|
||||
public void initialize(Configuration conf, FSClusterStats stats,
|
||||
NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
|
||||
super.initialize(conf, stats, clusterMap, host2datanodeMap);
|
||||
upgradeDomainFactor = conf.getInt(
|
||||
DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR,
|
||||
DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isGoodDatanode(DatanodeDescriptor node,
|
||||
int maxTargetPerRack, boolean considerLoad,
|
||||
List<DatanodeStorageInfo> results, boolean avoidStaleNodes) {
|
||||
boolean isGoodTarget = super.isGoodDatanode(node,
|
||||
maxTargetPerRack, considerLoad, results, avoidStaleNodes);
|
||||
if (isGoodTarget) {
|
||||
if (results.size() > 0 && results.size() < upgradeDomainFactor) {
|
||||
// Each node in "results" has a different upgrade domain. Make sure
|
||||
// the candidate node introduces a new upgrade domain.
|
||||
Set<String> upgradeDomains = getUpgradeDomains(results);
|
||||
if (upgradeDomains.contains(node.getUpgradeDomain())) {
|
||||
isGoodTarget = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return isGoodTarget;
|
||||
}
|
||||
|
||||
// If upgrade domain isn't specified, uses its XferAddr as upgrade domain.
|
||||
// Such fallback is useful to test the scenario where upgrade domain isn't
|
||||
// defined but the block placement is set to upgrade domain policy.
|
||||
public String getUpgradeDomainWithDefaultValue(DatanodeInfo datanodeInfo) {
|
||||
String upgradeDomain = datanodeInfo.getUpgradeDomain();
|
||||
if (upgradeDomain == null) {
|
||||
LOG.warn("Upgrade domain isn't defined for " + datanodeInfo);
|
||||
upgradeDomain = datanodeInfo.getXferAddr();
|
||||
}
|
||||
return upgradeDomain;
|
||||
}
|
||||
|
||||
private String getUpgradeDomain(DatanodeStorageInfo storage) {
|
||||
return getUpgradeDomainWithDefaultValue(storage.getDatanodeDescriptor());
|
||||
}
|
||||
|
||||
private Set<String> getUpgradeDomains(List<DatanodeStorageInfo> results) {
|
||||
Set<String> upgradeDomains = new HashSet<>();
|
||||
if (results == null) {
|
||||
return upgradeDomains;
|
||||
}
|
||||
for(DatanodeStorageInfo storageInfo : results) {
|
||||
upgradeDomains.add(getUpgradeDomain(storageInfo));
|
||||
}
|
||||
return upgradeDomains;
|
||||
}
|
||||
|
||||
private Set<String> getUpgradeDomainsFromNodes(DatanodeInfo[] nodes) {
|
||||
Set<String> upgradeDomains = new HashSet<>();
|
||||
if (nodes == null) {
|
||||
return upgradeDomains;
|
||||
}
|
||||
for(DatanodeInfo node : nodes) {
|
||||
upgradeDomains.add(getUpgradeDomainWithDefaultValue(node));
|
||||
}
|
||||
return upgradeDomains;
|
||||
}
|
||||
|
||||
private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
|
||||
DatanodeStorageInfo[] storageInfos) {
|
||||
Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
|
||||
for(DatanodeStorageInfo storage : storageInfos) {
|
||||
String upgradeDomain = getUpgradeDomainWithDefaultValue(
|
||||
storage.getDatanodeDescriptor());
|
||||
List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
|
||||
if (storages == null) {
|
||||
storages = new ArrayList<>();
|
||||
upgradeDomainMap.put(upgradeDomain, storages);
|
||||
}
|
||||
storages.add(storage);
|
||||
}
|
||||
return upgradeDomainMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockPlacementStatus verifyBlockPlacement(String srcPath,
|
||||
LocatedBlock lBlk, int numberOfReplicas) {
|
||||
BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath,
|
||||
lBlk, numberOfReplicas);
|
||||
BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
|
||||
new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
|
||||
getUpgradeDomainsFromNodes(lBlk.getLocations()),
|
||||
numberOfReplicas, upgradeDomainFactor);
|
||||
return upgradeDomainStatus;
|
||||
}
|
||||
|
||||
private <T> List<T> getShareUDSet(
|
||||
Map<String, List<T>> upgradeDomains) {
|
||||
List<T> getShareUDSet = new ArrayList<>();
|
||||
for (Map.Entry<String, List<T>> e : upgradeDomains.entrySet()) {
|
||||
if (e.getValue().size() > 1) {
|
||||
getShareUDSet.addAll(e.getValue());
|
||||
}
|
||||
}
|
||||
return getShareUDSet;
|
||||
}
|
||||
|
||||
/*
|
||||
* The policy to pick the replica set for deleting the over-replicated
|
||||
* replica which meet the rack and upgrade domain requirements.
|
||||
* The algorithm:
|
||||
* a. Each replica has a boolean attribute "shareRack" that defines
|
||||
* whether it shares its rack with another replica of the same block.
|
||||
* b. Each replica has another boolean attribute "shareUD" that defines
|
||||
* whether it shares its upgrade domain with another replica of the same
|
||||
* block.
|
||||
* c. Partition the replicas into 4 sets (some might be empty.):
|
||||
* shareRackAndUDSet: {shareRack==true, shareUD==true}
|
||||
* shareUDNotRackSet: {shareRack==false, shareUD==true}
|
||||
* shareRackNotUDSet: {shareRack==true, shareUD==false}
|
||||
* NoShareRackOrUDSet: {shareRack==false, shareUD==false}
|
||||
* d. Pick the first not-empty replica set in the following order.
|
||||
* shareRackAndUDSet, shareUDNotRackSet, shareRackNotUDSet,
|
||||
* NoShareRackOrUDSet
|
||||
* e. Proof this won't degrade the existing rack-based data
|
||||
* availability model under different scenarios.
|
||||
* 1. shareRackAndUDSet isn't empty. Removing a node
|
||||
* from shareRackAndUDSet won't change # of racks and # of UD.
|
||||
* The followings cover empty shareUDNotRackSet scenarios.
|
||||
* 2. shareUDNotRackSet isn't empty and shareRackNotUDSet isn't empty.
|
||||
* Let us proof that # of racks >= 3 before the deletion and thus
|
||||
* after deletion # of racks >= 2.
|
||||
* Given shareUDNotRackSet is empty, there won't be overlap between
|
||||
* shareUDNotRackSet and shareRackNotUDSet. It means DNs in
|
||||
* shareRackNotUDSet should be on at least a rack
|
||||
* different from any DN' rack in shareUDNotRackSet.
|
||||
* Given shareUDNotRackSet.size() >= 2 and each DN in the set
|
||||
* doesn't share rack with any other DNs, there are at least 2 racks
|
||||
* coming from shareUDNotRackSet.
|
||||
* Thus the # of racks from DNs in {shareUDNotRackSet,
|
||||
* shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
|
||||
* will reduce the # of racks by 1 and won't change # of upgrade
|
||||
* domains.
|
||||
* Note that this is different from BlockPlacementPolicyDefault which
|
||||
* will keep the # of racks after deletion. With upgrade domain policy,
|
||||
* given # of racks is still >= 2 after deletion, the data availability
|
||||
* model remains the same as BlockPlacementPolicyDefault (only supports
|
||||
* one rack failure).
|
||||
* For example, assume we have 4 replicas: d1(rack1, ud1),
|
||||
* d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
|
||||
* shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
|
||||
* With upgrade domain policy, the remaining replicas after deletion
|
||||
* are {d1(or d2), d3, d4} which has 2 racks.
|
||||
* With BlockPlacementPolicyDefault policy, the remaining replicas
|
||||
* after deletion are {d1, d2, d3(or d4)} which has 3 racks.
|
||||
* 3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty. This
|
||||
* implies all replicas are on unique racks. Removing a node from
|
||||
* shareUDNotRackSet will reduce # of racks (no different from
|
||||
* BlockPlacementPolicyDefault) by 1 and won't change #
|
||||
* of upgrade domains.
|
||||
* 4. shareUDNotRackSet is empty and shareRackNotUDSet isn't empty.
|
||||
* Removing a node from shareRackNotUDSet is no different from
|
||||
* BlockPlacementPolicyDefault.
|
||||
* 5. shareUDNotRackSet is empty and shareRackNotUDSet is empty.
|
||||
* Removing a node from NoShareRackOrUDSet is no different from
|
||||
* BlockPlacementPolicyDefault.
|
||||
* The implementation:
|
||||
* 1. Generate set shareUDSet which includes all DatanodeStorageInfo that
|
||||
* share the same upgrade domain with another DatanodeStorageInfo,
|
||||
* e.g. {shareRackAndUDSet, shareUDNotRackSet}.
|
||||
* 2. If shareUDSet is empty, it means shareRackAndUDSet is empty and
|
||||
* shareUDNotRackSet is empty. Use the default rack based policy.
|
||||
* 3. If shareUDSet isn't empty, intersect it with moreThanOne(
|
||||
* {shareRackAndUDSet, shareRackNotUDSet})to generate shareRackAndUDSet.
|
||||
* 4. If shareRackAndUDSet isn't empty, return
|
||||
* shareRackAndUDSet, otherwise return shareUDSet which is the same as
|
||||
* shareUDNotRackSet.
|
||||
*/
|
||||
@Override
|
||||
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne) {
|
||||
List<DatanodeStorageInfo> all = new ArrayList<>();
|
||||
if (moreThanOne != null) {
|
||||
all.addAll(moreThanOne);
|
||||
}
|
||||
if (exactlyOne != null) {
|
||||
all.addAll(exactlyOne);
|
||||
}
|
||||
|
||||
Map<String, List<DatanodeStorageInfo>> upgradeDomains =
|
||||
getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
|
||||
|
||||
// shareUDSet includes DatanodeStorageInfo that share same upgrade
|
||||
// domain with another DatanodeStorageInfo.
|
||||
List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
|
||||
// shareRackAndUDSet contains those DatanodeStorageInfo that
|
||||
// share rack and upgrade domain with another DatanodeStorageInfo.
|
||||
List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
|
||||
if (shareUDSet.size() == 0) {
|
||||
// All upgrade domains are unique, use the parent set.
|
||||
return super.pickupReplicaSet(moreThanOne, exactlyOne);
|
||||
} else if (moreThanOne != null) {
|
||||
for (DatanodeStorageInfo storage : shareUDSet) {
|
||||
if (moreThanOne.contains(storage)) {
|
||||
shareRackAndUDSet.add(storage);
|
||||
}
|
||||
}
|
||||
}
|
||||
return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* An implementation of @see BlockPlacementStatus for
|
||||
* @see BlockPlacementPolicyWithUpgradeDomain
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BlockPlacementStatusWithUpgradeDomain implements
|
||||
BlockPlacementStatus {
|
||||
|
||||
private final BlockPlacementStatus parentBlockPlacementStatus;
|
||||
private final Set<String> upgradeDomains;
|
||||
private final int numberOfReplicas;
|
||||
private final int upgradeDomainFactor;
|
||||
|
||||
/**
|
||||
* @param parentBlockPlacementStatus the parent class' status
|
||||
* @param upgradeDomains the set of upgrade domains of the replicas
|
||||
* @param numberOfReplicas the number of replicas of the block
|
||||
* @param upgradeDomainFactor the configured upgrade domain factor
|
||||
*/
|
||||
public BlockPlacementStatusWithUpgradeDomain(
|
||||
BlockPlacementStatus parentBlockPlacementStatus,
|
||||
Set<String> upgradeDomains, int numberOfReplicas,
|
||||
int upgradeDomainFactor){
|
||||
this.parentBlockPlacementStatus = parentBlockPlacementStatus;
|
||||
this.upgradeDomains = upgradeDomains;
|
||||
this.numberOfReplicas = numberOfReplicas;
|
||||
this.upgradeDomainFactor = upgradeDomainFactor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPlacementPolicySatisfied() {
|
||||
return parentBlockPlacementStatus.isPlacementPolicySatisfied() &&
|
||||
isUpgradeDomainPolicySatisfied();
|
||||
}
|
||||
|
||||
private boolean isUpgradeDomainPolicySatisfied() {
|
||||
if (numberOfReplicas <= upgradeDomainFactor) {
|
||||
return (numberOfReplicas == upgradeDomains.size());
|
||||
} else {
|
||||
return upgradeDomains.size() >= upgradeDomainFactor;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getErrorDescription() {
|
||||
if (isPlacementPolicySatisfied()) {
|
||||
return null;
|
||||
}
|
||||
StringBuilder errorDescription = new StringBuilder();
|
||||
if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
|
||||
errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
|
||||
}
|
||||
if (!isUpgradeDomainPolicySatisfied()) {
|
||||
if (errorDescription.length() != 0) {
|
||||
errorDescription.append(" ");
|
||||
}
|
||||
errorDescription.append("The block has " + numberOfReplicas +
|
||||
" replicas. But it only has " + upgradeDomains.size() +
|
||||
" upgrade domains " + upgradeDomains +".");
|
||||
}
|
||||
return errorDescription.toString();
|
||||
}
|
||||
}
|
|
@ -2491,4 +2491,18 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.upgrade.domain.factor</name>
|
||||
<value>${dfs.replication}</value>
|
||||
<description>
|
||||
This is valid only when block placement policy is set to
|
||||
BlockPlacementPolicyWithUpgradeDomain. It defines the number of
|
||||
unique upgrade domains any block's replicas should have.
|
||||
When the number of replicas is less or equal to this value, the policy
|
||||
ensures each replica has an unique upgrade domain. When the number of
|
||||
replicas is greater than this value, the policy ensures the number of
|
||||
unique domains is at least this value.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -96,7 +96,6 @@ abstract public class BaseReplicationPolicyTest {
|
|||
// construct network topology
|
||||
for (int i=0; i < dataNodes.length; i++) {
|
||||
cluster.add(dataNodes[i]);
|
||||
//bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]);
|
||||
bm.getDatanodeManager().getHeartbeatManager().addDatanode(
|
||||
dataNodes[i]);
|
||||
}
|
||||
|
|
|
@ -83,7 +83,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
@Parameterized.Parameters
|
||||
public static Iterable<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ BlockPlacementPolicyDefault.class.getName() } });
|
||||
{ BlockPlacementPolicyDefault.class.getName() },
|
||||
{ BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
|
||||
}
|
||||
|
||||
private void updateHeartbeatForExtraStorage(long capacity,
|
||||
|
@ -231,9 +232,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
public void testChooseTarget2() throws Exception {
|
||||
Set<Node> excludedNodes;
|
||||
DatanodeStorageInfo[] targets;
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
|
||||
excludedNodes = new HashSet<Node>();
|
||||
excludedNodes = new HashSet<>();
|
||||
excludedNodes.add(dataNodes[1]);
|
||||
targets = chooseTarget(0, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 0);
|
||||
|
@ -422,9 +423,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
"DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
|
||||
DatanodeDescriptor newDn = storage.getDatanodeDescriptor();
|
||||
Set<Node> excludedNodes;
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
|
||||
excludedNodes = new HashSet<Node>();
|
||||
excludedNodes = new HashSet<>();
|
||||
excludedNodes.add(dataNodes[0]);
|
||||
excludedNodes.add(dataNodes[1]);
|
||||
excludedNodes.add(dataNodes[2]);
|
||||
|
@ -554,9 +555,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
assertEquals(targets.length, 1);
|
||||
assertEquals(storages[1], targets[0]);
|
||||
|
||||
Set<Node> excludedNodes = new HashSet<Node>();
|
||||
Set<Node> excludedNodes = new HashSet<>();
|
||||
excludedNodes.add(dataNodes[1]);
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
targets = chooseTarget(1, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 1);
|
||||
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
|
||||
|
@ -726,7 +727,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
*/
|
||||
@Test
|
||||
public void testRereplicate1() throws Exception {
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storages[0]);
|
||||
DatanodeStorageInfo[] targets;
|
||||
|
||||
|
@ -757,7 +758,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
*/
|
||||
@Test
|
||||
public void testRereplicate2() throws Exception {
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storages[0]);
|
||||
chosenNodes.add(storages[1]);
|
||||
|
||||
|
@ -784,7 +785,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
*/
|
||||
@Test
|
||||
public void testRereplicate3() throws Exception {
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storages[0]);
|
||||
chosenNodes.add(storages[2]);
|
||||
|
||||
|
@ -950,7 +951,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
*/
|
||||
@Test
|
||||
public void testChooseReplicaToDelete() throws Exception {
|
||||
List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> replicaList = new ArrayList<>();
|
||||
final Map<String, List<DatanodeStorageInfo>> rackMap
|
||||
= new HashMap<String, List<DatanodeStorageInfo>>();
|
||||
|
||||
|
@ -971,14 +972,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
|
||||
}
|
||||
|
||||
List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> first = new ArrayList<>();
|
||||
List<DatanodeStorageInfo> second = new ArrayList<>();
|
||||
replicator.splitNodesWithRack(replicaList, rackMap, first, second);
|
||||
// storages[0] and storages[1] are in first set as their rack has two
|
||||
// replica nodes, while storages[2] and dataNodes[5] are in second set.
|
||||
assertEquals(2, first.size());
|
||||
assertEquals(2, second.size());
|
||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
{
|
||||
// test returning null
|
||||
excessTypes.add(StorageType.SSD);
|
||||
|
|
|
@ -45,7 +45,8 @@ public class TestReplicationPolicyConsiderLoad
|
|||
@Parameterized.Parameters
|
||||
public static Iterable<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ BlockPlacementPolicyDefault.class.getName() } });
|
||||
{ BlockPlacementPolicyDefault.class.getName() },
|
||||
{ BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -111,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
|
|||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
|
||||
assertEquals(3, targets.length);
|
||||
Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(
|
||||
Set<DatanodeStorageInfo> targetSet = new HashSet<>(
|
||||
Arrays.asList(targets));
|
||||
for (int i = 3; i < storages.length; i++) {
|
||||
assertTrue(targetSet.contains(storages[i]));
|
||||
|
|
|
@ -137,7 +137,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
DatanodeStorageInfo[] targets) {
|
||||
if(targets.length == 0)
|
||||
return true;
|
||||
Set<String> targetSet = new HashSet<String>();
|
||||
Set<String> targetSet = new HashSet<>();
|
||||
for(DatanodeStorageInfo storage:targets) {
|
||||
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||
String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
|
||||
|
@ -217,7 +217,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
}
|
||||
|
||||
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
|
||||
Set<String> nodeGroupSet = new HashSet<String>();
|
||||
Set<String> nodeGroupSet = new HashSet<>();
|
||||
for (DatanodeStorageInfo target: targets) {
|
||||
nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
|
||||
}
|
||||
|
@ -236,9 +236,9 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
public void testChooseTarget2() throws Exception {
|
||||
DatanodeStorageInfo[] targets;
|
||||
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
|
||||
Set<Node> excludedNodes = new HashSet<Node>();
|
||||
Set<Node> excludedNodes = new HashSet<>();
|
||||
excludedNodes.add(dataNodes[1]);
|
||||
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
|
||||
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
|
@ -415,7 +415,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
@Test
|
||||
public void testRereplicate1() throws Exception {
|
||||
updateHeartbeatWithUsage();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storages[0]);
|
||||
DatanodeStorageInfo[] targets;
|
||||
|
||||
|
@ -448,7 +448,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
@Test
|
||||
public void testRereplicate2() throws Exception {
|
||||
updateHeartbeatWithUsage();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storages[0]);
|
||||
chosenNodes.add(storages[1]);
|
||||
|
||||
|
@ -476,7 +476,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
@Test
|
||||
public void testRereplicate3() throws Exception {
|
||||
updateHeartbeatWithUsage();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storages[0]);
|
||||
chosenNodes.add(storages[3]);
|
||||
|
||||
|
@ -511,9 +511,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
*/
|
||||
@Test
|
||||
public void testChooseReplicaToDelete() throws Exception {
|
||||
List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
|
||||
final Map<String, List<DatanodeStorageInfo>> rackMap
|
||||
= new HashMap<String, List<DatanodeStorageInfo>>();
|
||||
List<DatanodeStorageInfo> replicaList = new ArrayList<>();
|
||||
final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
|
||||
dataNodes[0].setRemaining(4*1024*1024);
|
||||
replicaList.add(storages[0]);
|
||||
|
||||
|
@ -526,13 +525,13 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
dataNodes[5].setRemaining(1*1024*1024);
|
||||
replicaList.add(storages[5]);
|
||||
|
||||
List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> first = new ArrayList<>();
|
||||
List<DatanodeStorageInfo> second = new ArrayList<>();
|
||||
replicator.splitNodesWithRack(
|
||||
replicaList, rackMap, first, second);
|
||||
assertEquals(3, first.size());
|
||||
assertEquals(1, second.size());
|
||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)3, first, second, excessTypes);
|
||||
|
@ -614,7 +613,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||
}
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
chosenNodes.add(storagesInBoundaryCase[0]);
|
||||
chosenNodes.add(storagesInBoundaryCase[5]);
|
||||
DatanodeStorageInfo[] targets;
|
||||
|
@ -704,10 +703,9 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||
}
|
||||
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
DatanodeStorageInfo[] targets;
|
||||
Set<Node> excludedNodes = new HashSet<Node>();
|
||||
Set<Node> excludedNodes = new HashSet<>();
|
||||
excludedNodes.add(dataNodesForDependencies[5]);
|
||||
|
||||
//try to select three targets as there are three node groups
|
||||
|
|
|
@ -0,0 +1,353 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class TestReplicationPolicyWithUpgradeDomain
|
||||
extends BaseReplicationPolicyTest {
|
||||
public TestReplicationPolicyWithUpgradeDomain() {
|
||||
this.blockPlacementPolicy =
|
||||
BlockPlacementPolicyWithUpgradeDomain.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
|
||||
final String[] racks = {
|
||||
"/d1/r1",
|
||||
"/d1/r1",
|
||||
"/d1/r1",
|
||||
"/d1/r2",
|
||||
"/d1/r2",
|
||||
"/d1/r2",
|
||||
"/d1/r3",
|
||||
"/d1/r3",
|
||||
"/d1/r3"};
|
||||
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
|
||||
DatanodeDescriptor dataNodes[] =
|
||||
DFSTestUtil.toDatanodeDescriptor(storages);
|
||||
for (int i=0; i < dataNodes.length; i++) {
|
||||
// each rack has 3 DNs with upgrade domain id 1,2,3 respectively.
|
||||
String upgradeDomain = Integer.toString((i%3)+1);
|
||||
dataNodes[i].setUpgradeDomain(upgradeDomain);
|
||||
}
|
||||
return dataNodes;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Verify the targets are chosen to honor both
|
||||
* rack and upgrade domain policies when number of replica is
|
||||
* 0, 1, 2, 3, 4 respectively.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testChooseTarget1() throws Exception {
|
||||
updateHeartbeatWithUsage(dataNodes[0],
|
||||
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
0L, 0L, 4, 0);
|
||||
|
||||
DatanodeStorageInfo[] targets;
|
||||
targets = chooseTarget(0);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
targets = chooseTarget(1);
|
||||
assertEquals(targets.length, 1);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
|
||||
targets = chooseTarget(2);
|
||||
assertEquals(targets.length, 2);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertFalse(isOnSameRack(targets[0], targets[1]));
|
||||
assertEquals(getUpgradeDomains(targets).size(), 2);
|
||||
|
||||
targets = chooseTarget(3);
|
||||
assertEquals(targets.length, 3);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertFalse(isOnSameRack(targets[0], targets[1]));
|
||||
assertTrue(isOnSameRack(targets[1], targets[2]));
|
||||
assertEquals(getUpgradeDomains(targets).size(), 3);
|
||||
|
||||
targets = chooseTarget(4);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertTrue(isOnSameRack(targets[1], targets[2]) ||
|
||||
isOnSameRack(targets[2], targets[3]));
|
||||
assertFalse(isOnSameRack(targets[0], targets[2]));
|
||||
assertEquals(getUpgradeDomains(targets).size(), 3);
|
||||
|
||||
updateHeartbeatWithUsage(dataNodes[0],
|
||||
2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
|
||||
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the rack and upgrade domain policies when excludeNodes are
|
||||
* specified.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testChooseTargetWithExcludeNodes() throws Exception {
|
||||
Set<Node> excludedNodes = new HashSet<>();
|
||||
DatanodeStorageInfo[] targets;
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[4]);
|
||||
targets = chooseTarget(3, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 3);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertEquals(getRacks(targets).size(), 2);
|
||||
assertEquals(getUpgradeDomains(targets).size(), 3);
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[4]);
|
||||
excludedNodes.add(dataNodes[8]);
|
||||
targets = chooseTarget(3, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 3);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertEquals(getRacks(targets).size(), 2);
|
||||
assertEquals(getUpgradeDomains(targets).size(), 3);
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[4]);
|
||||
excludedNodes.add(dataNodes[5]);
|
||||
excludedNodes.add(dataNodes[8]);
|
||||
targets = chooseTarget(3, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 3);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertEquals(storages[2], targets[1]);
|
||||
assertEquals(storages[7], targets[2]);
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[4]);
|
||||
targets = chooseTarget(4, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertTrue(getRacks(targets).size()>=2);
|
||||
assertEquals(getUpgradeDomains(targets).size(), 3);
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[4]);
|
||||
excludedNodes.add(dataNodes[8]);
|
||||
targets = chooseTarget(4, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 4);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertTrue(getRacks(targets).size()>=2);
|
||||
assertEquals(getUpgradeDomains(targets).size(), 3);
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[1]);
|
||||
chosenNodes.add(storages[2]);
|
||||
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
|
||||
true, excludedNodes, BLOCK_SIZE,
|
||||
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
System.out.println("targets=" + Arrays.asList(targets));
|
||||
assertEquals(2, targets.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the correct replica is chosen to satisfy both rack and upgrade
|
||||
* domain policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testChooseReplicaToDelete() throws Exception {
|
||||
BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
|
||||
(BlockPlacementPolicyWithUpgradeDomain)replicator;
|
||||
List<DatanodeStorageInfo> first = new ArrayList<>();
|
||||
List<DatanodeStorageInfo> second = new ArrayList<>();
|
||||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
first.add(storages[0]);
|
||||
first.add(storages[1]);
|
||||
second.add(storages[4]);
|
||||
second.add(storages[8]);
|
||||
DatanodeStorageInfo chosenStorage =
|
||||
upgradeDomainPolicy.chooseReplicaToDelete(
|
||||
null, null, (short)3, first, second, excessTypes);
|
||||
assertEquals(chosenStorage, storages[1]);
|
||||
first.clear();
|
||||
second.clear();
|
||||
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
first.add(storages[0]);
|
||||
first.add(storages[1]);
|
||||
first.add(storages[4]);
|
||||
first.add(storages[5]);
|
||||
chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
|
||||
null, null, (short)3, first, second, excessTypes);
|
||||
assertTrue(chosenStorage.equals(storages[1]) ||
|
||||
chosenStorage.equals(storages[4]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the scenario where not enough replicas can't satisfy the policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testChooseTargetWithoutEnoughReplica() throws Exception {
|
||||
Set<Node> excludedNodes = new HashSet<>();
|
||||
DatanodeStorageInfo[] targets;
|
||||
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
|
||||
|
||||
excludedNodes.clear();
|
||||
chosenNodes.clear();
|
||||
excludedNodes.add(dataNodes[4]);
|
||||
excludedNodes.add(dataNodes[5]);
|
||||
excludedNodes.add(dataNodes[7]);
|
||||
excludedNodes.add(dataNodes[8]);
|
||||
targets = chooseTarget(3, chosenNodes, excludedNodes);
|
||||
assertEquals(targets.length, 2);
|
||||
assertEquals(storages[0], targets[0]);
|
||||
assertTrue(targets[1].equals(storages[1]) ||
|
||||
targets[1].equals(storages[2]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the scenario where not enough replicas can't satisfy the policy.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testVerifyBlockPlacement() throws Exception {
|
||||
LocatedBlock locatedBlock;
|
||||
BlockPlacementStatus status;
|
||||
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
|
||||
List<DatanodeStorageInfo> set = new ArrayList<>();
|
||||
|
||||
// 2 upgrade domains (not enough), 2 racks (enough)
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[1]);
|
||||
set.add(storages[4]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertFalse(status.isPlacementPolicySatisfied());
|
||||
|
||||
// 3 upgrade domains (enough), 2 racks (enough)
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[1]);
|
||||
set.add(storages[5]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertTrue(status.isPlacementPolicySatisfied());
|
||||
|
||||
// 3 upgrade domains (enough), 1 rack (not enough)
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[1]);
|
||||
set.add(storages[2]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertFalse(status.isPlacementPolicySatisfied());
|
||||
assertFalse(status.getErrorDescription().contains("upgrade domain"));
|
||||
|
||||
// 2 upgrade domains( not enough), 3 racks (enough)
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[5]);
|
||||
set.add(storages[8]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertFalse(status.isPlacementPolicySatisfied());
|
||||
assertTrue(status.getErrorDescription().contains("upgrade domain"));
|
||||
|
||||
// 3 upgrade domains (enough), 3 racks (enough)
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[4]);
|
||||
set.add(storages[8]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertTrue(status.isPlacementPolicySatisfied());
|
||||
|
||||
|
||||
// 3 upgrade domains (enough), 3 racks (enough), 4 replicas
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[1]);
|
||||
set.add(storages[5]);
|
||||
set.add(storages[8]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertTrue(status.isPlacementPolicySatisfied());
|
||||
|
||||
// 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
|
||||
set.clear();
|
||||
set.add(storages[0]);
|
||||
set.add(storages[3]);
|
||||
set.add(storages[5]);
|
||||
set.add(storages[8]);
|
||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
||||
assertFalse(status.isPlacementPolicySatisfied());
|
||||
}
|
||||
|
||||
private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
|
||||
HashSet<String> upgradeDomains = new HashSet<>();
|
||||
for (DatanodeStorageInfo node : nodes) {
|
||||
upgradeDomains.add(node.getDatanodeDescriptor().getUpgradeDomain());
|
||||
}
|
||||
return upgradeDomains;
|
||||
}
|
||||
|
||||
private Set<String> getRacks(DatanodeStorageInfo[] nodes) {
|
||||
HashSet<String> racks = new HashSet<>();
|
||||
for (DatanodeStorageInfo node : nodes) {
|
||||
String rack = node.getDatanodeDescriptor().getNetworkLocation();
|
||||
racks.add(rack);
|
||||
}
|
||||
return racks;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue