From 29414871bdd0b7be466a38a2e5735aaa32059e1c Mon Sep 17 00:00:00 2001
From: litao
Date: Wed, 21 Apr 2021 09:41:13 +0800
Subject: [PATCH] HDFS-15879. Exclude slow nodes when choose targets for blocks (#2928)

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  15 ++
 .../BlockPlacementPolicyDefault.java          |  22 ++-
 .../blockmanagement/DatanodeManager.java      | 109 ++++++++++++++-
 .../blockmanagement/SlowPeerTracker.java      |  18 +++
 .../src/main/resources/hdfs-default.xml       |  30 ++++
 ...TestReplicationPolicyExcludeSlowNodes.java | 130 ++++++++++++++++++
 6 files changed, 321 insertions(+), 3 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index ca72dfcb1cc..ec6c6d17b27 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -979,6 +979,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.outliers.report.interval";
   public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
       "30m";
+  public static final String DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY =
+      "dfs.namenode.max.slowpeer.collect.nodes";
+  public static final int DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT =
+      5;
+  public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY =
+      "dfs.namenode.slowpeer.collect.interval";
+  public static final String DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT =
+      "30m";
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -1125,6 +1133,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
       "dfs.namenode.block-placement-policy.default.prefer-local-node";
   public static final boolean DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
+  public static final String
+      DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY =
+      "dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled";
+  public static final boolean
+      DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT =
+      false;
+
   public static final String DFS_NAMENODE_GC_TIME_MONITOR_ENABLE =
       "dfs.namenode.gc.time.monitor.enable";
   public static final boolean DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT =

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index df687f4cde0..9f68c36033c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -82,7 +84,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     NODE_TOO_BUSY("the node is too busy"),
     TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
     NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
-    NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable");
+    NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
+    NODE_SLOW("the node is too slow");
 
     private final String text;
@@ -99,6 +102,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   private boolean considerLoadByStorageType;
   protected double considerLoadFactor;
   private boolean preferLocalNode;
+  private boolean dataNodePeerStatsEnabled;
+  private boolean excludeSlowNodesEnabled;
   protected NetworkTopology clusterMap;
   protected Host2NodesMap host2datanodeMap;
   private FSClusterStats stats;
@@ -144,6 +149,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY,
         DFSConfigKeys.
             DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT);
+    this.dataNodePeerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+    this.excludeSlowNodesEnabled = conf.getBoolean(
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
   }
 
   @Override
@@ -1091,6 +1102,15 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       return false;
     }
 
+    // check if the target is a slow node
+    if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
+      Set<Node> nodes = DatanodeManager.getSlowNodes();
+      if (nodes.contains(node)) {
+        logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
+        return false;
+      }
+    }
+
     return true;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 01dfe04cb13..7f3601f5ca2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -18,8 +18,12 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
@@ -53,6 +57,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Timer;
 
@@ -201,8 +206,16 @@ public class DatanodeManager {
    */
  private final boolean useDfsNetworkTopology;
 
+  private static final String IP_PORT_SEPARATOR = ":";
+
   @Nullable
   private final SlowPeerTracker slowPeerTracker;
+  private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
+  private Daemon slowPeerCollectorDaemon;
+  private final long slowPeerCollectionInterval;
+  private final int maxSlowPeerReportNodes;
+  private boolean excludeSlowNodesEnabled;
+
   @Nullable
   private final SlowDiskTracker slowDiskTracker;
 
@@ -242,11 +255,22 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
         DFSConfigKeys.
             DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));
-
     final Timer timer = new Timer();
     this.slowPeerTracker = dataNodePeerStatsEnabled ?
         new SlowPeerTracker(conf, timer) : null;
-
+    this.excludeSlowNodesEnabled = conf.getBoolean(
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
+        DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
+    this.maxSlowPeerReportNodes = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
+    this.slowPeerCollectionInterval = conf.getTimeDuration(
+        DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    if (slowPeerTracker != null && excludeSlowNodesEnabled) {
+      startSlowPeerCollector();
+    }
     this.slowDiskTracker = dataNodeDiskStatsEnabled ?
         new SlowDiskTracker(conf, timer) : null;
 
@@ -356,6 +380,44 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
   }
 
+  private void startSlowPeerCollector() {
+    if (slowPeerCollectorDaemon != null) {
+      return;
+    }
+    slowPeerCollectorDaemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            slowNodesSet = getSlowPeers();
+          } catch (Exception e) {
+            LOG.error("Failed to collect slow peers", e);
+          }
+
+          try {
+            Thread.sleep(slowPeerCollectionInterval);
+          } catch (InterruptedException e) {
+            LOG.error("Slow peers collection thread interrupted", e);
+            return;
+          }
+        }
+      }
+    });
+    slowPeerCollectorDaemon.start();
+  }
+
+  public void stopSlowPeerCollector() {
+    if (slowPeerCollectorDaemon == null) {
+      return;
+    }
+    slowPeerCollectorDaemon.interrupt();
+    try {
+      slowPeerCollectorDaemon.join();
+    } catch (InterruptedException e) {
+      LOG.error("Slow peers collection thread did not shutdown", e);
+    }
+  }
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -401,6 +463,7 @@ public class DatanodeManager {
   void close() {
     datanodeAdminManager.close();
     heartbeatManager.close();
+    stopSlowPeerCollector();
   }
 
   /** @return the network topology. */
@@ -2019,6 +2082,48 @@ public class DatanodeManager {
     return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
   }
 
+  /**
+   * Returns all tracked slow peers.
+   * @return the set of DataNodes currently reported as slow peers.
+   */
+  public Set<Node> getSlowPeers() {
+    Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
+    if (slowPeerTracker == null) {
+      return slowPeersSet;
+    }
+    ArrayList<String> slowNodes =
+        slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
+    for (String slowNode : slowNodes) {
+      if (StringUtils.isBlank(slowNode)
+          || !slowNode.contains(IP_PORT_SEPARATOR)) {
+        continue;
+      }
+      String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
+      DatanodeDescriptor datanodeByHost =
+          host2DatanodeMap.getDatanodeByHost(ipAddr);
+      if (datanodeByHost != null) {
+        slowPeersSet.add(datanodeByHost);
+      }
+    }
+    return slowPeersSet;
+  }
+
+  /**
+   * Returns the set of slow nodes collected by the background collector.
+   * @return the cached set of slow nodes used during block placement.
+   */
+  public static Set<Node> getSlowNodes() {
+    return slowNodesSet;
+  }
+
+  /**
+   * Use only for testing.
+   */
+  @VisibleForTesting
+  public SlowPeerTracker getSlowPeerTracker() {
+    return slowPeerTracker;
+  }
+
   /**
    * Use only for testing.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
index 5b30b738c7a..453818e38c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -230,6 +231,23 @@ public class SlowPeerTracker {
     }
   }
 
+  /**
+   * Returns the addresses of all currently tracked slow nodes.
+   * @param numNodes the maximum number of slow nodes to return.
+   * @return a list of the slow nodes' addresses.
+   */
+  public ArrayList<String> getSlowNodes(int numNodes) {
+    Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
+    ArrayList<String> slowNodes = new ArrayList<>();
+    for (ReportForJson jsonReport : jsonReports) {
+      slowNodes.add(jsonReport.getSlowNode());
+    }
+    if (!slowNodes.isEmpty()) {
+      LOG.warn("Slow nodes list: " + slowNodes);
+    }
+    return slowNodes;
+  }
+
   /**
    * Retrieve reports in a structure for generating JSON, limiting the
    * output to the top numNodes nodes i.e nodes with the most reports.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b565a0d6197..532d7b002f1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2333,6 +2333,36 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled</name>
+  <value>false</value>
+  <description>
+    If this is set to true, we will filter out slow nodes
+    when choosing targets for blocks.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.max.slowpeer.collect.nodes</name>
+  <value>5</value>
+  <description>
+    How many slow nodes we will collect for filtering out
+    when choosing targets for blocks.
+
+    It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.slowpeer.collect.interval</name>
+  <value>30m</value>
+  <description>
+    Interval at which the slow peer tracker runs in the background to collect slow peers.
+
+    It is ignored if dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled is false.
+  </description>
+</property>
 
 <property>
   <name>dfs.datanode.fileio.profiling.sampling.percentage</name>
   <value>0</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
new file mode 100644
index 00000000000..d9f3cf39cab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestReplicationPolicyExcludeSlowNodes
+    extends BaseReplicationPolicyTest {
+
+  public TestReplicationPolicyExcludeSlowNodes(String blockPlacementPolicy) {
+    this.blockPlacementPolicy = blockPlacementPolicy;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {BlockPlacementPolicyDefault.class.getName()},
+        {BlockPlacementPolicyWithUpgradeDomain.class.getName()},
+        {AvailableSpaceBlockPlacementPolicy.class.getName()},
+        {BlockPlacementPolicyRackFaultTolerant.class.getName()}
+    });
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys
+        .DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        true);
+    conf.setStrings(DFSConfigKeys
+        .DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+        "1s");
+    conf.setBoolean(DFSConfigKeys
+        .DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
+        true);
+    final String[] racks = {
+        "/rack1",
+        "/rack2",
+        "/rack3",
+        "/rack4",
+        "/rack5",
+        "/rack6"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    return DFSTestUtil.toDatanodeDescriptor(storages);
+  }
+
+  /**
+   * Tests chooseTarget() when excludeSlowNodesEnabled is set to true.
+   */
+  @Test
+  public void testChooseTargetExcludeSlowNodes() throws Exception {
+    namenode.getNamesystem().writeLock();
+    try {
+      // add nodes
+      for (int i = 0; i < dataNodes.length; i++) {
+        dnManager.addDatanode(dataNodes[i]);
+      }
+
+      // mock slow nodes
+      SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
+      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
+      tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
+      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
+      tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
+      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
+      tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
+
+      // wait for the slow node collector to run
+      Thread.sleep(3000);
+
+      // fetch slow nodes
+      Set<Node> slowPeers = dnManager.getSlowPeers();
+
+      // assert slow nodes
+      assertEquals(3, slowPeers.size());
+      for (int i = 0; i < slowPeers.size(); i++) {
+        assertTrue(slowPeers.contains(dataNodes[i]));
+      }
+
+      // mock writer
+      DatanodeDescriptor writerDn = dataNodes[0];
+
+      // call chooseTarget()
+      DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
+              writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
+              1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+
+      // assert targets
+      assertEquals(3, targets.length);
+      for (int i = 0; i < targets.length; i++) {
+        assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
+      }
+    } finally {
+      namenode.getNamesystem().writeUnlock();
+    }
+    NameNode.LOG.info("Done working on it");
+  }
+
+}
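
For anyone trying the change out, a minimal hdfs-site.xml fragment that exercises the new code paths might look like the sketch below. The key names are the ones added or read by this patch; dfs.datanode.peer.stats.enabled is the existing DataNode peer-stats switch that must also be on, since both BlockPlacementPolicyDefault and DatanodeManager gate the feature on it. The two collector values shown are simply the defaults from hdfs-default.xml and are illustrative, not required.

<!-- Sketch only: assumed deployment config for enabling slow-node exclusion. -->
<configuration>
  <!-- SlowPeerTracker is only created when DataNode peer stats are enabled. -->
  <property>
    <name>dfs.datanode.peer.stats.enabled</name>
    <value>true</value>
  </property>
  <!-- Filter slow nodes out of the candidate targets during block placement. -->
  <property>
    <name>dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled</name>
    <value>true</value>
  </property>
  <!-- Collect at most this many slow nodes (default 5), refreshed at the collect interval. -->
  <property>
    <name>dfs.namenode.max.slowpeer.collect.nodes</name>
    <value>5</value>
  </property>
  <property>
    <name>dfs.namenode.slowpeer.collect.interval</name>
    <value>30m</value>
  </property>
</configuration>

With these settings the NameNode starts the slow peer collector daemon when DatanodeManager is constructed, refreshes the cached slow-node set every collect interval, and isGoodDatanode() rejects any candidate in that set with the NODE_SLOW reason.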