HDFS-14235. Handle ArrayIndexOutOfBoundsException in DataNodeDiskMetrics#slowDiskDetectionDaemon. Contributed by Ranith Sardar.
(cherry picked from commit 41e18feda3
)
This commit is contained in:
parent
e6f2b8730f
commit
b93b127956
|
@ -57,6 +57,10 @@ public class DataNodeDiskMetrics {
|
||||||
private volatile Map<String, Map<DiskOp, Double>>
|
private volatile Map<String, Map<DiskOp, Double>>
|
||||||
diskOutliersStats = Maps.newHashMap();
|
diskOutliersStats = Maps.newHashMap();
|
||||||
|
|
||||||
|
// Adding for test purpose. When addSlowDiskForTesting() called from test
|
||||||
|
// code, status should not be overridden by daemon thread.
|
||||||
|
private boolean overrideStatus = true;
|
||||||
|
|
||||||
public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
|
public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
|
||||||
this.dn = dn;
|
this.dn = dn;
|
||||||
this.detectionInterval = diskOutlierDetectionIntervalMs;
|
this.detectionInterval = diskOutlierDetectionIntervalMs;
|
||||||
|
@ -71,6 +75,7 @@ public class DataNodeDiskMetrics {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (shouldRun) {
|
while (shouldRun) {
|
||||||
|
if (dn.getFSDataset() != null) {
|
||||||
Map<String, Double> metadataOpStats = Maps.newHashMap();
|
Map<String, Double> metadataOpStats = Maps.newHashMap();
|
||||||
Map<String, Double> readIoStats = Maps.newHashMap();
|
Map<String, Double> readIoStats = Maps.newHashMap();
|
||||||
Map<String, Double> writeIoStats = Maps.newHashMap();
|
Map<String, Double> writeIoStats = Maps.newHashMap();
|
||||||
|
@ -81,7 +86,7 @@ public class DataNodeDiskMetrics {
|
||||||
.iterator();
|
.iterator();
|
||||||
while (volumeIterator.hasNext()) {
|
while (volumeIterator.hasNext()) {
|
||||||
FsVolumeSpi volume = volumeIterator.next();
|
FsVolumeSpi volume = volumeIterator.next();
|
||||||
DataNodeVolumeMetrics metrics = volumeIterator.next().getMetrics();
|
DataNodeVolumeMetrics metrics = volume.getMetrics();
|
||||||
String volumeName = volume.getBaseURI().getPath();
|
String volumeName = volume.getBaseURI().getPath();
|
||||||
|
|
||||||
metadataOpStats.put(volumeName,
|
metadataOpStats.put(volumeName,
|
||||||
|
@ -98,14 +103,15 @@ public class DataNodeDiskMetrics {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (metadataOpStats.isEmpty() && readIoStats.isEmpty() &&
|
if (metadataOpStats.isEmpty() && readIoStats.isEmpty()
|
||||||
writeIoStats.isEmpty()) {
|
&& writeIoStats.isEmpty()) {
|
||||||
LOG.debug("No disk stats available for detecting outliers.");
|
LOG.debug("No disk stats available for detecting outliers.");
|
||||||
return;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
|
detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
|
||||||
writeIoStats);
|
writeIoStats);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(detectionInterval);
|
Thread.sleep(detectionInterval);
|
||||||
|
@ -143,10 +149,11 @@ public class DataNodeDiskMetrics {
|
||||||
for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
|
for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
|
||||||
addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
|
addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
|
||||||
}
|
}
|
||||||
|
if (overrideStatus) {
|
||||||
diskOutliersStats = diskStats;
|
diskOutliersStats = diskStats;
|
||||||
LOG.debug("Updated disk outliers.");
|
LOG.debug("Updated disk outliers.");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
|
private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
|
||||||
String disk, DiskOp diskOp, double latency) {
|
String disk, DiskOp diskOp, double latency) {
|
||||||
|
@ -176,6 +183,7 @@ public class DataNodeDiskMetrics {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void addSlowDiskForTesting(String slowDiskPath,
|
public void addSlowDiskForTesting(String slowDiskPath,
|
||||||
Map<DiskOp, Double> latencies) {
|
Map<DiskOp, Double> latencies) {
|
||||||
|
overrideStatus = false;
|
||||||
if (latencies == null) {
|
if (latencies == null) {
|
||||||
diskOutliersStats.put(slowDiskPath, ImmutableMap.of());
|
diskOutliersStats.put(slowDiskPath, ImmutableMap.of());
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue