HDFS-11461. DataNode Disk Outlier Detection. Contributed by Hanisha Koneru.
parent 747bafaf96
commit b3ec531f40

DFSConfigKeys.java
@@ -677,9 +677,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
   public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
 
-  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY =
-      "dfs.datanode.slow.peers.report.interval";
-  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT =
+  public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY =
+      "dfs.datanode.outliers.report.interval";
+  public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
       "30m";
 
   // property for fsimage compression
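
As a reference for the rename, a minimal sketch (not part of this commit) of how the new key resolves; Configuration.getTimeDuration accepts the string default, as the SlowPeerTracker hunk below also shows, so the stock "30m" comes back as 1,800,000 ms:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class OutlierIntervalExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Resolves "dfs.datanode.outliers.report.interval"; falls back to
        // the "30m" default, i.e. 1_800_000 milliseconds.
        long intervalMs = conf.getTimeDuration(
            DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
            DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
            TimeUnit.MILLISECONDS);
        System.out.println("Outlier report interval: " + intervalMs + " ms");
      }
    }
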
SlowPeerTracker.java
@@ -94,8 +94,8 @@ public class SlowPeerTracker {
     this.timer = timer;
     this.allReports = new ConcurrentHashMap<>();
     this.reportValidityMs = conf.getTimeDuration(
-        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS) * 3;
   }
 
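
Note the trailing "* 3": a slow-peer report stays valid for three reporting intervals. A worked one-liner (illustrative only), using the stock 30-minute interval:

    // 30 minutes * 3 = 90 minutes = 5_400_000 ms of report validity.
    long reportValidityMs = java.util.concurrent.TimeUnit.MINUTES.toMillis(30) * 3;
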
BPServiceActor.java
@@ -129,7 +129,7 @@ class BPServiceActor implements Runnable {
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval,
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
-        dnConf.slowPeersReportIntervalMs);
+        dnConf.outliersReportIntervalMs);
     // get the value of maxDataLength.
     this.maxDataLength = dnConf.getMaxDataLength();
   }
DNConf.java
@@ -30,8 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;

@@ -95,7 +95,8 @@ public class DNConf {
   final long blockReportInterval;
   final long blockReportSplitThreshold;
   final boolean peerStatsEnabled;
-  final long slowPeersReportIntervalMs;
+  final boolean diskStatsEnabled;
+  final long outliersReportIntervalMs;
   final long ibrInterval;
   final long initialBlockReportDelayMs;
   final long cacheReportInterval;

@@ -173,9 +174,12 @@ public class DNConf {
     this.peerStatsEnabled = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
-    this.slowPeersReportIntervalMs = getConf().getTimeDuration(
-        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
-        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+    this.diskStatsEnabled = getConf().getBoolean(
+        DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY,
+        DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT);
+    this.outliersReportIntervalMs = getConf().getTimeDuration(
+        DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
+        DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
     this.ibrInterval = getConf().getLong(
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
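
Disk outlier detection is therefore opt-in: it only runs when file IO profiling is enabled. A hedged sketch of turning it on in a test configuration, using only the constants referenced above; the "5m" interval override is purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class EnableDiskStatsExample {
      public static Configuration newConf() {
        Configuration conf = new HdfsConfiguration();
        // Gate for DNConf.diskStatsEnabled (see the hunk above).
        conf.setBoolean(
            DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, true);
        // Optional: shorten the outlier report interval from the 30m default.
        conf.set(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "5m");
        return conf;
      }
    }
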
DataNode.java
@@ -164,6 +164,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;

@@ -336,6 +337,7 @@ public class DataNode extends ReconfigurableBase
   DataNodeMetrics metrics;
   @Nullable
   private DataNodePeerMetrics peerMetrics;
+  private DataNodeDiskMetrics diskMetrics;
   private InetSocketAddress streamingAddr;
 
   // See the note below in incrDatanodeNetworkErrors re: concurrency.

@@ -1390,6 +1392,11 @@ public class DataNode extends ReconfigurableBase
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
     startMetricsLogger();
+
+    if (dnConf.diskStatsEnabled) {
+      diskMetrics = new DataNodeDiskMetrics(this,
+          dnConf.outliersReportIntervalMs);
+    }
   }
 
   /**

@@ -2046,6 +2053,9 @@ public class DataNode extends ReconfigurableBase
     if (metrics != null) {
       metrics.shutdown();
     }
+    if (diskMetrics != null) {
+      diskMetrics.shutdownAndWait();
+    }
     if (dataNodeInfoBeanName != null) {
       MBeans.unregister(dataNodeInfoBeanName);
       dataNodeInfoBeanName = null;
DataNodeDiskMetrics.java (new file)
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class detects and maintains DataNode disk outliers and their
+ * latencies for different ops (metadata, read, write).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DataNodeDiskMetrics {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DataNodeDiskMetrics.class);
+
+  private DataNode dn;
+  private final long MIN_OUTLIER_DETECTION_DISKS = 5;
+  private final long SLOW_DISK_LOW_THRESHOLD_MS = 20;
+  private final long detectionInterval;
+  private volatile boolean shouldRun;
+  private OutlierDetector slowDiskDetector;
+  private Daemon slowDiskDetectionDaemon;
+  private volatile Map<String, Map<DiskOutlierDetectionOp, Double>> diskOutliersStats;
+
+  public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
+    this.dn = dn;
+    this.detectionInterval = diskOutlierDetectionIntervalMs;
+    slowDiskDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_DISKS,
+        SLOW_DISK_LOW_THRESHOLD_MS);
+    shouldRun = true;
+    startDiskOutlierDetectionThread();
+  }
+
+  private void startDiskOutlierDetectionThread() {
+    slowDiskDetectionDaemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        while (shouldRun) {
+          Map<String, Double> metadataOpStats = Maps.newHashMap();
+          Map<String, Double> readIoStats = Maps.newHashMap();
+          Map<String, Double> writeIoStats = Maps.newHashMap();
+          FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
+          try {
+            fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
+            Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
+                .iterator();
+            while (volumeIterator.hasNext()) {
+              FsVolumeSpi volume = volumeIterator.next();
+              DataNodeVolumeMetrics metrics = volume.getMetrics();
+              String volumeName = volume.getBaseURI().getPath();
+
+              metadataOpStats.put(volumeName,
+                  metrics.getMetadataOperationMean());
+              readIoStats.put(volumeName, metrics.getReadIoMean());
+              writeIoStats.put(volumeName, metrics.getWriteIoMean());
+            }
+          } finally {
+            if (fsVolumeReferences != null) {
+              try {
+                fsVolumeReferences.close();
+              } catch (IOException e) {
+                LOG.error("Error in releasing FS Volume references", e);
+              }
+            }
+          }
+          if (metadataOpStats.isEmpty() && readIoStats.isEmpty() &&
+              writeIoStats.isEmpty()) {
+            LOG.debug("No disk stats available for detecting outliers.");
+            return;
+          }
+
+          detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
+              writeIoStats);
+
+          try {
+            Thread.sleep(detectionInterval);
+          } catch (InterruptedException e) {
+            LOG.error("Disk Outlier Detection thread interrupted", e);
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    });
+    slowDiskDetectionDaemon.start();
+  }
+
+  private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
+      Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
+    Set<String> diskOutliersSet = Sets.newHashSet();
+
+    // Get MetadataOp Outliers
+    Map<String, Double> metadataOpOutliers = slowDiskDetector
+        .getOutliers(metadataOpStats);
+    if (!metadataOpOutliers.isEmpty()) {
+      diskOutliersSet.addAll(metadataOpOutliers.keySet());
+    }
+
+    // Get ReadIo Outliers
+    Map<String, Double> readIoOutliers = slowDiskDetector
+        .getOutliers(readIoStats);
+    if (!readIoOutliers.isEmpty()) {
+      diskOutliersSet.addAll(readIoOutliers.keySet());
+    }
+
+    // Get WriteIo Outliers
+    Map<String, Double> writeIoOutliers = slowDiskDetector
+        .getOutliers(writeIoStats);
+    if (!writeIoOutliers.isEmpty()) {
+      diskOutliersSet.addAll(writeIoOutliers.keySet());
+    }
+
+    Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats =
+        Maps.newHashMap();
+    for (String disk : diskOutliersSet) {
+      Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap();
+      diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk));
+      diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk));
+      diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk));
+      diskStats.put(disk, diskStat);
+    }
+
+    diskOutliersStats = diskStats;
+    LOG.debug("Updated disk outliers.");
+  }
+
+  /**
+   * Lists the types of operations on which disk latencies are measured.
+   */
+  public enum DiskOutlierDetectionOp {
+    METADATA,
+    READ,
+    WRITE
+  }
+
+  public Map<String,
+      Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() {
+    return diskOutliersStats;
+  }
+
+  public void shutdownAndWait() {
+    shouldRun = false;
+    slowDiskDetectionDaemon.interrupt();
+    try {
+      slowDiskDetectionDaemon.join();
+    } catch (InterruptedException e) {
+      LOG.error("Disk Outlier Detection daemon did not shutdown", e);
+    }
+  }
+}
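
Once the daemon publishes a snapshot, callers can poll it. A hedged usage sketch, assuming a DataNodeDiskMetrics handle obtained from a running DataNode with file IO profiling enabled (the accessor on DataNode is not shown in this diff):

    import java.util.Map;

    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics.DiskOutlierDetectionOp;

    public class DiskOutlierPollExample {
      static void logOutliers(DataNodeDiskMetrics diskMetrics) {
        Map<String, Map<DiskOutlierDetectionOp, Double>> outliers =
            diskMetrics.getDiskOutliersStats();
        if (outliers == null) {
          return; // The detection thread has not published a snapshot yet.
        }
        for (Map.Entry<String, Map<DiskOutlierDetectionOp, Double>> entry
            : outliers.entrySet()) {
          System.out.println("Slow disk " + entry.getKey()
              + ", latencies (ms) by op: " + entry.getValue());
        }
      }
    }
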
DataNodePeerMetrics.java
@@ -52,8 +52,9 @@ public class DataNodePeerMetrics {
    * Threshold in milliseconds below which a DataNode is definitely not slow.
    */
   private static final long LOW_THRESHOLD_MS = 5;
+  private static final long MIN_OUTLIER_DETECTION_NODES = 10;
 
-  private final SlowNodeDetector slowNodeDetector;
+  private final OutlierDetector slowNodeDetector;
 
   /**
    * Minimum number of packet send samples which are required to qualify

@@ -68,7 +69,8 @@ public class DataNodePeerMetrics {
       final long windowSizeMs,
       final int numWindows) {
     this.name = name;
-    this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS);
+    this.slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_NODES,
+        LOW_THRESHOLD_MS);
     sendPacketDownstreamRollingAvgerages = new RollingAverages(
         windowSizeMs, numWindows);
   }
OutlierDetector.java (renamed from SlowNodeDetector.java)
@@ -33,8 +33,8 @@ import java.util.Map;
 
 
 /**
- * A utility class to help detect nodes whose aggregate latency
- * is an outlier within a given set.
+ * A utility class to help detect resources (nodes/ disks) whose aggregate
+ * latency is an outlier within a given set.
  *
  * We use the median absolute deviation for outlier detection as
  * described in the following publication:

@@ -47,20 +47,20 @@ import java.util.Map;
  * more conservative:
  *
  * 1. Skip outlier detection if the sample size is too small.
- * 2. Never flag nodes whose aggregate latency is below a low threshold.
- * 3. Never flag nodes whose aggregate latency is less than a small
+ * 2. Never flag resources whose aggregate latency is below a low threshold.
+ * 3. Never flag resources whose aggregate latency is less than a small
  *    multiple of the median.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class SlowNodeDetector {
+public class OutlierDetector {
   public static final Logger LOG =
-      LoggerFactory.getLogger(SlowNodeDetector.class);
+      LoggerFactory.getLogger(OutlierDetector.class);
 
   /**
-   * Minimum number of peers to run outlier detection.
+   * Minimum number of resources to run outlier detection.
    */
-  private static long minOutlierDetectionPeers = 10;
+  private final long minNumResources;
 
   /**
    * The multiplier is from Leys, C. et al.

@@ -68,7 +68,7 @@ public class OutlierDetector {
   private static final double MAD_MULTIPLIER = (double) 1.4826;
 
   /**
-   * Threshold in milliseconds below which a DataNode is definitely not slow.
+   * Threshold in milliseconds below which a node/ disk is definitely not slow.
    */
   private final long lowThresholdMs;
 

@@ -87,13 +87,14 @@ public class OutlierDetector {
   @VisibleForTesting
   static final int MEDIAN_MULTIPLIER = 3;
 
-  public SlowNodeDetector(long lowThresholdMs) {
+  public OutlierDetector(long minNumResources, long lowThresholdMs) {
+    this.minNumResources = minNumResources;
     this.lowThresholdMs = lowThresholdMs;
   }
 
   /**
-   * Return a set of DataNodes whose latency is much higher than
-   * their peers. The input is a map of (node -> aggregate latency)
+   * Return a set of nodes/ disks whose latency is much higher than
+   * their counterparts. The input is a map of (resource -> aggregate latency)
    * entries.
    *
    * The aggregate may be an arithmetic mean or a percentile e.g.

@@ -106,10 +107,10 @@ public class OutlierDetector {
    * @return
    */
   public Map<String, Double> getOutliers(Map<String, Double> stats) {
-    if (stats.size() < minOutlierDetectionPeers) {
+    if (stats.size() < minNumResources) {
       LOG.debug("Skipping statistical outlier detection as we don't have " +
-          "latency data for enough peers. Have {}, need at least {}",
-          stats.size(), minOutlierDetectionPeers);
+          "latency data for enough resources. Have {}, need at least {}",
+          stats.size(), minNumResources);
       return ImmutableMap.of();
     }
     // Compute the median absolute deviation of the aggregates.

@@ -122,20 +123,20 @@ public class OutlierDetector {
     upperLimitLatency = Math.max(
         upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
 
-    final Map<String, Double> slowNodes = new HashMap<>();
+    final Map<String, Double> slowResources = new HashMap<>();
 
     LOG.trace("getOutliers: List={}, MedianLatency={}, " +
         "MedianAbsoluteDeviation={}, upperLimitLatency={}",
         sorted, median, mad, upperLimitLatency);
 
-    // Find nodes whose latency exceeds the threshold.
+    // Find resources whose latency exceeds the threshold.
     for (Map.Entry<String, Double> entry : stats.entrySet()) {
       if (entry.getValue() > upperLimitLatency) {
-        slowNodes.put(entry.getKey(), entry.getValue());
+        slowResources.put(entry.getKey(), entry.getValue());
       }
     }
 
-    return slowNodes;
+    return slowResources;
   }
 
   /**

@@ -178,17 +179,4 @@ public class OutlierDetector {
     }
     return median;
   }
-
-  /**
-   * This method *must not* be used outside of unit tests.
-   */
-  @VisibleForTesting
-  static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) {
-    SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers;
-  }
-
-  @VisibleForTesting
-  static long getMinOutlierDetectionPeers() {
-    return minOutlierDetectionPeers;
-  }
 }
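
To make the detection math concrete, a self-contained sketch of the thresholding described in the class comment: median, MAD scaled by 1.4826, and the conservative floors. The 20 ms low threshold matches the disk constant above and MEDIAN_MULTIPLIER is 3; the deviation multiplier of 10 is an assumption, since DEVIATION_MULTIPLIER's value is outside this diff:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class MadSketch {
      public static void main(String[] args) {
        // Nine healthy disks near 2 ms and one outlier at 300 ms.
        List<Double> latencies = new ArrayList<>(Arrays.asList(
            2.0, 2.1, 1.9, 2.2, 2.0, 1.8, 2.3, 2.0, 2.1, 300.0));
        Collections.sort(latencies);

        double median = median(latencies);
        List<Double> deviations = new ArrayList<>();
        for (double v : latencies) {
          deviations.add(Math.abs(v - median));
        }
        Collections.sort(deviations);
        double mad = median(deviations) * 1.4826; // MAD_MULTIPLIER

        // Floors: median * MEDIAN_MULTIPLIER and the 20 ms low threshold;
        // the deviation multiplier (10 here) is assumed for illustration.
        double upperLimit = Math.max(median * 3, 20);
        upperLimit = Math.max(upperLimit, median + 10 * mad);

        for (double v : latencies) {
          if (v > upperLimit) {
            System.out.println("Outlier latency: " + v + " ms");
          }
        }
      }

      static double median(List<Double> sorted) {
        int n = sorted.size();
        return n % 2 == 0
            ? (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2
            : sorted.get(n / 2);
      }
    }
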
hdfs-default.xml
@@ -2009,7 +2009,7 @@
 </property>
 
 <property>
-  <name>dfs.datanode.slow.peers.report.interval</name>
+  <name>dfs.datanode.outliers.report.interval</name>
   <value>30m</value>
   <description>
     This setting controls how frequently DataNodes will report their peer
TestDataNodeOutlierDetectionViaMetrics.java
@@ -54,13 +54,14 @@ public class TestDataNodeOutlierDetectionViaMetrics {
   private static final int ROLLING_AVERAGE_WINDOWS = 10;
   private static final int SLOW_NODE_LATENCY_MS = 20_000;
   private static final int FAST_NODE_MAX_LATENCY_MS = 5;
+  private static final long MIN_OUTLIER_DETECTION_PEERS = 10;
 
   private Random random = new Random(System.currentTimeMillis());
 
   @Before
   public void setup() {
     GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
-    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(OutlierDetector.LOG, Level.ALL);
   }
 
   /**

@@ -111,8 +112,7 @@ public class TestDataNodeOutlierDetectionViaMetrics {
    */
   public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
     for (int nodeIndex = 0;
-         nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
-         ++nodeIndex) {
+         nodeIndex < MIN_OUTLIER_DETECTION_PEERS; ++nodeIndex) {
       final String nodeName = "FastNode-" + nodeIndex;
       LOG.info("Generating stats for node {}", nodeName);
       for (int i = 0;
TestSlowNodeDetector.java
@@ -40,7 +40,7 @@ import java.util.Set;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Unit tests for {@link SlowNodeDetector}.
+ * Unit tests for {@link OutlierDetector}.
  */
 public class TestSlowNodeDetector {
   public static final Logger LOG =

@@ -183,7 +183,7 @@ public class TestSlowNodeDetector {
         .put(ImmutableMap.of(
             "n1", LOW_THRESHOLD + 0.1,
             "n2", LOW_THRESHOLD + 0.1,
-            "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1),
+            "n3", LOW_THRESHOLD * OutlierDetector.MEDIAN_MULTIPLIER - 0.1),
             ImmutableSet.of())
 
         // A statistical outlier must be returned if it is outside a

@@ -192,7 +192,7 @@ public class TestSlowNodeDetector {
             "n1", LOW_THRESHOLD + 0.1,
             "n2", LOW_THRESHOLD + 0.1,
             "n3", (LOW_THRESHOLD + 0.1) *
-                SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1),
+                OutlierDetector.MEDIAN_MULTIPLIER + 0.1),
             ImmutableSet.of("n3"))
 
         // Only the statistical outliers n3 and n11 should be returned.

@@ -233,13 +233,13 @@ public class TestSlowNodeDetector {
         .build();
 
 
-  private SlowNodeDetector slowNodeDetector;
+  private OutlierDetector slowNodeDetector;
 
   @Before
   public void setup() {
-    slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD);
-    SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS);
+    slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_PEERS,
+        (long) LOW_THRESHOLD);
-    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(OutlierDetector.LOG, Level.ALL);
   }
 
   @Test

@@ -259,7 +259,7 @@ public class TestSlowNodeDetector {
   }
 
   /**
-   * Unit test for {@link SlowNodeDetector#computeMedian(List)}.
+   * Unit test for {@link OutlierDetector#computeMedian(List)}.
    */
   @Test
   public void testMediansFromTestMatrix() {

@@ -267,7 +267,7 @@ public class TestSlowNodeDetector {
         medianTestMatrix.entrySet()) {
       final List<Double> inputList = new ArrayList<>(entry.getKey());
       Collections.sort(inputList);
-      final Double median = SlowNodeDetector.computeMedian(inputList);
+      final Double median = OutlierDetector.computeMedian(inputList);
       final Double expectedMedian = entry.getValue().getLeft();
 
       // Ensure that the median is within 0.001% of expected.

@@ -283,7 +283,7 @@ public class TestSlowNodeDetector {
   }
 
   /**
-   * Unit test for {@link SlowNodeDetector#computeMad(List)}.
+   * Unit test for {@link OutlierDetector#computeMad(List)}.
    */
   @Test
   public void testMadsFromTestMatrix() {

@@ -291,7 +291,7 @@ public class TestSlowNodeDetector {
         medianTestMatrix.entrySet()) {
       final List<Double> inputList = new ArrayList<>(entry.getKey());
       Collections.sort(inputList);
-      final Double mad = SlowNodeDetector.computeMad(inputList);
+      final Double mad = OutlierDetector.computeMad(inputList);
       final Double expectedMad = entry.getValue().getRight();
 
       // Ensure that the MAD is within 0.001% of expected.
@@ -316,20 +316,20 @@ public class TestSlowNodeDetector {
   }
 
   /**
-   * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when
+   * Verify that {@link OutlierDetector#computeMedian(List)} throws when
    * passed an empty list.
    */
   @Test(expected=IllegalArgumentException.class)
   public void testMedianOfEmptyList() {
-    SlowNodeDetector.computeMedian(Collections.emptyList());
+    OutlierDetector.computeMedian(Collections.emptyList());
   }
 
   /**
-   * Verify that {@link SlowNodeDetector#computeMad(List)} throws when
+   * Verify that {@link OutlierDetector#computeMad(List)} throws when
    * passed an empty list.
    */
   @Test(expected=IllegalArgumentException.class)
   public void testMadOfEmptyList() {
-    SlowNodeDetector.computeMedian(Collections.emptyList());
+    OutlierDetector.computeMad(Collections.emptyList());
   }
 }