HDFS-14383. Compute datanode load based on StoragePolicy. Contributed by Ayush Saxena.
This commit is contained in:
parent
173310e2f5
commit
2e8cafac3b
|
@ -239,6 +239,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
|
||||
public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
|
||||
true;
|
||||
public static final String
|
||||
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY =
|
||||
"dfs.namenode.redundancy.considerLoadByStorageType";
|
||||
public static final boolean
|
||||
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT = false;
|
||||
public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
|
||||
"dfs.namenode.read.considerLoad";
|
||||
public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -93,6 +95,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
}
|
||||
|
||||
protected boolean considerLoad;
|
||||
private boolean considerLoadByStorageType;
|
||||
protected double considerLoadFactor;
|
||||
private boolean preferLocalNode;
|
||||
protected NetworkTopology clusterMap;
|
||||
|
@ -116,6 +119,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
this.considerLoad = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT);
|
||||
this.considerLoadByStorageType = conf.getBoolean(
|
||||
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
|
||||
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT);
|
||||
this.considerLoadFactor = conf.getDouble(
|
||||
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
|
||||
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
|
||||
|
@ -976,8 +982,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
* @return Return true if the datanode should be excluded, otherwise false
|
||||
*/
|
||||
boolean excludeNodeByLoad(DatanodeDescriptor node){
|
||||
final double maxLoad = considerLoadFactor *
|
||||
stats.getInServiceXceiverAverage();
|
||||
double inServiceXceiverCount = getInServiceXceiverAverage(node);
|
||||
final double maxLoad = considerLoadFactor * inServiceXceiverCount;
|
||||
|
||||
final int nodeLoad = node.getXceiverCount();
|
||||
if ((nodeLoad > maxLoad) && (maxLoad > 0)) {
|
||||
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
|
||||
|
@ -987,6 +994,48 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the inServiceXceiver average count for the cluster, if
|
||||
* considerLoadByStorageType is true, then load is calculated only for the
|
||||
* storage types present on the datanode.
|
||||
* @param node the datanode whose storage types are to be taken into account.
|
||||
* @return the InServiceXceiverAverage count.
|
||||
*/
|
||||
private double getInServiceXceiverAverage(DatanodeDescriptor node) {
|
||||
double inServiceXceiverCount;
|
||||
if (considerLoadByStorageType) {
|
||||
inServiceXceiverCount =
|
||||
getInServiceXceiverAverageByStorageType(node.getStorageTypes());
|
||||
} else {
|
||||
inServiceXceiverCount = stats.getInServiceXceiverAverage();
|
||||
}
|
||||
return inServiceXceiverCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the average xceiver count with respect to the storage types.
|
||||
* @param storageTypes the storage types.
|
||||
* @return the average xceiver count wrt the provided storage types.
|
||||
*/
|
||||
private double getInServiceXceiverAverageByStorageType(
|
||||
Set<StorageType> storageTypes) {
|
||||
double avgLoad = 0;
|
||||
final Map<StorageType, StorageTypeStats> storageStats =
|
||||
stats.getStorageTypeStats();
|
||||
int numNodes = 0;
|
||||
int numXceiver = 0;
|
||||
for (StorageType s : storageTypes) {
|
||||
StorageTypeStats storageTypeStats = storageStats.get(s);
|
||||
numNodes += storageTypeStats.getNodesInService();
|
||||
numXceiver += storageTypeStats.getNodesInServiceXceiverCount();
|
||||
}
|
||||
if (numNodes != 0) {
|
||||
avgLoad = (double) numXceiver / numNodes;
|
||||
}
|
||||
|
||||
return avgLoad;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if a datanode is good for placing block.
|
||||
*
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
|
|||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
|
||||
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
|
@ -1978,6 +1979,11 @@ public class DatanodeManager {
|
|||
}
|
||||
return avgLoad;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
|
||||
return heartbeatManager.getStorageTypeStats();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This interface is used for retrieving the load related statistics of
|
||||
|
@ -57,4 +60,10 @@ public interface FSClusterStats {
|
|||
* writes that are currently occurring on the cluster.
|
||||
*/
|
||||
public double getInServiceXceiverAverage();
|
||||
|
||||
/**
|
||||
* Indicates the storage statistics per storage type.
|
||||
* @return storage statistics per storage type.
|
||||
*/
|
||||
Map<StorageType, StorageTypeStats> getStorageTypeStats();
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|||
|
||||
import java.beans.ConstructorProperties;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
|
@ -39,6 +40,15 @@ public class StorageTypeStats {
|
|||
private int nodesInService = 0;
|
||||
private StorageType storageType;
|
||||
|
||||
@VisibleForTesting
|
||||
void setDataNodesInServiceXceiverCount(int avgXceiverPerDatanode,
|
||||
int numNodesInService) {
|
||||
this.nodesInService = numNodesInService;
|
||||
this.nodesInServiceXceiverCount = numNodesInService * avgXceiverPerDatanode;
|
||||
}
|
||||
|
||||
private int nodesInServiceXceiverCount;
|
||||
|
||||
@ConstructorProperties({"capacityTotal", "capacityUsed", "capacityNonDfsUsed",
|
||||
"capacityRemaining", "blockPoolUsed", "nodesInService"})
|
||||
public StorageTypeStats(
|
||||
|
@ -101,6 +111,10 @@ public class StorageTypeStats {
|
|||
return nodesInService;
|
||||
}
|
||||
|
||||
public int getNodesInServiceXceiverCount() {
|
||||
return nodesInServiceXceiverCount;
|
||||
}
|
||||
|
||||
StorageTypeStats(StorageType storageType) {
|
||||
this.storageType = storageType;
|
||||
}
|
||||
|
@ -131,6 +145,7 @@ public class StorageTypeStats {
|
|||
void addNode(final DatanodeDescriptor node) {
|
||||
if (node.isInService()) {
|
||||
nodesInService++;
|
||||
nodesInServiceXceiverCount += node.getXceiverCount();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,6 +166,7 @@ public class StorageTypeStats {
|
|||
void subtractNode(final DatanodeDescriptor node) {
|
||||
if (node.isInService()) {
|
||||
nodesInService--;
|
||||
nodesInServiceXceiverCount -= node.getXceiverCount();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -313,6 +313,17 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.redundancy.considerLoadByStorageType</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Decide if chooseTarget considers the target's load with respect to the
|
||||
storage type. Typically to be used when datanodes contain homogenous
|
||||
storage types. Irrelevent if dfs.namenode.redundancy.considerLoad is
|
||||
false.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.redundancy.considerLoad.factor</name>
|
||||
<value>2.0</value>
|
||||
|
|
|
@ -33,13 +33,18 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -239,4 +244,51 @@ public class TestBlockStatsMXBean {
|
|||
storageTypeStats = storageTypeStatsMap.get(StorageType.NVDIMM);
|
||||
assertEquals(1, storageTypeStats.getNodesInService());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStorageTypeLoad() throws Exception {
|
||||
HeartbeatManager heartbeatManager =
|
||||
cluster.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.getHeartbeatManager();
|
||||
Map<StorageType, StorageTypeStats> storageTypeStatsMap =
|
||||
heartbeatManager.getStorageTypeStats();
|
||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
|
||||
// Create a file with HOT storage policy.
|
||||
Path hotSpDir = new Path("/HOT");
|
||||
dfs.mkdir(hotSpDir, FsPermission.getDirDefault());
|
||||
dfs.setStoragePolicy(hotSpDir, "HOT");
|
||||
FSDataOutputStream hotSpFileStream =
|
||||
dfs.create(new Path(hotSpDir, "hotFile"));
|
||||
hotSpFileStream.write("Storage Policy Hot".getBytes());
|
||||
hotSpFileStream.hflush();
|
||||
|
||||
// Create a file with COLD storage policy.
|
||||
Path coldSpDir = new Path("/COLD");
|
||||
dfs.mkdir(coldSpDir, FsPermission.getDirDefault());
|
||||
dfs.setStoragePolicy(coldSpDir, "COLD");
|
||||
FSDataOutputStream coldSpFileStream =
|
||||
dfs.create(new Path(coldSpDir, "coldFile"));
|
||||
coldSpFileStream.write("Writing to ARCHIVE storage type".getBytes());
|
||||
coldSpFileStream.hflush();
|
||||
|
||||
// Trigger heartbeats manually to speed up the test.
|
||||
cluster.triggerHeartbeats();
|
||||
|
||||
// The load would be 2*replication since both the
|
||||
// write xceiver & packet responder threads are counted.
|
||||
GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.DISK)
|
||||
.getNodesInServiceXceiverCount() == 6, 100, 5000);
|
||||
|
||||
// The count for ARCHIVE should be independent of the value of DISK.
|
||||
GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.ARCHIVE)
|
||||
.getNodesInServiceXceiverCount() == 6, 100, 5000);
|
||||
|
||||
// The total count should stay unaffected, that is sum of load from all
|
||||
// datanodes.
|
||||
GenericTestUtils
|
||||
.waitFor(() -> heartbeatManager.getInServiceXceiverCount() == 12, 100,
|
||||
5000);
|
||||
IOUtils.closeStreams(hotSpFileStream, coldSpFileStream);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
@ -1609,5 +1610,33 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
when(node.getXceiverCount()).thenReturn(10);
|
||||
assertTrue(bppd.excludeNodeByLoad(node));
|
||||
|
||||
// Enable load check per storage type.
|
||||
conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
|
||||
true);
|
||||
bppd.initialize(conf, statistics, null, null);
|
||||
Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
|
||||
StorageTypeStats diskStorageTypeStats =
|
||||
new StorageTypeStats(StorageType.DISK);
|
||||
|
||||
// Set xceiver count as 500 for DISK.
|
||||
diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
|
||||
storageStats.put(StorageType.DISK, diskStorageTypeStats);
|
||||
|
||||
//Set xceiver count as 900 for ARCHIVE
|
||||
StorageTypeStats archiveStorageTypeStats =
|
||||
new StorageTypeStats(StorageType.ARCHIVE);
|
||||
archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
|
||||
storageStats.put(StorageType.ARCHIVE, diskStorageTypeStats);
|
||||
|
||||
when(statistics.getStorageTypeStats()).thenReturn(storageStats);
|
||||
when(node.getXceiverCount()).thenReturn(29);
|
||||
when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
|
||||
when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
|
||||
//Added for sanity, the number of datanodes are 100, the average xceiver
|
||||
// shall be (50*100+90*100)/100 = 14
|
||||
when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
|
||||
when(node.getXceiverCount()).thenReturn(100);
|
||||
|
||||
assertFalse(bppd.excludeNodeByLoad(node));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue