HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. (Ming Ma via lei)

(cherry picked from commit 0f5f9846ed)
Lei Xu 2015-10-12 16:23:42 -07:00
parent eed61a5eda
commit 218aa7bba1
11 changed files with 772 additions and 50 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -688,6 +688,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8988. Use LightWeightHashSet instead of LightWeightLinkedSet in
BlockManager#excessReplicateMap. (yliu)
+ HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
+ (Ming Ma via lei)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -326,6 +326,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384;
public static final int DFS_NAMENODE_MAX_XATTR_SIZE_HARD_LIMIT = 32768;
+ public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
+ public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
//Following keys have no defaults
public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -125,14 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
- new HashSet<Node>() : new HashSet<Node>(excludedNodes);
+ new HashSet<Node>() : new HashSet<>(excludedNodes);
final List<StorageType> requiredStorageTypes = storagePolicy
.chooseStorageTypes((short)numOfReplicas);
final EnumMap<StorageType, Integer> storageTypes =
getRequiredStorageTypes(requiredStorageTypes);
// Choose favored nodes
- List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> results = new ArrayList<>();
boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite();
@@ -192,14 +192,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
if (excludedNodes == null) {
- excludedNodes = new HashSet<Node>();
+ excludedNodes = new HashSet<>();
}
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
- final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
+ final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
@@ -266,8 +266,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
private EnumMap<StorageType, Integer> getRequiredStorageTypes(
List<StorageType> types) {
- EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
- Integer>(StorageType.class);
+ EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class);
for (StorageType type : types) {
if (!map.containsKey(type)) {
map.put(type, 1);
@@ -310,7 +309,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
// Keep a copy of original excludedNodes
- final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
+ final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);
// choose storage types; use fallbacks for unavailable storages
final List<StorageType> requiredStorageTypes = storagePolicy
@@ -929,11 +928,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* So pick up first set if not empty. If first is empty, then pick second.
*/
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
- Collection<DatanodeStorageInfo> first,
- Collection<DatanodeStorageInfo> second) {
- return first.isEmpty() ? second : first;
+ Collection<DatanodeStorageInfo> moreThanOne,
+ Collection<DatanodeStorageInfo> exactlyOne) {
+ return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
}
@VisibleForTesting
void setPreferLocalNode(boolean prefer) {
this.preferLocalNode = prefer;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java

@@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;
/**
* The class is responsible for choosing the desired number of targets
* for placing block replicas that honors upgrade domain policy.
* Here is the replica placement strategy. If the writer is on a datanode,
* the 1st replica is placed on the local machine,
* otherwise a random datanode. The 2nd replica is placed on a datanode
* that is on a different rack. The 3rd replica is placed on a datanode
* which is on the same rack as the second replica, but on a different node.
* All 3 replicas have unique upgrade domains.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockPlacementPolicyWithUpgradeDomain extends
BlockPlacementPolicyDefault {
private int upgradeDomainFactor;
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
super.initialize(conf, stats, clusterMap, host2datanodeMap);
upgradeDomainFactor = conf.getInt(
DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR,
DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT);
}
@Override
protected boolean isGoodDatanode(DatanodeDescriptor node,
int maxTargetPerRack, boolean considerLoad,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes) {
boolean isGoodTarget = super.isGoodDatanode(node,
maxTargetPerRack, considerLoad, results, avoidStaleNodes);
if (isGoodTarget) {
if (results.size() > 0 && results.size() < upgradeDomainFactor) {
// Each node in "results" has a different upgrade domain. Make sure
// the candidate node introduces a new upgrade domain.
Set<String> upgradeDomains = getUpgradeDomains(results);
if (upgradeDomains.contains(node.getUpgradeDomain())) {
isGoodTarget = false;
}
}
}
return isGoodTarget;
}
// If the upgrade domain isn't specified, use the DN's XferAddr as its
// upgrade domain. Such a fallback is useful to test the scenario where the
// upgrade domain isn't defined but the block placement policy is set to the
// upgrade domain policy.
public String getUpgradeDomainWithDefaultValue(DatanodeInfo datanodeInfo) {
String upgradeDomain = datanodeInfo.getUpgradeDomain();
if (upgradeDomain == null) {
LOG.warn("Upgrade domain isn't defined for " + datanodeInfo);
upgradeDomain = datanodeInfo.getXferAddr();
}
return upgradeDomain;
}
private String getUpgradeDomain(DatanodeStorageInfo storage) {
return getUpgradeDomainWithDefaultValue(storage.getDatanodeDescriptor());
}
private Set<String> getUpgradeDomains(List<DatanodeStorageInfo> results) {
Set<String> upgradeDomains = new HashSet<>();
if (results == null) {
return upgradeDomains;
}
for(DatanodeStorageInfo storageInfo : results) {
upgradeDomains.add(getUpgradeDomain(storageInfo));
}
return upgradeDomains;
}
private Set<String> getUpgradeDomainsFromNodes(DatanodeInfo[] nodes) {
Set<String> upgradeDomains = new HashSet<>();
if (nodes == null) {
return upgradeDomains;
}
for(DatanodeInfo node : nodes) {
upgradeDomains.add(getUpgradeDomainWithDefaultValue(node));
}
return upgradeDomains;
}
private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
DatanodeStorageInfo[] storageInfos) {
Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
for(DatanodeStorageInfo storage : storageInfos) {
String upgradeDomain = getUpgradeDomainWithDefaultValue(
storage.getDatanodeDescriptor());
List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
if (storages == null) {
storages = new ArrayList<>();
upgradeDomainMap.put(upgradeDomain, storages);
}
storages.add(storage);
}
return upgradeDomainMap;
}
@Override
public BlockPlacementStatus verifyBlockPlacement(String srcPath,
LocatedBlock lBlk, int numberOfReplicas) {
BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath,
lBlk, numberOfReplicas);
BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
getUpgradeDomainsFromNodes(lBlk.getLocations()),
numberOfReplicas, upgradeDomainFactor);
return upgradeDomainStatus;
}
private <T> List<T> getShareUDSet(
Map<String, List<T>> upgradeDomains) {
List<T> shareUDSet = new ArrayList<>();
for (Map.Entry<String, List<T>> e : upgradeDomains.entrySet()) {
if (e.getValue().size() > 1) {
shareUDSet.addAll(e.getValue());
}
}
return shareUDSet;
}
/*
* The policy to pick the replica set for deleting the over-replicated
* replica, which meets the rack and upgrade domain requirements.
* The algorithm:
* a. Each replica has a boolean attribute "shareRack" that defines
* whether it shares its rack with another replica of the same block.
* b. Each replica has another boolean attribute "shareUD" that defines
* whether it shares its upgrade domain with another replica of the same
* block.
* c. Partition the replicas into 4 sets (some might be empty):
* shareRackAndUDSet: {shareRack==true, shareUD==true}
* shareUDNotRackSet: {shareRack==false, shareUD==true}
* shareRackNotUDSet: {shareRack==true, shareUD==false}
* NoShareRackOrUDSet: {shareRack==false, shareUD==false}
* d. Pick the first non-empty replica set in the following order:
* shareRackAndUDSet, shareUDNotRackSet, shareRackNotUDSet,
* NoShareRackOrUDSet
* e. Proof that this won't degrade the existing rack-based data
* availability model under different scenarios:
* 1. shareRackAndUDSet isn't empty. Removing a node
* from shareRackAndUDSet won't change # of racks and # of UD.
* The following cases cover the scenarios where shareRackAndUDSet
* is empty.
* 2. shareUDNotRackSet isn't empty and shareRackNotUDSet isn't empty.
* Let us prove that # of racks >= 3 before the deletion and thus
* after deletion # of racks >= 2.
* Given shareRackAndUDSet is empty, there won't be overlap between
* shareUDNotRackSet and shareRackNotUDSet. It means DNs in
* shareRackNotUDSet should be on at least a rack
* different from any DN's rack in shareUDNotRackSet.
* Given shareUDNotRackSet.size() >= 2 and each DN in the set
* doesn't share rack with any other DNs, there are at least 2 racks
* coming from shareUDNotRackSet.
* Thus the # of racks from DNs in {shareUDNotRackSet,
* shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
* will reduce the # of racks by 1 and won't change # of upgrade
* domains.
* Note that this is different from BlockPlacementPolicyDefault which
* will keep the # of racks after deletion. With upgrade domain policy,
* given # of racks is still >= 2 after deletion, the data availability
* model remains the same as BlockPlacementPolicyDefault (only supports
* one rack failure).
* For example, assume we have 4 replicas: d1(rack1, ud1),
* d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
* shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
* With upgrade domain policy, the remaining replicas after deletion
* are {d1(or d2), d3, d4} which has 2 racks.
* With BlockPlacementPolicyDefault policy, the remaining replicas
* after deletion are {d1, d2, d3(or d4)} which has 3 racks.
* 3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty. This
* implies all replicas are on unique racks. Removing a node from
* shareUDNotRackSet will reduce the # of racks by 1 (no different from
* BlockPlacementPolicyDefault) and won't change the # of upgrade
* domains.
* 4. shareUDNotRackSet is empty and shareRackNotUDSet isn't empty.
* Removing a node from shareRackNotUDSet is no different from
* BlockPlacementPolicyDefault.
* 5. shareUDNotRackSet is empty and shareRackNotUDSet is empty.
* Removing a node from NoShareRackOrUDSet is no different from
* BlockPlacementPolicyDefault.
* The implementation:
* 1. Generate set shareUDSet which includes all DatanodeStorageInfo that
* share the same upgrade domain with another DatanodeStorageInfo,
* i.e. {shareRackAndUDSet, shareUDNotRackSet}.
* 2. If shareUDSet is empty, it means shareRackAndUDSet is empty and
* shareUDNotRackSet is empty. Use the default rack-based policy.
* 3. If shareUDSet isn't empty, intersect it with moreThanOne
* (i.e. {shareRackAndUDSet, shareRackNotUDSet}) to generate
* shareRackAndUDSet.
* 4. If shareRackAndUDSet isn't empty, return
* shareRackAndUDSet, otherwise return shareUDSet which is the same as
* shareUDNotRackSet.
*/
@Override
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
Collection<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> exactlyOne) {
List<DatanodeStorageInfo> all = new ArrayList<>();
if (moreThanOne != null) {
all.addAll(moreThanOne);
}
if (exactlyOne != null) {
all.addAll(exactlyOne);
}
Map<String, List<DatanodeStorageInfo>> upgradeDomains =
getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
// shareUDSet includes DatanodeStorageInfo that share same upgrade
// domain with another DatanodeStorageInfo.
List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
// shareRackAndUDSet contains those DatanodeStorageInfo that
// share rack and upgrade domain with another DatanodeStorageInfo.
List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
if (shareUDSet.size() == 0) {
// All upgrade domains are unique, use the parent set.
return super.pickupReplicaSet(moreThanOne, exactlyOne);
} else if (moreThanOne != null) {
for (DatanodeStorageInfo storage : shareUDSet) {
if (moreThanOne.contains(storage)) {
shareRackAndUDSet.add(storage);
}
}
}
return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
}
}
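
To make the deletion-time pick order above concrete, the following standalone sketch (toy types, not the HDFS API; PickOrderDemo, Replica, and sharing are illustrative names, and it uses a Java 16+ record for brevity) partitions the d1..d4 example from the comment into the four sets and applies the pick order:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class PickOrderDemo {
  // Toy stand-in for DatanodeStorageInfo: a name, a rack, and an upgrade domain.
  record Replica(String name, String rack, String ud) {}

  public static void main(String[] args) {
    // The example from the comment: d1(rack1, ud1), d2(rack2, ud1),
    // d3(rack3, ud3), d4(rack3, ud4).
    List<Replica> replicas = List.of(
        new Replica("d1", "rack1", "ud1"),
        new Replica("d2", "rack2", "ud1"),
        new Replica("d3", "rack3", "ud3"),
        new Replica("d4", "rack3", "ud4"));

    Set<Replica> shareRack = sharing(replicas, Replica::rack);
    Set<Replica> shareUD = sharing(replicas, Replica::ud);

    // Partition the replicas into the four sets named in the comment.
    List<Replica> shareRackAndUDSet = new ArrayList<>();
    List<Replica> shareUDNotRackSet = new ArrayList<>();
    List<Replica> shareRackNotUDSet = new ArrayList<>();
    List<Replica> noShareRackOrUDSet = new ArrayList<>();
    for (Replica r : replicas) {
      if (shareRack.contains(r) && shareUD.contains(r)) {
        shareRackAndUDSet.add(r);
      } else if (shareUD.contains(r)) {
        shareUDNotRackSet.add(r);
      } else if (shareRack.contains(r)) {
        shareRackNotUDSet.add(r);
      } else {
        noShareRackOrUDSet.add(r);
      }
    }

    // Pick the first non-empty set in the stated order.
    for (List<Replica> set : List.of(shareRackAndUDSet, shareUDNotRackSet,
        shareRackNotUDSet, noShareRackOrUDSet)) {
      if (!set.isEmpty()) {
        // Prints d1 and d2: deletion happens in shareUDNotRackSet.
        set.forEach(r -> System.out.println(r.name()));
        break;
      }
    }
  }

  // Replicas whose key (rack or upgrade domain) is shared by another replica.
  static Set<Replica> sharing(List<Replica> all, Function<Replica, String> key) {
    Map<String, List<Replica>> byKey = new HashMap<>();
    for (Replica r : all) {
      byKey.computeIfAbsent(key.apply(r), k -> new ArrayList<>()).add(r);
    }
    Set<Replica> shared = new HashSet<>();
    for (List<Replica> group : byKey.values()) {
      if (group.size() > 1) {
        shared.addAll(group);
      }
    }
    return shared;
  }
}

Deleting d1 or d2 leaves replicas on 2 racks and 3 upgrade domains, matching the proof's conclusion that the rack-based availability model is preserved.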

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java

@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.Set;
/**
* An implementation of {@link BlockPlacementStatus} for
* {@link BlockPlacementPolicyWithUpgradeDomain}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockPlacementStatusWithUpgradeDomain implements
BlockPlacementStatus {
private final BlockPlacementStatus parentBlockPlacementStatus;
private final Set<String> upgradeDomains;
private final int numberOfReplicas;
private final int upgradeDomainFactor;
/**
* @param parentBlockPlacementStatus the parent class' status
* @param upgradeDomains the set of upgrade domains of the replicas
* @param numberOfReplicas the number of replicas of the block
* @param upgradeDomainFactor the configured upgrade domain factor
*/
public BlockPlacementStatusWithUpgradeDomain(
BlockPlacementStatus parentBlockPlacementStatus,
Set<String> upgradeDomains, int numberOfReplicas,
int upgradeDomainFactor){
this.parentBlockPlacementStatus = parentBlockPlacementStatus;
this.upgradeDomains = upgradeDomains;
this.numberOfReplicas = numberOfReplicas;
this.upgradeDomainFactor = upgradeDomainFactor;
}
@Override
public boolean isPlacementPolicySatisfied() {
return parentBlockPlacementStatus.isPlacementPolicySatisfied() &&
isUpgradeDomainPolicySatisfied();
}
private boolean isUpgradeDomainPolicySatisfied() {
if (numberOfReplicas <= upgradeDomainFactor) {
return (numberOfReplicas == upgradeDomains.size());
} else {
return upgradeDomains.size() >= upgradeDomainFactor;
}
}
@Override
public String getErrorDescription() {
if (isPlacementPolicySatisfied()) {
return null;
}
StringBuilder errorDescription = new StringBuilder();
if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
}
if (!isUpgradeDomainPolicySatisfied()) {
if (errorDescription.length() != 0) {
errorDescription.append(" ");
}
errorDescription.append("The block has " + numberOfReplicas +
" replicas. But it only has " + upgradeDomains.size() +
" upgrade domains " + upgradeDomains +".");
}
return errorDescription.toString();
}
}
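
The satisfied check above reduces to a small arithmetic rule. Here is a minimal standalone restatement (illustrative only; udPolicySatisfied and UpgradeDomainCheckDemo are invented names, not part of the patch) with a few sample values:

public class UpgradeDomainCheckDemo {
  // Mirrors isUpgradeDomainPolicySatisfied(): with replicas up to the factor,
  // every replica must land in its own upgrade domain; beyond the factor,
  // "factor" distinct domains are enough.
  static boolean udPolicySatisfied(int numberOfReplicas,
      int distinctUpgradeDomains, int upgradeDomainFactor) {
    if (numberOfReplicas <= upgradeDomainFactor) {
      return distinctUpgradeDomains == numberOfReplicas;
    }
    return distinctUpgradeDomains >= upgradeDomainFactor;
  }

  public static void main(String[] args) {
    // With the default factor of 3 (tracking dfs.replication's default):
    System.out.println(udPolicySatisfied(3, 3, 3)); // true: 3 replicas, 3 domains
    System.out.println(udPolicySatisfied(3, 2, 3)); // false: a domain is reused
    System.out.println(udPolicySatisfied(4, 3, 3)); // true: >= factor domains
    System.out.println(udPolicySatisfied(4, 2, 3)); // false: only 2 domains
  }
}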

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2482,4 +2482,18 @@
</description>
</property>
+ <property>
+ <name>dfs.namenode.upgrade.domain.factor</name>
+ <value>${dfs.replication}</value>
+ <description>
+ This is valid only when the block placement policy is set to
+ BlockPlacementPolicyWithUpgradeDomain. It defines the number of
+ unique upgrade domains any block's replicas should have.
+ When the number of replicas is less than or equal to this value, the
+ policy ensures each replica has a unique upgrade domain. When the number
+ of replicas is greater than this value, the policy ensures the number of
+ unique domains is at least this value.
+ </description>
+ </property>
</configuration>
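
This property only takes effect together with the block placement policy class key. Below is a minimal sketch of enabling the policy programmatically, assuming the standard dfs.block.replicator.classname key (DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY); in a real deployment both settings would normally go into the NameNode's hdfs-site.xml instead:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;

public class EnableUpgradeDomainPolicy {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Point block placement at the upgrade-domain-aware policy.
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyWithUpgradeDomain.class,
        BlockPlacementPolicy.class);
    // Require 3 distinct upgrade domains per block; by default the factor
    // tracks dfs.replication.
    conf.setInt(DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR, 3);
    System.out.println(conf.get(DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR));
  }
}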

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java

@@ -96,7 +96,6 @@ abstract public class BaseReplicationPolicyTest {
// construct network topology
for (int i=0; i < dataNodes.length; i++) {
cluster.add(dataNodes[i]);
- //bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]);
bm.getDatanodeManager().getHeartbeatManager().addDatanode(
dataNodes[i]);
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -83,7 +83,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
- { BlockPlacementPolicyDefault.class.getName() } });
+ { BlockPlacementPolicyDefault.class.getName() },
+ { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
}
private void updateHeartbeatForExtraStorage(long capacity,
@@ -231,10 +232,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
public void testChooseTarget2() throws Exception {
Set<Node> excludedNodes;
DatanodeStorageInfo[] targets;
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
- excludedNodes = new HashSet<Node>();
- excludedNodes.add(dataNodes[1]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+ excludedNodes = new HashSet<>();
+ excludedNodes.add(dataNodes[1]);
targets = chooseTarget(0, chosenNodes, excludedNodes);
assertEquals(targets.length, 0);
@@ -422,9 +423,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
"DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
DatanodeDescriptor newDn = storage.getDatanodeDescriptor();
Set<Node> excludedNodes;
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
- excludedNodes = new HashSet<Node>();
+ excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[0]);
excludedNodes.add(dataNodes[1]);
excludedNodes.add(dataNodes[2]);
@@ -554,9 +555,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
assertEquals(targets.length, 1);
assertEquals(storages[1], targets[0]);
- Set<Node> excludedNodes = new HashSet<Node>();
+ Set<Node> excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[1]);
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
@@ -726,8 +727,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
*/
@Test
public void testRereplicate1() throws Exception {
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
- chosenNodes.add(storages[0]);
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+ chosenNodes.add(storages[0]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
@@ -757,7 +758,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
*/
@Test
public void testRereplicate2() throws Exception {
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[1]);
@@ -784,7 +785,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
*/
@Test
public void testRereplicate3() throws Exception {
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[2]);
@@ -950,7 +951,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
- List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> replicaList = new ArrayList<>();
final Map<String, List<DatanodeStorageInfo>> rackMap
= new HashMap<String, List<DatanodeStorageInfo>>();
@@ -971,14 +972,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
- List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
- List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> first = new ArrayList<>();
+ List<DatanodeStorageInfo> second = new ArrayList<>();
replicator.splitNodesWithRack(replicaList, rackMap, first, second);
// storages[0] and storages[1] are in first set as their rack has two
// replica nodes, while storages[2] and dataNodes[5] are in second set.
assertEquals(2, first.size());
assertEquals(2, second.size());
- List<StorageType> excessTypes = new ArrayList<StorageType>();
+ List<StorageType> excessTypes = new ArrayList<>();
{
// test returning null
excessTypes.add(StorageType.SSD);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -45,7 +45,8 @@ public class TestReplicationPolicyConsiderLoad
@Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
- { BlockPlacementPolicyDefault.class.getName() } });
+ { BlockPlacementPolicyDefault.class.getName() },
+ { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
}
@Override
@@ -111,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
assertEquals(3, targets.length);
- Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(
+ Set<DatanodeStorageInfo> targetSet = new HashSet<>(
Arrays.asList(targets));
for (int i = 3; i < storages.length; i++) {
assertTrue(targetSet.contains(storages[i]));

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java

@@ -137,7 +137,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
DatanodeStorageInfo[] targets) {
if(targets.length == 0)
return true;
- Set<String> targetSet = new HashSet<String>();
+ Set<String> targetSet = new HashSet<>();
for(DatanodeStorageInfo storage:targets) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
@@ -217,7 +217,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
}
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
- Set<String> nodeGroupSet = new HashSet<String>();
+ Set<String> nodeGroupSet = new HashSet<>();
for (DatanodeStorageInfo target: targets) {
nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
}
@@ -236,10 +236,10 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
public void testChooseTarget2() throws Exception {
DatanodeStorageInfo[] targets;
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
- Set<Node> excludedNodes = new HashSet<Node>();
- excludedNodes.add(dataNodes[1]);
+ Set<Node> excludedNodes = new HashSet<>();
+ excludedNodes.add(dataNodes[1]);
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
assertEquals(targets.length, 4);
@@ -415,7 +415,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
@Test
public void testRereplicate1() throws Exception {
updateHeartbeatWithUsage();
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
DatanodeStorageInfo[] targets;
@@ -448,7 +448,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
@Test
public void testRereplicate2() throws Exception {
updateHeartbeatWithUsage();
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[1]);
@@ -476,7 +476,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
@Test
public void testRereplicate3() throws Exception {
updateHeartbeatWithUsage();
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[3]);
@@ -511,9 +511,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
- List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
- final Map<String, List<DatanodeStorageInfo>> rackMap
- = new HashMap<String, List<DatanodeStorageInfo>>();
+ List<DatanodeStorageInfo> replicaList = new ArrayList<>();
+ final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
dataNodes[0].setRemaining(4*1024*1024);
replicaList.add(storages[0]);
@@ -526,13 +525,13 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
dataNodes[5].setRemaining(1*1024*1024);
replicaList.add(storages[5]);
- List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
- List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> first = new ArrayList<>();
+ List<DatanodeStorageInfo> second = new ArrayList<>();
replicator.splitNodesWithRack(
replicaList, rackMap, first, second);
assertEquals(3, first.size());
assertEquals(1, second.size());
- List<StorageType> excessTypes = new ArrayList<StorageType>();
+ List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
null, null, (short)3, first, second, excessTypes);
@@ -614,7 +613,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
chosenNodes.add(storagesInBoundaryCase[0]);
chosenNodes.add(storagesInBoundaryCase[5]);
DatanodeStorageInfo[] targets;
@@ -703,11 +702,10 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
- List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+ List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
DatanodeStorageInfo[] targets;
- Set<Node> excludedNodes = new HashSet<Node>();
+ Set<Node> excludedNodes = new HashSet<>();
excludedNodes.add(dataNodesForDependencies[5]);
//try to select three targets as there are three node groups

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java

@@ -0,0 +1,353 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.Node;
import org.junit.Test;
public class TestReplicationPolicyWithUpgradeDomain
extends BaseReplicationPolicyTest {
public TestReplicationPolicyWithUpgradeDomain() {
this.blockPlacementPolicy =
BlockPlacementPolicyWithUpgradeDomain.class.getName();
}
@Override
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
final String[] racks = {
"/d1/r1",
"/d1/r1",
"/d1/r1",
"/d1/r2",
"/d1/r2",
"/d1/r2",
"/d1/r3",
"/d1/r3",
"/d1/r3"};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
DatanodeDescriptor dataNodes[] =
DFSTestUtil.toDatanodeDescriptor(storages);
for (int i=0; i < dataNodes.length; i++) {
// each rack has 3 DNs with upgrade domain id 1,2,3 respectively.
String upgradeDomain = Integer.toString((i%3)+1);
dataNodes[i].setUpgradeDomain(upgradeDomain);
}
return dataNodes;
}
/**
* Verify the targets are chosen to honor both
* the rack and upgrade domain policies when the number of replicas is
* 0, 1, 2, 3, and 4 respectively.
* @throws Exception
*/
@Test
public void testChooseTarget1() throws Exception {
updateHeartbeatWithUsage(dataNodes[0],
2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
0L, 0L, 4, 0);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(storages[0], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertEquals(getUpgradeDomains(targets).size(), 2);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
assertEquals(getUpgradeDomains(targets).size(), 3);
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[0], targets[2]));
assertEquals(getUpgradeDomains(targets).size(), 3);
updateHeartbeatWithUsage(dataNodes[0],
2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
/**
* Verify the rack and upgrade domain policies when excluded nodes are
* specified.
* @throws Exception
*/
@Test
public void testChooseTargetWithExcludeNodes() throws Exception {
Set<Node> excludedNodes = new HashSet<>();
DatanodeStorageInfo[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[4]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
assertEquals(storages[0], targets[0]);
assertEquals(getRacks(targets).size(), 2);
assertEquals(getUpgradeDomains(targets).size(), 3);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[4]);
excludedNodes.add(dataNodes[8]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
assertEquals(storages[0], targets[0]);
assertEquals(getRacks(targets).size(), 2);
assertEquals(getUpgradeDomains(targets).size(), 3);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[4]);
excludedNodes.add(dataNodes[5]);
excludedNodes.add(dataNodes[8]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
assertEquals(storages[0], targets[0]);
assertEquals(storages[2], targets[1]);
assertEquals(storages[7], targets[2]);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[4]);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
assertTrue(getRacks(targets).size()>=2);
assertEquals(getUpgradeDomains(targets).size(), 3);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[4]);
excludedNodes.add(dataNodes[8]);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
assertEquals(storages[0], targets[0]);
assertTrue(getRacks(targets).size()>=2);
assertEquals(getUpgradeDomains(targets).size(), 3);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
true, excludedNodes, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
}
/**
* Verify the correct replica is chosen to satisfy both rack and upgrade
* domain policy.
* @throws Exception
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
(BlockPlacementPolicyWithUpgradeDomain)replicator;
List<DatanodeStorageInfo> first = new ArrayList<>();
List<DatanodeStorageInfo> second = new ArrayList<>();
List<StorageType> excessTypes = new ArrayList<>();
excessTypes.add(StorageType.DEFAULT);
first.add(storages[0]);
first.add(storages[1]);
second.add(storages[4]);
second.add(storages[8]);
DatanodeStorageInfo chosenStorage =
upgradeDomainPolicy.chooseReplicaToDelete(
null, null, (short)3, first, second, excessTypes);
assertEquals(chosenStorage, storages[1]);
first.clear();
second.clear();
excessTypes.add(StorageType.DEFAULT);
first.add(storages[0]);
first.add(storages[1]);
first.add(storages[4]);
first.add(storages[5]);
chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
null, null, (short)3, first, second, excessTypes);
assertTrue(chosenStorage.equals(storages[1]) ||
chosenStorage.equals(storages[4]));
}
/**
* Test the scenario where there are not enough eligible datanodes to
* fully satisfy the policy.
* @throws Exception
*/
@Test
public void testChooseTargetWithoutEnoughReplica() throws Exception {
Set<Node> excludedNodes = new HashSet<>();
DatanodeStorageInfo[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[4]);
excludedNodes.add(dataNodes[5]);
excludedNodes.add(dataNodes[7]);
excludedNodes.add(dataNodes[8]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 2);
assertEquals(storages[0], targets[0]);
assertTrue(targets[1].equals(storages[1]) ||
targets[1].equals(storages[2]));
}
/**
* Verify that verifyBlockPlacement reports both rack and upgrade domain
* policy violations.
* @throws Exception
*/
@Test
public void testVerifyBlockPlacement() throws Exception {
LocatedBlock locatedBlock;
BlockPlacementStatus status;
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
List<DatanodeStorageInfo> set = new ArrayList<>();
// 2 upgrade domains (not enough), 2 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[4]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertFalse(status.isPlacementPolicySatisfied());
// 3 upgrade domains (enough), 2 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[5]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertTrue(status.isPlacementPolicySatisfied());
// 3 upgrade domains (enough), 1 rack (not enough)
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[2]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertFalse(status.isPlacementPolicySatisfied());
assertFalse(status.getErrorDescription().contains("upgrade domain"));
// 2 upgrade domains (not enough), 3 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[5]);
set.add(storages[8]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertFalse(status.isPlacementPolicySatisfied());
assertTrue(status.getErrorDescription().contains("upgrade domain"));
// 3 upgrade domains (enough), 3 racks (enough)
set.clear();
set.add(storages[0]);
set.add(storages[4]);
set.add(storages[8]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertTrue(status.isPlacementPolicySatisfied());
// 3 upgrade domains (enough), 3 racks (enough), 4 replicas
set.clear();
set.add(storages[0]);
set.add(storages[1]);
set.add(storages[5]);
set.add(storages[8]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertTrue(status.isPlacementPolicySatisfied());
// 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
set.clear();
set.add(storages[0]);
set.add(storages[3]);
set.add(storages[5]);
set.add(storages[8]);
locatedBlock = BlockManager.newLocatedBlock(b,
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
assertFalse(status.isPlacementPolicySatisfied());
}
private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
HashSet<String> upgradeDomains = new HashSet<>();
for (DatanodeStorageInfo node : nodes) {
upgradeDomains.add(node.getDatanodeDescriptor().getUpgradeDomain());
}
return upgradeDomains;
}
private Set<String> getRacks(DatanodeStorageInfo[] nodes) {
HashSet<String> racks = new HashSet<>();
for (DatanodeStorageInfo node : nodes) {
String rack = node.getDatanodeDescriptor().getNetworkLocation();
racks.add(rack);
}
return racks;
}
}