From 7f20fad41912217c03cdd7f2d54c01e40f49d051 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Mon, 19 Oct 2020 10:48:47 +0530
Subject: [PATCH] HDFS-14383. Compute datanode load based on StoragePolicy.
 Contributed by Ayush Saxena.

(cherry picked from commit 2e8cafac3b071fe5b943542827fd8a496b137fa9)
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 ++
 .../BlockPlacementPolicyDefault.java          | 55 ++++++++++++++++++-
 .../blockmanagement/DatanodeManager.java      |  6 ++
 .../blockmanagement/FSClusterStats.java       |  9 +++
 .../blockmanagement/StorageTypeStats.java     | 16 ++++++
 .../src/main/resources/hdfs-default.xml       | 11 ++++
 .../blockmanagement/TestBlockStatsMXBean.java | 52 ++++++++++++++++++
 .../TestReplicationPolicy.java                | 29 ++++++++++
 8 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f7da4cb97c2..75b7389246d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -239,6 +239,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
   public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
       true;
+  public static final String
+      DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY =
+      "dfs.namenode.redundancy.considerLoadByStorageType";
+  public static final boolean
+      DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT = false;
   public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
       "dfs.namenode.read.considerLoad";
   public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index a1a83b042d0..5761690cc3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.util.*;
@@ -92,7 +94,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
   }
 
-  protected boolean considerLoad;
+  protected boolean considerLoad;
+  private boolean considerLoadByStorageType;
   protected double considerLoadFactor;
   private boolean preferLocalNode;
   protected NetworkTopology clusterMap;
@@ -116,6 +119,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     this.considerLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT);
+    this.considerLoadByStorageType = conf.getBoolean(
+        DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
+        DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT);
     this.considerLoadFactor = conf.getDouble(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
@@ -976,8 +982,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return Return true if the datanode should be excluded, otherwise false
    */
   boolean excludeNodeByLoad(DatanodeDescriptor node){
-    final double maxLoad = considerLoadFactor *
-        stats.getInServiceXceiverAverage();
+    double inServiceXceiverCount = getInServiceXceiverAverage(node);
+    final double maxLoad = considerLoadFactor * inServiceXceiverCount;
+
     final int nodeLoad = node.getXceiverCount();
     if ((nodeLoad > maxLoad) && (maxLoad > 0)) {
       logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
@@ -987,6 +994,48 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return false;
   }
 
+  /**
+   * Gets the in-service xceiver average count for the cluster. If
+   * considerLoadByStorageType is true, the load is calculated only for the
+   * storage types present on the datanode.
+   * @param node the datanode whose storage types are to be taken into account.
+   * @return the InServiceXceiverAverage count.
+   */
+  private double getInServiceXceiverAverage(DatanodeDescriptor node) {
+    double inServiceXceiverCount;
+    if (considerLoadByStorageType) {
+      inServiceXceiverCount =
+          getInServiceXceiverAverageByStorageType(node.getStorageTypes());
+    } else {
+      inServiceXceiverCount = stats.getInServiceXceiverAverage();
+    }
+    return inServiceXceiverCount;
+  }
+
+  /**
+   * Gets the average xceiver count with respect to the storage types.
+   * @param storageTypes the storage types.
+   * @return the average xceiver count for the provided storage types.
+   */
+  private double getInServiceXceiverAverageByStorageType(
+      Set<StorageType> storageTypes) {
+    double avgLoad = 0;
+    final Map<StorageType, StorageTypeStats> storageStats =
+        stats.getStorageTypeStats();
+    int numNodes = 0;
+    int numXceiver = 0;
+    for (StorageType s : storageTypes) {
+      StorageTypeStats storageTypeStats = storageStats.get(s);
+      numNodes += storageTypeStats.getNodesInService();
+      numXceiver += storageTypeStats.getNodesInServiceXceiverCount();
+    }
+    if (numNodes != 0) {
+      avgLoad = (double) numXceiver / numNodes;
+    }
+
+    return avgLoad;
+  }
+
   /**
    * Determine if a datanode is good for placing block.
    *
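The behavioural change above is confined to excludeNodeByLoad(): with considerLoadByStorageType enabled, the busy threshold becomes considerLoadFactor times the in-service xceiver average computed only over datanodes that carry the storage types present on the candidate node, instead of the cluster-wide average. The following self-contained sketch reproduces that averaging and comparison with made-up figures; the Storage enum, the TypeStats holder and all numbers are illustrative stand-ins, not code from the patch.

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class LoadByStorageTypeSketch {

  // Stand-in for org.apache.hadoop.fs.StorageType.
  enum Storage { DISK, ARCHIVE }

  // Stand-in for StorageTypeStats: nodes in service and the total xceiver
  // count across those nodes, per storage type.
  static final class TypeStats {
    final int nodesInService;
    final int inServiceXceiverCount;
    TypeStats(int nodesInService, int inServiceXceiverCount) {
      this.nodesInService = nodesInService;
      this.inServiceXceiverCount = inServiceXceiverCount;
    }
  }

  // Mirrors getInServiceXceiverAverageByStorageType(): sum nodes and xceivers
  // over the candidate's storage types only, then divide.
  static double averageFor(Set<Storage> nodeTypes,
      Map<Storage, TypeStats> stats) {
    int nodes = 0;
    int xceivers = 0;
    for (Storage s : nodeTypes) {
      TypeStats t = stats.get(s);
      nodes += t.nodesInService;
      xceivers += t.inServiceXceiverCount;
    }
    return nodes == 0 ? 0 : (double) xceivers / nodes;
  }

  public static void main(String[] args) {
    Map<Storage, TypeStats> stats = new EnumMap<>(Storage.class);
    stats.put(Storage.DISK, new TypeStats(4, 80));     // avg 20 per DISK node
    stats.put(Storage.ARCHIVE, new TypeStats(8, 40));  // avg 5 per ARCHIVE node

    // A node carrying both types is measured against (80 + 40) / (4 + 8) = 10.
    double avg = averageFor(EnumSet.of(Storage.DISK, Storage.ARCHIVE), stats);
    double considerLoadFactor = 2.0;           // considerLoad.factor default
    double maxLoad = considerLoadFactor * avg; // 20.0

    // Same predicate as excludeNodeByLoad(): busy only if above the threshold.
    int nodeXceiverCount = 25;
    boolean excluded = nodeXceiverCount > maxLoad && maxLoad > 0;
    System.out.println("maxLoad=" + maxLoad + " excluded=" + excluded); // true
  }
}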
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 22750ec0aee..01dfe04cb13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
 
+import org.apache.hadoop.fs.StorageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1978,6 +1979,11 @@ public class DatanodeManager {
         }
         return avgLoad;
       }
+
+      @Override
+      public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
+        return heartbeatManager.getStorageTypeStats();
+      }
     };
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
index 556b7fcaad7..14122952bb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+
+import java.util.Map;
 
 /**
  * This interface is used for retrieving the load related statistics of
@@ -57,4 +60,10 @@
    *         writes that are currently occurring on the cluster.
    */
   public double getInServiceXceiverAverage();
+
+  /**
+   * Indicates the storage statistics per storage type.
+   * @return storage statistics per storage type.
+   */
+  Map<StorageType, StorageTypeStats> getStorageTypeStats();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
index c335ec6a7ef..f90dbad6980 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.beans.ConstructorProperties;
 
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
@@ -39,6 +40,15 @@ public class StorageTypeStats {
   private int nodesInService = 0;
   private StorageType storageType;
 
+  @VisibleForTesting
+  void setDataNodesInServiceXceiverCount(int avgXceiverPerDatanode,
+      int numNodesInService) {
+    this.nodesInService = numNodesInService;
+    this.nodesInServiceXceiverCount = numNodesInService * avgXceiverPerDatanode;
+  }
+
+  private int nodesInServiceXceiverCount;
+
   @ConstructorProperties({"capacityTotal", "capacityUsed", "capacityNonDfsUsed",
       "capacityRemaining", "blockPoolUsed", "nodesInService"})
   public StorageTypeStats(
@@ -101,6 +111,10 @@
     return nodesInService;
   }
 
+  public int getNodesInServiceXceiverCount() {
+    return nodesInServiceXceiverCount;
+  }
+
   StorageTypeStats(StorageType storageType) {
     this.storageType = storageType;
   }
@@ -131,6 +145,7 @@ public class StorageTypeStats {
   void addNode(final DatanodeDescriptor node) {
     if (node.isInService()) {
       nodesInService++;
+      nodesInServiceXceiverCount += node.getXceiverCount();
     }
   }
 
@@ -151,6 +166,7 @@ public class StorageTypeStats {
   void subtractNode(final DatanodeDescriptor node) {
     if (node.isInService()) {
       nodesInService--;
+      nodesInServiceXceiverCount -= node.getXceiverCount();
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d2cb3c7aa69..e63303fc372 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -313,6 +313,17 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.redundancy.considerLoadByStorageType</name>
+  <value>false</value>
+  <description>
+    Decide if chooseTarget considers the target's load with respect to the
+    storage type. Typically to be used when datanodes contain homogeneous
+    storage types. Irrelevant if dfs.namenode.redundancy.considerLoad is
+    false.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.redundancy.considerLoad.factor</name>
   <value>2.0</value>
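The hdfs-default.xml entry above documents the new key alongside the existing considerLoad settings; it is read once when the placement policy is initialized and has no effect unless dfs.namenode.redundancy.considerLoad is also true. A minimal sketch of setting it programmatically, for example in a MiniDFSCluster-based test, could look like the following; the class name is illustrative, while the constants are the ones this patch adds to or already references in DFSConfigKeys.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableLoadByStorageTypeSketch {
  public static void main(String[] args) {
    // Sketch only: build a NameNode-side configuration with the new key set.
    Configuration conf = new HdfsConfiguration();

    // The per-storage-type check is only consulted when considerLoad is on.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY, true);

    // Keep the default multiplier of 2.0 unless the cluster needs otherwise.
    conf.setDouble(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR, 2.0);

    System.out.println(conf.get(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY));
  }
}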
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index 81549a6d5b1..692f8c7de4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -33,13 +33,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -218,4 +223,51 @@ public class TestBlockStatsMXBean {
     storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
     assertEquals(3, storageTypeStats.getNodesInService());
   }
+
+  @Test
+  public void testStorageTypeLoad() throws Exception {
+    HeartbeatManager heartbeatManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager()
+            .getHeartbeatManager();
+    Map<StorageType, StorageTypeStats> storageTypeStatsMap =
+        heartbeatManager.getStorageTypeStats();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create a file with HOT storage policy.
+    Path hotSpDir = new Path("/HOT");
+    dfs.mkdir(hotSpDir, FsPermission.getDirDefault());
+    dfs.setStoragePolicy(hotSpDir, "HOT");
+    FSDataOutputStream hotSpFileStream =
+        dfs.create(new Path(hotSpDir, "hotFile"));
+    hotSpFileStream.write("Storage Policy Hot".getBytes());
+    hotSpFileStream.hflush();
+
+    // Create a file with COLD storage policy.
+    Path coldSpDir = new Path("/COLD");
+    dfs.mkdir(coldSpDir, FsPermission.getDirDefault());
+    dfs.setStoragePolicy(coldSpDir, "COLD");
+    FSDataOutputStream coldSpFileStream =
+        dfs.create(new Path(coldSpDir, "coldFile"));
+    coldSpFileStream.write("Writing to ARCHIVE storage type".getBytes());
+    coldSpFileStream.hflush();
+
+    // Trigger heartbeats manually to speed up the test.
+    cluster.triggerHeartbeats();
+
+    // The load would be 2*replication since both the
+    // write xceiver & packet responder threads are counted.
+    GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.DISK)
+        .getNodesInServiceXceiverCount() == 6, 100, 5000);
+
+    // The count for ARCHIVE should be independent of the value of DISK.
+    GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.ARCHIVE)
+        .getNodesInServiceXceiverCount() == 6, 100, 5000);
+
+    // The total count should stay unaffected, i.e. the sum of the load from
+    // all datanodes.
+    GenericTestUtils
+        .waitFor(() -> heartbeatManager.getInServiceXceiverCount() == 12, 100,
+            5000);
+    IOUtils.closeStreams(hotSpFileStream, coldSpFileStream);
+  }
 }
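The expected values in the three waitFor() calls follow from the comment inside the test: while each output stream is still open, every replica keeps one write xceiver and one packet-responder thread alive on its datanode. Assuming the default replication factor of 3 and the DISK/ARCHIVE node split this test class already sets up, that yields 6 xceivers per storage type and 12 cluster-wide, as sketched below (the class name and the replication assumption are illustrative, not part of the patch).

public class XceiverCountArithmeticSketch {
  public static void main(String[] args) {
    // Assumed from the test context: default replication of 3 and separate
    // groups of datanodes hosting DISK and ARCHIVE volumes.
    int replication = 3;
    int threadsPerReplica = 2;  // write xceiver + packet responder per open replica

    int expectedDisk = replication * threadsPerReplica;     // 6, the HOT file
    int expectedArchive = replication * threadsPerReplica;  // 6, the COLD file
    int expectedTotal = expectedDisk + expectedArchive;     // 12, cluster-wide

    System.out.printf("DISK=%d ARCHIVE=%d total=%d%n",
        expectedDisk, expectedArchive, expectedTotal);
  }
}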
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 78629fe9903..cf5f5a14b6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -1618,5 +1619,33 @@
 
     when(node.getXceiverCount()).thenReturn(10);
     assertTrue(bppd.excludeNodeByLoad(node));
+    // Enable load check per storage type.
+    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
+        true);
+    bppd.initialize(conf, statistics, null, null);
+    Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
+    StorageTypeStats diskStorageTypeStats =
+        new StorageTypeStats(StorageType.DISK);
+
+    // Set xceiver count as 500 for DISK.
+    diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
+    storageStats.put(StorageType.DISK, diskStorageTypeStats);
+
+    // Set xceiver count as 900 for ARCHIVE.
+    StorageTypeStats archiveStorageTypeStats =
+        new StorageTypeStats(StorageType.ARCHIVE);
+    archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
+    storageStats.put(StorageType.ARCHIVE, archiveStorageTypeStats);
+
+    when(statistics.getStorageTypeStats()).thenReturn(storageStats);
+    when(node.getXceiverCount()).thenReturn(29);
+    when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
+    when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
+    // Added for sanity: with 100 datanodes in total, the cluster-wide average
+    // xceiver count is (50*10 + 10*90)/100 = 14.
+    when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
+    when(node.getXceiverCount()).thenReturn(100);
+
+    assertFalse(bppd.excludeNodeByLoad(node));
   }
 }
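Reading the new assertion together with the mocked statistics shows what the feature changes. With the cluster-wide average stubbed at 14, the old check would have allowed at most 2.0 * 14 = 28 concurrent xceivers (assuming the default considerLoad factor of 2.0 is in effect for this test), so a node reporting 100 would have been rejected as too busy. With considerLoadByStorageType enabled and the node exposing only DISK, the relevant average is 500 / 10 = 50, the threshold becomes 2.0 * 50 = 100, and excludeNodeByLoad() rejects only a strictly greater count, hence assertFalse. A short worked version of that comparison, with the factor assumption called out in the comments:

public class ConsiderLoadByStorageTypeWorkedExample {
  public static void main(String[] args) {
    // Figures from the mocked statistics above; the 2.0 multiplier is the
    // dfs.namenode.redundancy.considerLoad.factor default and is assumed here.
    double considerLoadFactor = 2.0;
    int nodeLoad = 100;                        // stubbed node.getXceiverCount()

    double clusterWideAverage = 14.0;          // (50*10 + 10*90) / 100 datanodes
    double diskOnlyAverage = (50.0 * 10) / 10; // 500 DISK xceivers over 10 DISK nodes

    // Old behaviour: compare against the cluster-wide average.
    boolean excludedClusterWide =
        nodeLoad > considerLoadFactor * clusterWideAverage;  // 100 > 28 -> true

    // New behaviour: compare against the average for the node's storage types.
    boolean excludedByStorageType =
        nodeLoad > considerLoadFactor * diskOnlyAverage;     // 100 > 100 -> false

    System.out.println(excludedClusterWide + " vs " + excludedByStorageType);
  }
}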