HDFS-15288. Add Available Space Rack Fault Tolerant BPP. Contributed by Ayush Saxena.

Ayush Saxena 2020-05-23 18:29:31 +05:30
parent a5657b9657
commit 3355126062
5 changed files with 375 additions and 16 deletions

View File: org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -1124,6 +1124,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction";
public static final float DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT =
0.6f;
public static final String
DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY =
"dfs.namenode.available-space-rack-fault-tolerant-block-placement-policy"
+ ".balanced-space-preference-fraction";
public static final float
DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT =
0.6f;
public static final String
DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_KEY =
"dfs.namenode.available-space-block-placement-policy.balance-local-node";

View File: org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceRackFaultTolerantBlockPlacementPolicy.java (new file)

@@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Random;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY;
/**
* Space balanced rack fault tolerant block placement policy.
*/
public class AvailableSpaceRackFaultTolerantBlockPlacementPolicy
extends BlockPlacementPolicyRackFaultTolerant {
private static final Logger LOG = LoggerFactory
.getLogger(AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class);
private static final Random RAND = new Random();
private int balancedPreference = (int) (100
* DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
super.initialize(conf, stats, clusterMap, host2datanodeMap);
float balancedPreferencePercent = conf.getFloat(
DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);
LOG.info("Available space rack fault tolerant block placement policy "
+ "initialized: "
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
+ " = " + balancedPreferencePercent);
if (balancedPreferencePercent > 1.0) {
LOG.warn("The value of "
+ DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
+ " is greater than 1.0 but should be in the range 0.0 - 1.0");
}
if (balancedPreferencePercent < 0.5) {
LOG.warn("The value of "
+ DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
+ " is less than 0.5 so datanodes with more used percent will"
+ " receive more block allocations.");
}
balancedPreference = (int) (100 * balancedPreferencePercent);
}
@Override
protected DatanodeDescriptor chooseDataNode(final String scope,
final Collection<Node> excludedNode, StorageType type) {
// only the code that uses DFSNetworkTopology should trigger this code path.
Preconditions.checkArgument(clusterMap instanceof DFSNetworkTopology);
DFSNetworkTopology dfsClusterMap = (DFSNetworkTopology) clusterMap;
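// Two independent draws ("power of two choices"): the calls below may
// return the same node, in which case select() sees a tie and keeps the
// first candidate.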
DatanodeDescriptor a = (DatanodeDescriptor) dfsClusterMap
.chooseRandomWithStorageType(scope, excludedNode, type);
DatanodeDescriptor b = (DatanodeDescriptor) dfsClusterMap
.chooseRandomWithStorageType(scope, excludedNode, type);
return select(a, b);
}
@Override
protected DatanodeDescriptor chooseDataNode(final String scope,
final Collection<Node> excludedNode) {
DatanodeDescriptor a =
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
DatanodeDescriptor b =
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
return select(a, b);
}
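/**
 * Pick between two candidates: with probability balancedPreference/100,
 * keep the one with more space available. Candidates whose used
 * percentages are within 5 points of each other count as a tie, and the
 * first one is kept.
 */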
private DatanodeDescriptor select(DatanodeDescriptor a,
DatanodeDescriptor b) {
if (a != null && b != null) {
int ret = compareDataNode(a, b);
if (ret == 0) {
return a;
} else if (ret < 0) {
return (RAND.nextInt(100) < balancedPreference) ? a : b;
} else {
return (RAND.nextInt(100) < balancedPreference) ? b : a;
}
} else {
return a == null ? b : a;
}
}
/**
 * Compare the two datanodes: returns 0 when their used percentages are
 * within 5 points of each other, -1 when a is less used than b, and 1
 * otherwise.
 */
protected int compareDataNode(final DatanodeDescriptor a,
final DatanodeDescriptor b) {
if (a.equals(b)
|| Math.abs(a.getDfsUsedPercent() - b.getDfsUsedPercent()) < 5) {
return 0;
}
return a.getDfsUsedPercent() < b.getDfsUsedPercent() ? -1 : 1;
}
}
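To make the selection rule concrete, here is a self-contained sketch (not part of the patch; TwoChoiceSelectionDemo and its layout are invented for illustration) that reimplements compareDataNode() and select() with the default 0.6 preference and estimates how often the emptier of two randomly drawn nodes is kept:

import java.util.Random;

/** Illustrative only: simulates the two-choice selection rule above. */
public class TwoChoiceSelectionDemo {
  private static final Random RAND = new Random();
  // Default preference fraction 0.6, expressed as a percentage.
  private static final int BALANCED_PREFERENCE = 60;

  // Mirrors compareDataNode(): usage within 5 points is a tie.
  static int compare(double usedPercentA, double usedPercentB) {
    if (Math.abs(usedPercentA - usedPercentB) < 5) {
      return 0;
    }
    return usedPercentA < usedPercentB ? -1 : 1;
  }

  // Mirrors select(): the less-used candidate wins BALANCED_PREFERENCE%
  // of the time; ties keep the first candidate.
  static double select(double a, double b) {
    int ret = compare(a, b);
    if (ret == 0) {
      return a;
    } else if (ret < 0) {
      return RAND.nextInt(100) < BALANCED_PREFERENCE ? a : b;
    } else {
      return RAND.nextInt(100) < BALANCED_PREFERENCE ? b : a;
    }
  }

  public static void main(String[] args) {
    // Half the nodes 0% used, half 50% used, like the unit test below.
    double[] used = new double[20];
    for (int i = 0; i < used.length; i++) {
      used[i] = (i % 2 == 0) ? 0 : 50;
    }
    int emptierWins = 0;
    int trials = 100000;
    for (int t = 0; t < trials; t++) {
      double a = used[RAND.nextInt(used.length)];
      double b = used[RAND.nextInt(used.length)];
      if (select(a, b) == 0) {
        emptierWins++;
      }
    }
    // Expectation: 0.25 (both emptier) + 0.5 * 0.6 (mixed pair) = 0.55.
    System.out.printf("emptier node chosen %.3f of the time%n",
        emptierWins / (double) trials);
  }
}

Run standalone, it prints a value near 0.55, which matches the upper bound the tests below assert on the observed probability.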

View File: hdfs-default.xml

@@ -4961,6 +4961,20 @@
</description>
</property>
<property>
<name>dfs.namenode.available-space-rack-fault-tolerant-block-placement-policy.balanced-space-preference-fraction</name>
<value>0.6</value>
<description>
Only used when the dfs.block.replicator.classname is set to
org.apache.hadoop.hdfs.server.blockmanagement.AvailableSpaceRackFaultTolerantBlockPlacementPolicy.
Special value between 0 and 1, noninclusive. Increases the chance of
placing blocks on datanodes with less disk space used. The closer the
value is to 1, the more likely datanodes with a low used-space percentage
are chosen; the closer it is to 0, the more likely heavily used datanodes
are chosen.
</description>
</property>
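<!-- Worked example (annotation, not part of the patch): with the default
0.6, the policy draws two candidate datanodes and, whenever their
used-space percentages differ by at least 5 points, keeps the emptier one
60% of the time, so the bias toward emptier nodes is probabilistic rather
than absolute. -->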
<property>
<name>dfs.namenode.backup.dnrpc-address</name>
<value></value>

View File: org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java

@@ -35,10 +35,11 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
public class TestAvailableSpaceBlockPlacementPolicy {
private final static int numRacks = 4;
private final static int nodesPerRack = 5;
@@ -127,7 +128,7 @@ public class TestAvailableSpaceBlockPlacementPolicy {
*/
@Test
public void testPolicyReplacement() {
-Assert.assertTrue((placementPolicy instanceof AvailableSpaceBlockPlacementPolicy));
+assertTrue((placementPolicy instanceof AvailableSpaceBlockPlacementPolicy));
}
/*
@@ -147,7 +148,7 @@ public class TestAvailableSpaceBlockPlacementPolicy {
.chooseTarget(file, replica, null, new ArrayList<DatanodeStorageInfo>(), false, null,
blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
-Assert.assertTrue(targets.length == replica);
+assertTrue(targets.length == replica);
for (int j = 0; j < replica; j++) {
total++;
if (targets[j].getDatanodeDescriptor().getRemainingPercent() > 60) {
@@ -155,24 +156,20 @@ public class TestAvailableSpaceBlockPlacementPolicy {
}
}
}
-Assert.assertTrue(total == replica * chooseTimes);
+assertTrue(total == replica * chooseTimes);
double possibility = 1.0 * moreRemainingNode / total;
-Assert.assertTrue(possibility > 0.52);
-Assert.assertTrue(possibility < 0.55);
+assertTrue(possibility > 0.52);
+assertTrue(possibility < 0.55);
}
@Test
public void testChooseDataNode() {
-try {
-Collection<Node> allNodes = new ArrayList<>(dataNodes.length);
-Collections.addAll(allNodes, dataNodes);
-if (placementPolicy instanceof AvailableSpaceBlockPlacementPolicy){
-// exclude all datanodes when chooseDataNode, no NPE should be thrown
-((AvailableSpaceBlockPlacementPolicy)placementPolicy)
-.chooseDataNode("~", allNodes);
-}
-}catch (NullPointerException npe){
-Assert.fail("NPE should not be thrown");
-}
+Collection<Node> allNodes = new ArrayList<>(dataNodes.length);
+Collections.addAll(allNodes, dataNodes);
+if (placementPolicy instanceof AvailableSpaceBlockPlacementPolicy) {
+// exclude all datanodes when chooseDataNode, no NPE should be thrown
+((AvailableSpaceBlockPlacementPolicy) placementPolicy)
+.chooseDataNode("~", allNodes);
+}
}

View File: org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceRackFaultTolerantBPP.java (new file)

@@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import static org.junit.Assert.assertEquals;
/**
* Tests AvailableSpaceRackFaultTolerant block placement policy.
*/
public class TestAvailableSpaceRackFaultTolerantBPP {
private final static int NUM_RACKS = 4;
private final static int NODES_PER_RACK = 5;
private final static int BLOCK_SIZE = 1024;
private final static int CHOOSE_TIMES = 10000;
private final static String FILE = "/tobers/test";
private final static int REPLICA = 3;
private static DatanodeStorageInfo[] storages;
private static DatanodeDescriptor[] dataNodes;
private static Configuration conf;
private static NameNode namenode;
private static BlockPlacementPolicy placementPolicy;
private static NetworkTopology cluster;
@BeforeClass
public static void setupCluster() throws Exception {
conf = new HdfsConfiguration();
conf.setFloat(
DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
0.6f);
String[] racks = new String[NUM_RACKS];
for (int i = 0; i < NUM_RACKS; i++) {
racks[i] = "/rack" + i;
}
String[] ownerRackOfNodes = new String[NUM_RACKS * NODES_PER_RACK];
for (int i = 0; i < NODES_PER_RACK; i++) {
for (int j = 0; j < NUM_RACKS; j++) {
ownerRackOfNodes[i * NUM_RACKS + j] = racks[j];
}
}
storages = DFSTestUtil.createDatanodeStorageInfos(ownerRackOfNodes);
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
File baseDir = PathUtils
.getTestDir(AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class);
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
new File(baseDir, "name").getPath());
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class.getName());
DFSTestUtil.formatNameNode(conf);
namenode = new NameNode(conf);
final BlockManager bm = namenode.getNamesystem().getBlockManager();
placementPolicy = bm.getBlockPlacementPolicy();
cluster = bm.getDatanodeManager().getNetworkTopology();
for (int i = 0; i < NODES_PER_RACK * NUM_RACKS; i++) {
cluster.add(dataNodes[i]);
}
setupDataNodeCapacity();
}
private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
int volFailures) {
dn.getStorageInfos()[0]
.setUtilizationForTesting(capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(BlockManagerTestUtil.getStorageReportsForDatanode(dn),
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
}
private static void setupDataNodeCapacity() {
for (int i = 0; i < NODES_PER_RACK * NUM_RACKS; i++) {
if ((i % 2) == 0) {
// remaining 100%
updateHeartbeatWithUsage(dataNodes[i],
2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L,
0L, 0, 0);
} else {
// remaining 50%
updateHeartbeatWithUsage(dataNodes[i],
2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, 0L,
0, 0);
}
}
}
/*
* To verify that the BlockPlacementPolicy can be replaced by
* AvailableSpaceRackFaultTolerantBlockPlacementPolicy via
* changing the configuration.
*/
@Test
public void testPolicyReplacement() {
Assert.assertTrue(
(placementPolicy instanceof
AvailableSpaceRackFaultTolerantBlockPlacementPolicy));
}
/*
* Call choose target many times and verify that nodes with more remaining
* percent will be chosen with high possibility.
*/
@Test
public void testChooseTarget() {
int total = 0;
int moreRemainingNode = 0;
for (int i = 0; i < CHOOSE_TIMES; i++) {
DatanodeStorageInfo[] targets =
namenode.getNamesystem().getBlockManager().getBlockPlacementPolicy()
.chooseTarget(FILE, REPLICA, null,
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
Assert.assertTrue(targets.length == REPLICA);
for (int j = 0; j < REPLICA; j++) {
total++;
if (targets[j].getDatanodeDescriptor().getRemainingPercent() > 60) {
moreRemainingNode++;
}
}
}
Assert.assertTrue(total == REPLICA * CHOOSE_TIMES);
double possibility = 1.0 * moreRemainingNode / total;
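// With half the nodes at 100% remaining and half at 50%, an ideal
// two-choice draw picks an emptier node with probability
// 0.25 (both candidates emptier) + 0.5 * 0.6 (mixed pair) = 0.55;
// rack constraints and exclusions pull the observed rate slightly lower.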
Assert.assertTrue(possibility > 0.52);
Assert.assertTrue(possibility < 0.55);
}
@Test
public void testChooseDataNode() {
try {
Collection<Node> allNodes = new ArrayList<>(dataNodes.length);
Collections.addAll(allNodes, dataNodes);
if (placementPolicy instanceof AvailableSpaceRackFaultTolerantBlockPlacementPolicy) {
// exclude all datanodes when chooseDataNode, no NPE should be thrown
((AvailableSpaceRackFaultTolerantBlockPlacementPolicy) placementPolicy)
.chooseDataNode("~", allNodes);
}
} catch (NullPointerException npe) {
Assert.fail("NPE should not be thrown");
}
}
/**
* Test if the nodes are all spread across all racks.
*/
@Test
public void testMaxRackAllocation() {
DatanodeStorageInfo[] targets =
namenode.getNamesystem().getBlockManager().getBlockPlacementPolicy()
.chooseTarget(FILE, REPLICA, null,
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
HashSet<String> racks = new HashSet<String>();
for (int i = 0; i < targets.length; i++) {
racks.add(targets[i].getDatanodeDescriptor().getNetworkLocation());
}
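// The parent BlockPlacementPolicyRackFaultTolerant spreads replicas over
// as many racks as possible, so with 3 replicas and 4 racks each replica
// should land on a distinct rack.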
assertEquals(REPLICA, racks.size());
}
@AfterClass
public static void teardownCluster() {
if (namenode != null) {
namenode.stop();
}
}
}