HDFS-15879. Exclude slow nodes when choose targets for blocks (#2928)

This commit is contained in:
litao 2021-04-21 09:41:13 +08:00 committed by GitHub
parent 8170a7bb60
commit 29414871bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 321 additions and 3 deletions

View File

@ -979,6 +979,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.outliers.report.interval";
public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
"30m";
public static final String DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY =
"dfs.namenode.max.slowpeer.collect.nodes";
public static final int DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT =
5;
public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY =
"dfs.namenode.slowpeer.collect.interval";
public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT =
"30m";
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@ -1125,6 +1133,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
"dfs.namenode.block-placement-policy.default.prefer-local-node";
public static final boolean DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
public static final String
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY =
"dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled";
public static final boolean
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT =
false;
public static final String DFS_NAMENODE_GC_TIME_MONITOR_ENABLE =
"dfs.namenode.gc.time.monitor.enable";
public static final boolean DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT =

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
@ -82,7 +84,8 @@ private enum NodeNotChosenReason {
NODE_TOO_BUSY("the node is too busy"),
TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable");
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
NODE_SLOW("the node is too slow");
private final String text;
@ -99,6 +102,8 @@ private String getText() {
private boolean considerLoadByStorageType;
protected double considerLoadFactor;
private boolean preferLocalNode;
private boolean dataNodePeerStatsEnabled;
private boolean excludeSlowNodesEnabled;
protected NetworkTopology clusterMap;
protected Host2NodesMap host2datanodeMap;
private FSClusterStats stats;
@ -144,6 +149,12 @@ public void initialize(Configuration conf, FSClusterStats stats,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY,
DFSConfigKeys.
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT);
this.dataNodePeerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
this.excludeSlowNodesEnabled = conf.getBoolean(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
}
@Override
@ -1091,6 +1102,15 @@ boolean isGoodDatanode(DatanodeDescriptor node,
return false;
}
// check if the target is a slow node
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
Set<Node> nodes = DatanodeManager.getSlowNodes();
if (nodes.contains(node)) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
return false;
}
}
return true;
}

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.util.Time.monotonicNow;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
@ -53,6 +57,7 @@
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Timer;
@ -201,8 +206,16 @@ public class DatanodeManager {
*/
private final boolean useDfsNetworkTopology;
private static final String IP_PORT_SEPARATOR = ":";
@Nullable
private final SlowPeerTracker slowPeerTracker;
private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
private Daemon slowPeerCollectorDaemon;
private final long slowPeerCollectionInterval;
private final int maxSlowPeerReportNodes;
private boolean excludeSlowNodesEnabled;
@Nullable
private final SlowDiskTracker slowDiskTracker;
@ -242,11 +255,22 @@ public class DatanodeManager {
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFSConfigKeys.
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));
final Timer timer = new Timer();
this.slowPeerTracker = dataNodePeerStatsEnabled ?
new SlowPeerTracker(conf, timer) : null;
this.excludeSlowNodesEnabled = conf.getBoolean(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
this.maxSlowPeerReportNodes = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
this.slowPeerCollectionInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
if (slowPeerTracker != null && excludeSlowNodesEnabled) {
startSlowPeerCollector();
}
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
new SlowDiskTracker(conf, timer) : null;
@ -356,6 +380,44 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
}
private void startSlowPeerCollector() {
if (slowPeerCollectorDaemon != null) {
return;
}
slowPeerCollectorDaemon = new Daemon(new Runnable() {
@Override
public void run() {
while (true) {
try {
slowNodesSet = getSlowPeers();
} catch (Exception e) {
LOG.error("Failed to collect slow peers", e);
}
try {
Thread.sleep(slowPeerCollectionInterval);
} catch (InterruptedException e) {
LOG.error("Slow peers collection thread interrupted", e);
return;
}
}
}
});
slowPeerCollectorDaemon.start();
}
public void stopSlowPeerCollector() {
if (slowPeerCollectorDaemon == null) {
return;
}
slowPeerCollectorDaemon.interrupt();
try {
slowPeerCollectorDaemon.join();
} catch (InterruptedException e) {
LOG.error("Slow peers collection thread did not shutdown", e);
}
}
private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) {
long staleInterval = conf.getLong(
@ -401,6 +463,7 @@ void activate(final Configuration conf) {
void close() {
datanodeAdminManager.close();
heartbeatManager.close();
stopSlowPeerCollector();
}
/** @return the network topology. */
@ -2019,6 +2082,48 @@ public String getSlowPeersReport() {
return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
}
/**
* Returns all tracking slow peers.
* @return
*/
public Set<Node> getSlowPeers() {
Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
if (slowPeerTracker == null) {
return slowPeersSet;
}
ArrayList<String> slowNodes =
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
for (String slowNode : slowNodes) {
if (StringUtils.isBlank(slowNode)
|| !slowNode.contains(IP_PORT_SEPARATOR)) {
continue;
}
String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
DatanodeDescriptor datanodeByHost =
host2DatanodeMap.getDatanodeByHost(ipAddr);
if (datanodeByHost != null) {
slowPeersSet.add(datanodeByHost);
}
}
return slowPeersSet;
}
/**
* Returns all tracking slow peers.
* @return
*/
public static Set<Node> getSlowNodes() {
return slowNodesSet;
}
/**
* Use only for testing.
*/
@VisibleForTesting
public SlowPeerTracker getSlowPeerTracker() {
return slowPeerTracker;
}
/**
* Use only for testing.
*/

View File

@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -230,6 +231,23 @@ public SortedSet<String> getReportingNodes() {
}
}
/**
* Returns all tracking slow peers.
* @param numNodes
* @return
*/
public ArrayList<String> getSlowNodes(int numNodes) {
Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
ArrayList<String> slowNodes = new ArrayList<>();
for (ReportForJson jsonReport : jsonReports) {
slowNodes.add(jsonReport.getSlowNode());
}
if (!slowNodes.isEmpty()) {
LOG.warn("Slow nodes list: " + slowNodes);
}
return slowNodes;
}
/**
* Retrieve reports in a structure for generating JSON, limiting the
* output to the top numNodes nodes i.e nodes with the most reports.

View File

@ -2333,6 +2333,36 @@
</description>
</property>
<property>
<name>dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled</name>
<value>false</value>
<description>
If this is set to true, we will filter out slow nodes
when choosing targets for blocks.
</description>
</property>
<property>
<name>dfs.namenode.max.slowpeer.collect.nodes</name>
<value>5</value>
<description>
How many slow nodes we will collect for filtering out
when choosing targets for blocks.
It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
</description>
</property>
<property>
<name>dfs.namenode.slowpeer.collect.interval</name>
<value>30m</value>
<description>
Interval at which the slow peer trackers runs in the background to collect slow peers.
It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
</description>
</property>
<property>
<name>dfs.datanode.fileio.profiling.sampling.percentage</name>
<value>0</value>

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.Node;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class TestReplicationPolicyExcludeSlowNodes
extends BaseReplicationPolicyTest {
public TestReplicationPolicyExcludeSlowNodes(String blockPlacementPolicy) {
this.blockPlacementPolicy = blockPlacementPolicy;
}
@Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{BlockPlacementPolicyDefault.class.getName()},
{BlockPlacementPolicyWithUpgradeDomain.class.getName()},
{AvailableSpaceBlockPlacementPolicy.class.getName()},
{BlockPlacementPolicyRackFaultTolerant.class.getName()}
});
}
@Override
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
conf.setBoolean(DFSConfigKeys
.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
true);
conf.setStrings(DFSConfigKeys
.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
"1s");
conf.setBoolean(DFSConfigKeys
.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
true);
final String[] racks = {
"/rack1",
"/rack2",
"/rack3",
"/rack4",
"/rack5",
"/rack6"};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
return DFSTestUtil.toDatanodeDescriptor(storages);
}
/**
* Tests that chooseTarget when excludeSlowNodesEnabled set to true.
*/
@Test
public void testChooseTargetExcludeSlowNodes() throws Exception {
namenode.getNamesystem().writeLock();
try {
// add nodes
for (int i = 0; i < dataNodes.length; i++) {
dnManager.addDatanode(dataNodes[i]);
}
// mock slow nodes
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
// waiting for slow nodes collector run
Thread.sleep(3000);
// fetch slow nodes
Set<Node> slowPeers = dnManager.getSlowPeers();
// assert slow nodes
assertEquals(3, slowPeers.size());
for (int i = 0; i < slowPeers.size(); i++) {
assertTrue(slowPeers.contains(dataNodes[i]));
}
// mock writer
DatanodeDescriptor writerDn = dataNodes[0];
// call chooseTarget()
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
// assert targets
assertEquals(3, targets.length);
for (int i = 0; i < targets.length; i++) {
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
}
} finally {
namenode.getNamesystem().writeUnlock();
}
NameNode.LOG.info("Done working on it");
}
}