HDFS-9318. considerLoad factor can be improved. Contributed by Kuhu Shukla.
This commit is contained in:
parent
b64242c0d2
commit
bf6aa30a15
|
@ -1683,6 +1683,8 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-9282. Make data directory count and storage raw capacity related tests
|
||||
FsDataset-agnostic. (Tony Wu via lei)
|
||||
|
||||
HDFS-9318. considerLoad factor can be improved. (Kuhu Shukla via kihwal)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
|
||||
|
|
|
@ -184,6 +184,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY =
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY;
|
||||
public static final boolean DFS_NAMENODE_REPLICATION_CONSIDERLOAD_DEFAULT = true;
|
||||
public static final String DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR =
|
||||
"dfs.namenode.replication.considerLoad.factor";
|
||||
public static final double
|
||||
DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR_DEFAULT = 2.0;
|
||||
public static final String DFS_NAMENODE_REPLICATION_INTERVAL_KEY =
|
||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY;
|
||||
public static final int DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT = 3;
|
||||
|
|
|
@ -58,6 +58,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
};
|
||||
|
||||
protected boolean considerLoad;
|
||||
protected double considerLoadFactor;
|
||||
private boolean preferLocalNode = true;
|
||||
protected NetworkTopology clusterMap;
|
||||
protected Host2NodesMap host2datanodeMap;
|
||||
|
@ -79,6 +80,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
Host2NodesMap host2datanodeMap) {
|
||||
this.considerLoad = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
|
||||
this.considerLoadFactor = conf.getDouble(
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR_DEFAULT);
|
||||
this.stats = stats;
|
||||
this.clusterMap = clusterMap;
|
||||
this.host2datanodeMap = host2datanodeMap;
|
||||
|
@ -809,7 +813,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
|
||||
// check the communication traffic of the target machine
|
||||
if (considerLoad) {
|
||||
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
|
||||
final double maxLoad = considerLoadFactor *
|
||||
stats.getInServiceXceiverAverage();
|
||||
final int nodeLoad = node.getXceiverCount();
|
||||
if (nodeLoad > maxLoad) {
|
||||
logNodeIsNotChosen(node, "the node is too busy (load: " + nodeLoad
|
||||
|
|
|
@ -301,6 +301,15 @@
|
|||
<description>Decide if chooseTarget considers the target's load or not
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.replication.considerLoad.factor</name>
|
||||
<value>2.0</value>
|
||||
<description>The factor by which a node's load can exceed the average
|
||||
before being rejected for writes, only if considerLoad is true.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.default.chunk.view.size</name>
|
||||
<value>32768</value>
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -51,6 +52,8 @@ public class TestReplicationPolicyConsiderLoad
|
|||
|
||||
@Override
|
||||
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_FACTOR, 1.2);
|
||||
final String[] racks = {
|
||||
"/rack1",
|
||||
"/rack1",
|
||||
|
@ -125,4 +128,57 @@ public class TestReplicationPolicyConsiderLoad
|
|||
}
|
||||
NameNode.LOG.info("Done working on it");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsiderLoadFactor() throws IOException {
|
||||
namenode.getNamesystem().writeLock();
|
||||
try {
|
||||
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[0],
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[0]),
|
||||
dataNodes[0].getCacheCapacity(),
|
||||
dataNodes[0].getCacheUsed(),
|
||||
5, 0, null);
|
||||
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[1],
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[1]),
|
||||
dataNodes[1].getCacheCapacity(),
|
||||
dataNodes[1].getCacheUsed(),
|
||||
10, 0, null);
|
||||
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[2],
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[2]),
|
||||
dataNodes[2].getCacheCapacity(),
|
||||
dataNodes[2].getCacheUsed(),
|
||||
5, 0, null);
|
||||
|
||||
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3],
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
|
||||
dataNodes[3].getCacheCapacity(),
|
||||
dataNodes[3].getCacheUsed(),
|
||||
10, 0, null);
|
||||
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4],
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
|
||||
dataNodes[4].getCacheCapacity(),
|
||||
dataNodes[4].getCacheUsed(),
|
||||
15, 0, null);
|
||||
dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[5],
|
||||
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
|
||||
dataNodes[5].getCacheCapacity(),
|
||||
dataNodes[5].getCacheUsed(),
|
||||
15, 0, null);
|
||||
//Add values in above heartbeats
|
||||
double load = 5 + 10 + 15 + 10 + 15 + 5;
|
||||
// Call chooseTarget()
|
||||
DatanodeDescriptor writerDn = dataNodes[0];
|
||||
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
||||
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn,
|
||||
new ArrayList<DatanodeStorageInfo>(), false, null,
|
||||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||
for(DatanodeStorageInfo info : targets) {
|
||||
assertTrue("The node "+info.getDatanodeDescriptor().getName()+
|
||||
" has higher load and should not have been picked!",
|
||||
info.getDatanodeDescriptor().getXceiverCount() <= (load/6)*1.2);
|
||||
}
|
||||
} finally {
|
||||
namenode.getNamesystem().writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue