HDFS-16634. Dynamically adjust slow peer report size on JMX metrics (#4448)
Signed-off-by: Tao Li <tomscut@apache.org>
This commit is contained in:
parent
e38e13be03
commit
cb0421095b
|
@@ -2256,4 +2256,9 @@ public class DatanodeManager {
|
|||
/**
 * Returns the internal map of datanode identifiers to their descriptors.
 * NOTE(review): this exposes the live internal map, not a copy — callers
 * must treat it as read-only; presumably keys are datanode UUIDs, verify
 * against the field declaration.
 *
 * @return the internal datanode map (live view, not a snapshot).
 */
public Map<String, DatanodeDescriptor> getDatanodeMap() {
  return datanodeMap;
}
|
||||
|
||||
public void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
|
||||
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
|
||||
slowPeerTracker.setMaxSlowPeersToReport(maxSlowPeersToReport);
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -80,7 +80,7 @@ public class SlowPeerTracker {
|
|||
* Number of nodes to include in JSON report. We will return nodes with
|
||||
* the highest number of votes from peers.
|
||||
*/
|
||||
private final int maxNodesToReport;
|
||||
private volatile int maxNodesToReport;
|
||||
|
||||
/**
|
||||
* Information about peers that have reported a node as being slow.
|
||||
|
@@ -104,9 +104,8 @@ public class SlowPeerTracker {
|
|||
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS) * 3;
|
||||
this.maxNodesToReport = conf.getInt(
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT);
|
||||
this.setMaxSlowPeersToReport(conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -282,6 +281,10 @@ public class SlowPeerTracker {
|
|||
return reportValidityMs;
|
||||
}
|
||||
|
||||
/**
 * Dynamically updates the number of nodes included in the JSON slow-peer
 * report. The target field is volatile, so readers observe the new value
 * without locking; {@code synchronized} serializes concurrent
 * reconfiguration calls.
 *
 * @param maxSlowPeersToReport new report size limit.
 */
public synchronized void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
  this.maxNodesToReport = maxSlowPeersToReport;
}
|
||||
|
||||
private static class LatencyWithLastReportTime {
|
||||
private final Long time;
|
||||
private final OutlierMetrics latency;
|
||||
|
|
|
@@ -124,6 +124,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
|
|||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
|
||||
|
@@ -344,7 +346,8 @@ public class NameNode extends ReconfigurableBase implements
|
|||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
||||
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
||||
DFS_BLOCK_INVALIDATE_LIMIT_KEY,
|
||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY));
|
||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
||||
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY));
|
||||
|
||||
private static final String USAGE = "Usage: hdfs namenode ["
|
||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||
|
@@ -2216,7 +2219,8 @@ public class NameNode extends ReconfigurableBase implements
|
|||
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY) || (property.equals(
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) || (property.equals(
|
||||
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) || (property.equals(
|
||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY))) {
|
||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY)) || property.equals(
|
||||
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY)) {
|
||||
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
||||
} else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
|
||||
return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
|
||||
|
@@ -2450,6 +2454,13 @@ public class NameNode extends ReconfigurableBase implements
|
|||
datanodeManager.initSlowPeerTracker(getConf(), timer, peerStatsEnabled);
|
||||
break;
|
||||
}
|
||||
case DFS_DATANODE_MAX_NODES_TO_REPORT_KEY: {
|
||||
int maxSlowPeersToReport = (newVal == null
|
||||
? DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT : Integer.parseInt(newVal));
|
||||
result = Integer.toString(maxSlowPeersToReport);
|
||||
datanodeManager.setMaxSlowPeersToReport(maxSlowPeersToReport);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw new IllegalArgumentException(
|
||||
"Unexpected property " + property + " in reconfigureSlowNodesParameters");
|
||||
|
|
|
@@ -19,11 +19,14 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
@@ -40,7 +43,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker;
|
||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
|
||||
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
|
@@ -513,6 +518,55 @@ public class TestNameNodeReconfigure {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowPeerMaxNodesToReportReconf() throws Exception {
|
||||
final NameNode nameNode = cluster.getNameNode();
|
||||
final DatanodeManager datanodeManager = nameNode.namesystem.getBlockManager()
|
||||
.getDatanodeManager();
|
||||
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
|
||||
assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
|
||||
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
|
||||
|
||||
SlowPeerTracker tracker = datanodeManager.getSlowPeerTracker();
|
||||
|
||||
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1);
|
||||
tracker.addReport("node1", "node70", outlierMetrics1);
|
||||
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23);
|
||||
tracker.addReport("node2", "node71", outlierMetrics2);
|
||||
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13);
|
||||
tracker.addReport("node3", "node72", outlierMetrics3);
|
||||
OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
|
||||
tracker.addReport("node4", "node73", outlierMetrics4);
|
||||
OutlierMetrics outlierMetrics5 = new OutlierMetrics(0.0, 0.0, 0.0, 0.2);
|
||||
tracker.addReport("node5", "node74", outlierMetrics4);
|
||||
OutlierMetrics outlierMetrics6 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
|
||||
tracker.addReport("node6", "node75", outlierMetrics4);
|
||||
|
||||
String jsonReport = tracker.getJson();
|
||||
LOG.info("Retrieved slow peer json report: {}", jsonReport);
|
||||
|
||||
List<Boolean> containReport = validatePeerReport(jsonReport);
|
||||
assertEquals(1, containReport.stream().filter(reportVal -> !reportVal).count());
|
||||
|
||||
nameNode.reconfigurePropertyImpl(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, "2");
|
||||
jsonReport = tracker.getJson();
|
||||
LOG.info("Retrieved slow peer json report: {}", jsonReport);
|
||||
|
||||
containReport = validatePeerReport(jsonReport);
|
||||
assertEquals(4, containReport.stream().filter(reportVal -> !reportVal).count());
|
||||
}
|
||||
|
||||
private List<Boolean> validatePeerReport(String jsonReport) {
|
||||
List<Boolean> containReport = new ArrayList<>();
|
||||
containReport.add(jsonReport.contains("node1"));
|
||||
containReport.add(jsonReport.contains("node2"));
|
||||
containReport.add(jsonReport.contains("node3"));
|
||||
containReport.add(jsonReport.contains("node4"));
|
||||
containReport.add(jsonReport.contains("node5"));
|
||||
containReport.add(jsonReport.contains("node6"));
|
||||
return containReport;
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDown() throws IOException {
|
||||
if (cluster != null) {
|
||||
|
|
|
@@ -33,6 +33,7 @@ import java.util.function.Supplier;
|
|||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||
|
@@ -437,18 +438,19 @@ public class TestDFSAdmin {
|
|||
final List<String> outs = Lists.newArrayList();
|
||||
final List<String> errs = Lists.newArrayList();
|
||||
getReconfigurableProperties("namenode", address, outs, errs);
|
||||
assertEquals(18, outs.size());
|
||||
assertEquals(19, outs.size());
|
||||
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
|
||||
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
|
||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
|
||||
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(3));
|
||||
assertEquals(DFS_DATANODE_PEER_STATS_ENABLED_KEY, outs.get(4));
|
||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(5));
|
||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(6));
|
||||
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(7));
|
||||
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(8));
|
||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(9));
|
||||
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(10));
|
||||
assertEquals(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, outs.get(4));
|
||||
assertEquals(DFS_DATANODE_PEER_STATS_ENABLED_KEY, outs.get(5));
|
||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(6));
|
||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(7));
|
||||
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(8));
|
||||
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(9));
|
||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(10));
|
||||
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(11));
|
||||
assertEquals(errs.size(), 0);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue