HDFS-16396. Reconfig slow peer parameters for datanode (#3827)
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
7fd90cdcbe
commit
0c194f2157
|
@ -552,7 +552,7 @@ class BPServiceActor implements Runnable {
|
|||
volumeFailureSummary.getFailedStorageLocations().length : 0;
|
||||
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
|
||||
final SlowPeerReports slowPeers =
|
||||
outliersReportDue && dn.getPeerMetrics() != null ?
|
||||
outliersReportDue && dnConf.peerStatsEnabled && dn.getPeerMetrics() != null ?
|
||||
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
|
||||
SlowPeerReports.EMPTY_REPORT;
|
||||
final SlowDiskReports slowDisks =
|
||||
|
|
|
@ -887,7 +887,7 @@ class BlockReceiver implements Closeable {
|
|||
*/
|
||||
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
|
||||
final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
|
||||
if (peerMetrics != null && isPenultimateNode) {
|
||||
if (datanode.getDnConf().peerStatsEnabled && peerMetrics != null && isPenultimateNode) {
|
||||
peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
|
||||
}
|
||||
}
|
||||
|
@ -1107,7 +1107,7 @@ class BlockReceiver implements Closeable {
|
|||
if (downstreams != null && downstreams.length > 0) {
|
||||
downstreamDNs = downstreams;
|
||||
isPenultimateNode = (downstreams.length == 1);
|
||||
if (isPenultimateNode && datanode.getPeerMetrics() != null) {
|
||||
if (isPenultimateNode && datanode.getDnConf().peerStatsEnabled) {
|
||||
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
|
||||
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
|
||||
LOG.debug("Will collect peer metrics for downstream node {}",
|
||||
|
|
|
@ -108,7 +108,7 @@ public class DNConf {
|
|||
private final long lifelineIntervalMs;
|
||||
volatile long blockReportInterval;
|
||||
volatile long blockReportSplitThreshold;
|
||||
final boolean peerStatsEnabled;
|
||||
volatile boolean peerStatsEnabled;
|
||||
final boolean diskStatsEnabled;
|
||||
final long outliersReportIntervalMs;
|
||||
final long ibrInterval;
|
||||
|
@ -507,4 +507,8 @@ public class DNConf {
|
|||
dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
|
||||
initBlockReportDelay();
|
||||
}
|
||||
|
||||
void setPeerStatsEnabled(boolean enablePeerStats) {
|
||||
peerStatsEnabled = enablePeerStats;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,11 +46,19 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||
|
@ -315,7 +323,11 @@ public class DataNode extends ReconfigurableBase
|
|||
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
|
||||
DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
|
||||
DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
|
||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
|
||||
|
||||
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
|
||||
|
||||
|
@ -357,7 +369,7 @@ public class DataNode extends ReconfigurableBase
|
|||
|
||||
DataNodeMetrics metrics;
|
||||
@Nullable
|
||||
private DataNodePeerMetrics peerMetrics;
|
||||
private volatile DataNodePeerMetrics peerMetrics;
|
||||
private DataNodeDiskMetrics diskMetrics;
|
||||
private InetSocketAddress streamingAddr;
|
||||
|
||||
|
@ -634,6 +646,11 @@ public class DataNode extends ReconfigurableBase
|
|||
return reconfDataXceiverParameters(property, newVal);
|
||||
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
|
||||
return reconfCacheReportParameters(property, newVal);
|
||||
case DFS_DATANODE_PEER_STATS_ENABLED_KEY:
|
||||
case DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY:
|
||||
case DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY:
|
||||
case DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY:
|
||||
return reconfSlowPeerParameters(property, newVal);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -713,6 +730,53 @@ public class DataNode extends ReconfigurableBase
|
|||
}
|
||||
}
|
||||
|
||||
private String reconfSlowPeerParameters(String property, String newVal)
|
||||
throws ReconfigurationException {
|
||||
String result = null;
|
||||
try {
|
||||
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||
if (property.equals(DFS_DATANODE_PEER_STATS_ENABLED_KEY)) {
|
||||
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||
if (newVal != null && !newVal.equalsIgnoreCase("true")
|
||||
&& !newVal.equalsIgnoreCase("false")) {
|
||||
throw new IllegalArgumentException("Not a valid Boolean value for " + property +
|
||||
" in reconfSlowPeerParameters");
|
||||
}
|
||||
boolean enable = (newVal == null ? DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT :
|
||||
Boolean.parseBoolean(newVal));
|
||||
result = Boolean.toString(enable);
|
||||
dnConf.setPeerStatsEnabled(enable);
|
||||
if (enable) {
|
||||
// Create if it doesn't exist, overwrite if it does.
|
||||
peerMetrics = DataNodePeerMetrics.create(getDisplayName(), getConf());
|
||||
}
|
||||
} else if (property.equals(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY)) {
|
||||
Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
|
||||
long minNodes = (newVal == null ? DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT :
|
||||
Long.parseLong(newVal));
|
||||
result = Long.toString(minNodes);
|
||||
peerMetrics.setMinOutlierDetectionNodes(minNodes);
|
||||
} else if (property.equals(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY)) {
|
||||
Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
|
||||
long threshold = (newVal == null ? DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT :
|
||||
Long.parseLong(newVal));
|
||||
result = Long.toString(threshold);
|
||||
peerMetrics.setLowThresholdMs(threshold);
|
||||
} else if (property.equals(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY)) {
|
||||
Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be disabled.");
|
||||
long minSamples = (newVal == null ?
|
||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT :
|
||||
Long.parseLong(newVal));
|
||||
result = Long.toString(minSamples);
|
||||
peerMetrics.setMinOutlierDetectionSamples(minSamples);
|
||||
}
|
||||
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||
return result;
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of the keys of the re-configurable properties in configuration.
|
||||
*/
|
||||
|
@ -3872,7 +3936,7 @@ public class DataNode extends ReconfigurableBase
|
|||
|
||||
@Override // DataNodeMXBean
|
||||
public String getSendPacketDownstreamAvgInfo() {
|
||||
return peerMetrics != null ?
|
||||
return dnConf.peerStatsEnabled && peerMetrics != null ?
|
||||
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
|
||||
}
|
||||
|
||||
|
|
|
@ -341,7 +341,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
* the thread dies away.
|
||||
*/
|
||||
private void collectThreadLocalStates() {
|
||||
if (datanode.getPeerMetrics() != null) {
|
||||
if (datanode.getDnConf().peerStatsEnabled && datanode.getPeerMetrics() != null) {
|
||||
datanode.getPeerMetrics().collectThreadLocalStates();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,18 +21,23 @@ package org.apache.hadoop.hdfs.server.datanode.metrics;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||
|
||||
/**
|
||||
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
|
||||
|
@ -57,15 +62,15 @@ public class DataNodePeerMetrics {
|
|||
* for outlier detection. If the number of samples is below this then
|
||||
* outlier detection is skipped.
|
||||
*/
|
||||
private final long minOutlierDetectionSamples;
|
||||
private volatile long minOutlierDetectionSamples;
|
||||
/**
|
||||
* Threshold in milliseconds below which a DataNode is definitely not slow.
|
||||
*/
|
||||
private final long lowThresholdMs;
|
||||
private volatile long lowThresholdMs;
|
||||
/**
|
||||
* Minimum number of nodes to run outlier detection.
|
||||
*/
|
||||
private final long minOutlierDetectionNodes;
|
||||
private volatile long minOutlierDetectionNodes;
|
||||
|
||||
public DataNodePeerMetrics(final String name, Configuration conf) {
|
||||
this.name = name;
|
||||
|
@ -73,11 +78,11 @@ public class DataNodePeerMetrics {
|
|||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
|
||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
|
||||
lowThresholdMs =
|
||||
conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
||||
conf.getLong(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
||||
minOutlierDetectionNodes =
|
||||
conf.getLong(DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
|
||||
conf.getLong(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
|
||||
this.slowNodeDetector =
|
||||
new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
|
||||
sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
|
||||
|
@ -87,7 +92,7 @@ public class DataNodePeerMetrics {
|
|||
return name;
|
||||
}
|
||||
|
||||
long getMinOutlierDetectionSamples() {
|
||||
public long getMinOutlierDetectionSamples() {
|
||||
return minOutlierDetectionSamples;
|
||||
}
|
||||
|
||||
|
@ -150,4 +155,38 @@ public class DataNodePeerMetrics {
|
|||
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
|
||||
return sendPacketDownstreamRollingAverages;
|
||||
}
|
||||
|
||||
public void setMinOutlierDetectionNodes(long minNodes) {
|
||||
Preconditions.checkArgument(minNodes > 0,
|
||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY + " should be larger than 0");
|
||||
minOutlierDetectionNodes = minNodes;
|
||||
this.slowNodeDetector.setMinNumResources(minNodes);
|
||||
}
|
||||
|
||||
public long getMinOutlierDetectionNodes() {
|
||||
return minOutlierDetectionNodes;
|
||||
}
|
||||
|
||||
public void setLowThresholdMs(long thresholdMs) {
|
||||
Preconditions.checkArgument(thresholdMs > 0,
|
||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY + " should be larger than 0");
|
||||
lowThresholdMs = thresholdMs;
|
||||
this.slowNodeDetector.setLowThresholdMs(thresholdMs);
|
||||
}
|
||||
|
||||
public long getLowThresholdMs() {
|
||||
return lowThresholdMs;
|
||||
}
|
||||
|
||||
public void setMinOutlierDetectionSamples(long minSamples) {
|
||||
Preconditions.checkArgument(minSamples > 0,
|
||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY +
|
||||
" should be larger than 0");
|
||||
minOutlierDetectionSamples = minSamples;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public OutlierDetector getSlowNodeDetector() {
|
||||
return this.slowNodeDetector;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public class OutlierDetector {
|
|||
/**
|
||||
* Minimum number of resources to run outlier detection.
|
||||
*/
|
||||
private final long minNumResources;
|
||||
private volatile long minNumResources;
|
||||
|
||||
/**
|
||||
* The multiplier is from Leys, C. et al.
|
||||
|
@ -70,7 +70,7 @@ public class OutlierDetector {
|
|||
/**
|
||||
* Threshold in milliseconds below which a node/ disk is definitely not slow.
|
||||
*/
|
||||
private final long lowThresholdMs;
|
||||
private volatile long lowThresholdMs;
|
||||
|
||||
/**
|
||||
* Deviation multiplier. A sample is considered to be an outlier if it
|
||||
|
@ -180,4 +180,20 @@ public class OutlierDetector {
|
|||
}
|
||||
return median;
|
||||
}
|
||||
|
||||
public void setMinNumResources(long minNodes) {
|
||||
minNumResources = minNodes;
|
||||
}
|
||||
|
||||
public long getMinOutlierDetectionNodes() {
|
||||
return minNumResources;
|
||||
}
|
||||
|
||||
public void setLowThresholdMs(long thresholdMs) {
|
||||
lowThresholdMs = thresholdMs;
|
||||
}
|
||||
|
||||
public long getLowThresholdMs() {
|
||||
return lowThresholdMs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -84,6 +92,7 @@ public class TestDataNodeReconfiguration {
|
|||
private void startDFSCluster(int numNameNodes, int numDataNodes)
|
||||
throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
|
||||
|
||||
MiniDFSNNTopology nnTopology = MiniDFSNNTopology
|
||||
.simpleFederatedTopology(numNameNodes);
|
||||
|
@ -467,4 +476,82 @@ public class TestDataNodeReconfiguration {
|
|||
dn.getConf().get(DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowPeerParameters() throws Exception {
|
||||
String[] slowPeersParameters = {
|
||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
|
||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
|
||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY};
|
||||
|
||||
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||
DataNode dn = cluster.getDataNodes().get(i);
|
||||
|
||||
// Try invalid values.
|
||||
LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||
"Could not change property dfs.datanode.peer.stats.enabled from 'true' to 'text'",
|
||||
() -> dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "text"));
|
||||
|
||||
for (String parameter : slowPeersParameters) {
|
||||
try {
|
||||
dn.reconfigureProperty(parameter, "text");
|
||||
fail("ReconfigurationException expected");
|
||||
} catch (ReconfigurationException expected) {
|
||||
assertTrue("expecting NumberFormatException",
|
||||
expected.getCause() instanceof NumberFormatException);
|
||||
}
|
||||
|
||||
try {
|
||||
dn.reconfigureProperty(parameter, String.valueOf(-1));
|
||||
fail("ReconfigurationException expected");
|
||||
} catch (ReconfigurationException expected) {
|
||||
assertTrue("expecting IllegalArgumentException",
|
||||
expected.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
}
|
||||
|
||||
// Change and verify properties.
|
||||
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false");
|
||||
assertFalse(dn.getDnConf().peerStatsEnabled);
|
||||
|
||||
// Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
|
||||
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
|
||||
for (String parameter : slowPeersParameters) {
|
||||
dn.reconfigureProperty(parameter, "123");
|
||||
}
|
||||
assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionNodes());
|
||||
assertEquals(123, dn.getPeerMetrics().getLowThresholdMs());
|
||||
assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionSamples());
|
||||
assertEquals(123,
|
||||
dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes());
|
||||
assertEquals(123,
|
||||
dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs());
|
||||
|
||||
// Revert to default and verify.
|
||||
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, null);
|
||||
assertEquals(String.format("expect %s is not configured",
|
||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY), null,
|
||||
dn.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY));
|
||||
|
||||
// Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
|
||||
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
|
||||
|
||||
for (String parameter : slowPeersParameters) {
|
||||
dn.reconfigureProperty(parameter, null);
|
||||
}
|
||||
assertEquals(String.format("expect %s is not configured",
|
||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY), null,
|
||||
dn.getConf().get(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY));
|
||||
assertEquals(String.format("expect %s is not configured",
|
||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY), null,
|
||||
dn.getConf().get(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY));
|
||||
assertEquals(String.format("expect %s is not configured",
|
||||
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY), null,
|
||||
dn.getConf().get(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
|
||||
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes(),
|
||||
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
|
||||
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs(),
|
||||
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -338,7 +338,7 @@ public class TestDFSAdmin {
|
|||
final List<String> outs = Lists.newArrayList();
|
||||
final List<String> errs = Lists.newArrayList();
|
||||
getReconfigurableProperties("datanode", address, outs, errs);
|
||||
assertEquals(8, outs.size());
|
||||
assertEquals(12, outs.size());
|
||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue