HDFS-16327. Make DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY reconfigurable (#3716)
(cherry picked from commit c56a07f36b
)
This commit is contained in:
parent
a45df81822
commit
f16d9dfc31
|
@ -214,7 +214,7 @@ public class DatanodeManager {
|
||||||
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
|
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
|
||||||
private Daemon slowPeerCollectorDaemon;
|
private Daemon slowPeerCollectorDaemon;
|
||||||
private final long slowPeerCollectionInterval;
|
private final long slowPeerCollectionInterval;
|
||||||
private final int maxSlowPeerReportNodes;
|
private volatile int maxSlowPeerReportNodes;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private final SlowDiskTracker slowDiskTracker;
|
private final SlowDiskTracker slowDiskTracker;
|
||||||
|
@ -515,6 +515,15 @@ public class DatanodeManager {
|
||||||
return this.avoidSlowDataNodesForRead;
|
return this.avoidSlowDataNodesForRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMaxSlowpeerCollectNodes(int maxNodes) {
|
||||||
|
this.maxSlowPeerReportNodes = maxNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getMaxSlowpeerCollectNodes() {
|
||||||
|
return this.maxSlowPeerReportNodes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sort the non-striped located blocks by the distance to the target host.
|
* Sort the non-striped located blocks by the distance to the target host.
|
||||||
*
|
*
|
||||||
|
|
|
@ -190,6 +190,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATAN
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
||||||
|
@ -333,7 +335,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
||||||
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||||
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
||||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY));
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
||||||
|
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY));
|
||||||
|
|
||||||
private static final String USAGE = "Usage: hdfs namenode ["
|
private static final String USAGE = "Usage: hdfs namenode ["
|
||||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||||
|
@ -2200,7 +2203,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
||||||
return reconfigureParallelLoad(newVal);
|
return reconfigureParallelLoad(newVal);
|
||||||
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)
|
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)
|
||||||
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))) {
|
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))
|
||||||
|
|| (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY))) {
|
||||||
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
||||||
} else {
|
} else {
|
||||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||||
|
@ -2392,24 +2396,32 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
final String property, final String newVal) throws ReconfigurationException {
|
final String property, final String newVal) throws ReconfigurationException {
|
||||||
BlockManager bm = namesystem.getBlockManager();
|
BlockManager bm = namesystem.getBlockManager();
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
boolean enable;
|
String result;
|
||||||
try {
|
try {
|
||||||
if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
|
if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
|
||||||
enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
|
boolean enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
|
||||||
Boolean.parseBoolean(newVal));
|
Boolean.parseBoolean(newVal));
|
||||||
|
result = Boolean.toString(enable);
|
||||||
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
|
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
|
||||||
} else if (property.equals(
|
} else if (property.equals(
|
||||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) {
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) {
|
||||||
enable = (newVal == null ?
|
boolean enable = (newVal == null ?
|
||||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT :
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT :
|
||||||
Boolean.parseBoolean(newVal));
|
Boolean.parseBoolean(newVal));
|
||||||
|
result = Boolean.toString(enable);
|
||||||
bm.setExcludeSlowNodesEnabled(enable);
|
bm.setExcludeSlowNodesEnabled(enable);
|
||||||
|
} else if (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) {
|
||||||
|
int maxSlowpeerCollectNodes = (newVal == null ?
|
||||||
|
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT :
|
||||||
|
Integer.parseInt(newVal));
|
||||||
|
result = Integer.toString(maxSlowpeerCollectNodes);
|
||||||
|
datanodeManager.setMaxSlowpeerCollectNodes(maxSlowpeerCollectNodes);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Unexpected property " +
|
throw new IllegalArgumentException("Unexpected property " +
|
||||||
property + " in reconfigureSlowNodesParameters");
|
property + " in reconfigureSlowNodesParameters");
|
||||||
}
|
}
|
||||||
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||||
return Boolean.toString(enable);
|
return result;
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||||
property), e);
|
property), e);
|
||||||
|
|
|
@ -55,6 +55,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
|
||||||
|
|
||||||
public class TestNameNodeReconfigure {
|
public class TestNameNodeReconfigure {
|
||||||
|
@ -430,6 +431,24 @@ public class TestNameNodeReconfigure {
|
||||||
getExcludeSlowNodesEnabled(BlockType.STRIPED));
|
getExcludeSlowNodesEnabled(BlockType.STRIPED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReconfigureMaxSlowpeerCollectNodes()
|
||||||
|
throws ReconfigurationException {
|
||||||
|
final NameNode nameNode = cluster.getNameNode();
|
||||||
|
final DatanodeManager datanodeManager = nameNode.namesystem
|
||||||
|
.getBlockManager().getDatanodeManager();
|
||||||
|
|
||||||
|
// By default, DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY is 5.
|
||||||
|
assertEquals(5, datanodeManager.getMaxSlowpeerCollectNodes());
|
||||||
|
|
||||||
|
// Reconfigure.
|
||||||
|
nameNode.reconfigureProperty(
|
||||||
|
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, Integer.toString(10));
|
||||||
|
|
||||||
|
// Assert DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY is 10.
|
||||||
|
assertEquals(10, datanodeManager.getMaxSlowpeerCollectNodes());
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void shutDown() throws IOException {
|
public void shutDown() throws IOException {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
|
|
|
@ -87,6 +87,7 @@ import java.util.List;
|
||||||
import java.util.Scanner;
|
import java.util.Scanner;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
|
||||||
import static org.hamcrest.CoreMatchers.allOf;
|
import static org.hamcrest.CoreMatchers.allOf;
|
||||||
import static org.hamcrest.CoreMatchers.anyOf;
|
import static org.hamcrest.CoreMatchers.anyOf;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
@ -424,7 +425,7 @@ public class TestDFSAdmin {
|
||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("namenode", address, outs, errs);
|
getReconfigurableProperties("namenode", address, outs, errs);
|
||||||
assertEquals(15, outs.size());
|
assertEquals(16, outs.size());
|
||||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
||||||
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
||||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
||||||
|
@ -432,6 +433,7 @@ public class TestDFSAdmin {
|
||||||
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
|
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
|
||||||
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6));
|
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6));
|
||||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7));
|
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7));
|
||||||
|
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(8));
|
||||||
assertEquals(errs.size(), 0);
|
assertEquals(errs.size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue