HDFS-11551. Handle SlowDiskReport from DataNode at the NameNode. Contributed by Hanisha Koneru.

Hanisha Koneru 2017-03-30 22:41:26 -07:00 committed by Arpit Agarwal
parent bf3fb585aa
commit 28cdc5a8dc
5 changed files with 812 additions and 24 deletions

View File: org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java

@@ -48,7 +48,7 @@ public final class SlowDiskReports {
private final Map<String, Map<DiskOp, Double>> slowDisks;
/**
- * An object representing a SlowPeerReports with no entries. Should
+ * An object representing a SlowDiskReports with no entries. Should
* be used instead of null or creating new objects when there are
* no slow peers to report.
*/
@@ -119,8 +119,28 @@
* Lists the types of operations on which disk latencies are measured.
*/
public enum DiskOp {
- METADATA,
- READ,
- WRITE
+ METADATA("MetadataOp"),
+ READ("ReadIO"),
+ WRITE("WriteIO");
private final String value;
DiskOp(final String v) {
this.value = v;
}
@Override
public String toString() {
return value;
}
public static DiskOp fromValue(final String value) {
for (DiskOp as : DiskOp.values()) {
if (as.value.equals(value)) {
return as;
}
}
return null;
}
}
}
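Illustration (not part of this patch): the string values attached to DiskOp above let the enum round-trip through its display form. A minimal sketch, assuming only the public DiskOp API shown in this hunk:
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
public class DiskOpRoundTrip {
  public static void main(String[] args) {
    // toString() now returns the display value, e.g. "ReadIO" instead of "READ".
    String display = DiskOp.READ.toString();
    // fromValue() maps a display value back to its constant; unknown strings yield null.
    DiskOp parsed = DiskOp.fromValue(display);     // DiskOp.READ
    DiskOp unknown = DiskOp.fromValue("FlushIO");  // null ("FlushIO" is a made-up op name)
    System.out.println(display + " -> " + parsed + ", FlushIO -> " + unknown);
  }
}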

View File: org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -180,9 +181,15 @@ public class DatanodeManager {
* True if we should process latency metrics from downstream peers.
*/
private final boolean dataNodePeerStatsEnabled;
/**
* True if we should process latency metrics from individual DN disks.
*/
private final boolean dataNodeDiskStatsEnabled;
@Nullable
private final SlowPeerTracker slowPeerTracker;
@Nullable
private final SlowDiskTracker slowDiskTracker;
/**
* The minimum time between resending caching directives to Datanodes,
@@ -208,9 +215,16 @@
this.dataNodePeerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
this.dataNodeDiskStatsEnabled = Util.isDiskStatsEnabled(conf.getDouble(
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY,
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_DEFAULT));
final Timer timer = new Timer();
this.slowPeerTracker = dataNodePeerStatsEnabled ?
- new SlowPeerTracker(conf, new Timer()) : null;
+ new SlowPeerTracker(conf, timer) : null;
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
new SlowDiskTracker(conf, timer) : null;
this.defaultXferPort = NetUtils.createSocketAddr(
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
@@ -1664,6 +1678,16 @@
}
}
if (slowDiskTracker != null) {
if (!slowDisks.getSlowDisks().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
slowDisks.getSlowDisks());
}
slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
}
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@@ -1875,5 +1899,13 @@
public String getSlowPeersReport() {
return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
}
/**
* Use only for testing.
*/
@VisibleForTesting
public SlowDiskTracker getSlowDiskTracker() {
return slowDiskTracker;
}
}
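Illustration (not part of this patch): with the accessor above, a test or debugging hook can pull the aggregated slow disk JSON out of the NameNode. A minimal sketch, assuming a running MiniDFSCluster and that file IO profiling is enabled (DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY > 0), since getSlowDiskTracker() returns null otherwise:
// Returns the NameNode's current slow disk report as JSON, or null.
static String dumpSlowDiskJson(MiniDFSCluster cluster) {
  DatanodeManager dm = cluster.getNameNode().getNamesystem()
      .getBlockManager().getDatanodeManager();
  SlowDiskTracker tracker = dm.getSlowDiskTracker();
  return tracker == null ? null : tracker.getSlowDiskReportAsJsonString();
}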

View File: org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java

@@ -0,0 +1,291 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Doubles;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class aggregates information from {@link SlowDiskReports} received via
* heartbeats.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SlowDiskTracker {
public static final Logger LOG =
LoggerFactory.getLogger(SlowDiskTracker.class);
/**
* Time duration after which a report is considered stale. This is
* set to 3 times DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, i.e. a report
* is retained for at least two successive reporting intervals.
*/
private long reportValidityMs;
/**
* Timer object for querying the current time. Separated out for
* unit testing.
*/
private final Timer timer;
/**
* Number of disks to include in JSON report per operation. We will return
* disks with the highest latency.
*/
private static final int MAX_DISKS_TO_REPORT = 5;
private static final String DATANODE_DISK_SEPARATOR = ":";
private final long reportGenerationIntervalMs;
private volatile long lastUpdateTime;
private AtomicBoolean isUpdateInProgress = new AtomicBoolean(false);
/**
* Information about disks that have been reported as being slow.
* It is a map of (Slow Disk ID) -> (DiskLatency). The DiskLatency contains
* the disk ID, the latencies reported and the timestamp when the report
* was received.
*/
private final Map<String, DiskLatency> diskIDLatencyMap;
/**
* List of the slow disks, with their reported latencies, included in the
* most recently generated report.
*/
private volatile ArrayList<DiskLatency> slowDisksReport =
Lists.newArrayList();
private volatile ArrayList<DiskLatency> oldSlowDisksCheck;
public SlowDiskTracker(Configuration conf, Timer timer) {
this.timer = timer;
this.lastUpdateTime = timer.monotonicNow();
this.diskIDLatencyMap = new ConcurrentHashMap<>();
this.reportGenerationIntervalMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.reportValidityMs = reportGenerationIntervalMs * 3;
}
@VisibleForTesting
public static String getSlowDiskIDForReport(String datanodeID,
String slowDisk) {
return datanodeID + DATANODE_DISK_SEPARATOR + slowDisk;
}
public void addSlowDiskReport(String dataNodeID,
SlowDiskReports dnSlowDiskReport) {
Map<String, Map<DiskOp, Double>> slowDisks =
dnSlowDiskReport.getSlowDisks();
long now = timer.monotonicNow();
for (Map.Entry<String, Map<DiskOp, Double>> slowDiskEntry :
slowDisks.entrySet()) {
String diskID = getSlowDiskIDForReport(dataNodeID,
slowDiskEntry.getKey());
Map<DiskOp, Double> latencies = slowDiskEntry.getValue();
DiskLatency diskLatency = new DiskLatency(diskID, latencies, now);
diskIDLatencyMap.put(diskID, diskLatency);
}
checkAndUpdateReportIfNecessary();
}
private void checkAndUpdateReportIfNecessary() {
// Check if it is time for update
long now = timer.monotonicNow();
if (now - lastUpdateTime > reportGenerationIntervalMs) {
updateSlowDiskReportAsync(now);
}
}
@VisibleForTesting
public void updateSlowDiskReportAsync(long now) {
if (isUpdateInProgress.compareAndSet(false, true)) {
lastUpdateTime = now;
new Thread(new Runnable() {
@Override
public void run() {
slowDisksReport = getSlowDisks(diskIDLatencyMap,
MAX_DISKS_TO_REPORT, now);
cleanUpOldReports(now);
isUpdateInProgress.set(false);
}
}).start();
}
}
/**
* This structure is a thin wrapper over disk latencies.
*/
public static class DiskLatency {
@JsonProperty("SlowDiskID")
final private String slowDiskID;
@JsonProperty("Latencies")
final private Map<DiskOp, Double> latencyMap;
@JsonIgnore
private long timestamp;
/**
* Constructor needed by Jackson for Object mapping.
*/
public DiskLatency(
@JsonProperty("SlowDiskID") String slowDiskID,
@JsonProperty("Latencies") Map<DiskOp, Double> latencyMap) {
this.slowDiskID = slowDiskID;
this.latencyMap = latencyMap;
}
public DiskLatency(String slowDiskID, Map<DiskOp, Double> latencyMap,
long timestamp) {
this.slowDiskID = slowDiskID;
this.latencyMap = latencyMap;
this.timestamp = timestamp;
}
String getSlowDiskID() {
return this.slowDiskID;
}
double getMaxLatency() {
double maxLatency = 0;
for (double latency : latencyMap.values()) {
if (latency > maxLatency) {
maxLatency = latency;
}
}
return maxLatency;
}
Double getLatency(DiskOp op) {
return this.latencyMap.get(op);
}
}
/**
* Retrieve a list of the top numDisks slow disks, i.e. disks with the
* highest max latencies.
* @param numDisks number of disks to return. This is to limit the size of
* the generated JSON.
*/
private ArrayList<DiskLatency> getSlowDisks(
Map<String, DiskLatency> reports, int numDisks, long now) {
if (reports.isEmpty()) {
return new ArrayList(ImmutableList.of());
}
final PriorityQueue<DiskLatency> topNReports = new PriorityQueue<>(
reports.size(),
new Comparator<DiskLatency>() {
@Override
public int compare(DiskLatency o1, DiskLatency o2) {
return Doubles.compare(
o1.getMaxLatency(), o2.getMaxLatency());
}
});
ArrayList<DiskLatency> oldSlowDiskIDs = Lists.newArrayList();
for (Map.Entry<String, DiskLatency> entry : reports.entrySet()) {
DiskLatency diskLatency = entry.getValue();
if (now - diskLatency.timestamp < reportValidityMs) {
if (topNReports.size() < numDisks) {
topNReports.add(diskLatency);
} else if (topNReports.peek().getMaxLatency() <
diskLatency.getMaxLatency()) {
topNReports.poll();
topNReports.add(diskLatency);
}
} else {
oldSlowDiskIDs.add(diskLatency);
}
}
oldSlowDisksCheck = oldSlowDiskIDs;
return Lists.newArrayList(topNReports);
}
/**
* Retrieve all valid reports as a JSON string.
* @return serialized representation of valid reports. null if
* serialization failed.
*/
public String getSlowDiskReportAsJsonString() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(slowDisksReport);
} catch (JsonProcessingException e) {
// Failed to serialize. Don't log the exception call stack.
LOG.debug("Failed to serialize statistics" + e);
return null;
}
}
private void cleanUpOldReports(long now) {
if (oldSlowDisksCheck != null) {
for (DiskLatency oldDiskLatency : oldSlowDisksCheck) {
diskIDLatencyMap.remove(oldDiskLatency.getSlowDiskID(), oldDiskLatency);
}
}
// Clear oldSlowDisksCheck; the stale entries above have been removed.
oldSlowDisksCheck = null;
}
@VisibleForTesting
ArrayList<DiskLatency> getSlowDisksReport() {
return this.slowDisksReport;
}
@VisibleForTesting
long getReportValidityMs() {
return reportValidityMs;
}
@VisibleForTesting
void setReportValidityMs(long reportValidityMs) {
this.reportValidityMs = reportValidityMs;
}
}
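Illustration (not part of this patch): getSlowDisks() above keeps a bounded min-heap ordered by max latency, so once MAX_DISKS_TO_REPORT entries are queued a new disk only displaces the head if it is slower than the current minimum. A self-contained sketch of the same selection with plain doubles (class and method names made up):
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
public class TopNLatencies {
  // Keep the n largest latencies, mirroring the bounded min-heap in getSlowDisks().
  static List<Double> topN(List<Double> latencies, int n) {
    PriorityQueue<Double> heap = new PriorityQueue<>(n);
    for (double latency : latencies) {
      if (heap.size() < n) {
        heap.add(latency);                  // heap not full yet
      } else if (heap.peek() < latency) {
        heap.poll();                        // evict the current smallest
        heap.add(latency);
      }
    }
    return new ArrayList<>(heap);
  }
  public static void main(String[] args) {
    // Prints the five largest values, in heap (not sorted) order.
    System.out.println(topN(Arrays.asList(1.1, 1.7, 1.3, 1.6, 1.2, 1.5, 1.4), 5));
  }
}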

View File: org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
- import com.google.common.collect.Sets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -33,9 +32,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
- import java.util.Set;
/**
* This class detects and maintains DataNode disk outliers and their
@@ -122,43 +121,41 @@ public class DataNodeDiskMetrics {
private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
- Set<String> diskOutliersSet = Sets.newHashSet();
+ Map<String, Map<DiskOp, Double>> diskStats = Maps.newHashMap();
// Get MetadataOp Outliers
Map<String, Double> metadataOpOutliers = slowDiskDetector
.getOutliers(metadataOpStats);
- if (!metadataOpOutliers.isEmpty()) {
- diskOutliersSet.addAll(metadataOpOutliers.keySet());
+ for (Map.Entry<String, Double> entry : metadataOpOutliers.entrySet()) {
+ addDiskStat(diskStats, entry.getKey(), DiskOp.METADATA, entry.getValue());
}
// Get ReadIo Outliers
Map<String, Double> readIoOutliers = slowDiskDetector
.getOutliers(readIoStats);
- if (!readIoOutliers.isEmpty()) {
- diskOutliersSet.addAll(readIoOutliers.keySet());
+ for (Map.Entry<String, Double> entry : readIoOutliers.entrySet()) {
+ addDiskStat(diskStats, entry.getKey(), DiskOp.READ, entry.getValue());
}
// Get WriteIo Outliers
Map<String, Double> writeIoOutliers = slowDiskDetector
.getOutliers(writeIoStats);
- if (!readIoOutliers.isEmpty()) {
- diskOutliersSet.addAll(writeIoOutliers.keySet());
- }
- Map<String, Map<DiskOp, Double>> diskStats =
- Maps.newHashMap();
- for (String disk : diskOutliersSet) {
- Map<DiskOp, Double> diskStat = Maps.newHashMap();
- diskStat.put(DiskOp.METADATA, metadataOpStats.get(disk));
- diskStat.put(DiskOp.READ, readIoStats.get(disk));
- diskStat.put(DiskOp.WRITE, writeIoStats.get(disk));
- diskStats.put(disk, diskStat);
+ for (Map.Entry<String, Double> entry : writeIoOutliers.entrySet()) {
+ addDiskStat(diskStats, entry.getKey(), DiskOp.WRITE, entry.getValue());
}
diskOutliersStats = diskStats;
LOG.debug("Updated disk outliers.");
}
+ private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
+ String disk, DiskOp diskOp, double latency) {
+ if (!diskStats.containsKey(disk)) {
+ diskStats.put(disk, new HashMap<>());
+ }
+ diskStats.get(disk).put(diskOp, latency);
+ }
public Map<String, Map<DiskOp, Double>> getDiskOutliersStats() {
return diskOutliersStats;
}
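Illustration (not part of this patch): the addDiskStat() helper above builds a nested disk -> (operation -> latency) map, adding an inner entry only for the operations a disk was actually flagged for. A minimal self-contained sketch (the disk paths are made up):
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
public class AddDiskStatSketch {
  static void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
      String disk, DiskOp diskOp, double latency) {
    if (!diskStats.containsKey(disk)) {
      diskStats.put(disk, new HashMap<DiskOp, Double>());
    }
    diskStats.get(disk).put(diskOp, latency);
  }
  public static void main(String[] args) {
    Map<String, Map<DiskOp, Double>> diskStats = new HashMap<>();
    addDiskStat(diskStats, "/data/disk1", DiskOp.READ, 1.6);
    addDiskStat(diskStats, "/data/disk1", DiskOp.WRITE, 1.1);
    addDiskStat(diskStats, "/data/disk2", DiskOp.METADATA, 0.8);
    // disk1 carries two operation entries, disk2 only one.
    System.out.println(diskStats);
  }
}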

View File: org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java

@@ -0,0 +1,448 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys
.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys
.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker
.DiskLatency;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Tests for {@link SlowDiskTracker}.
*/
public class TestSlowDiskTracker {
public static final Logger LOG = LoggerFactory.getLogger(
TestSlowDiskTracker.class);
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
private static Configuration conf;
private SlowDiskTracker tracker;
private FakeTimer timer;
private long reportValidityMs;
private static final long OUTLIERS_REPORT_INTERVAL = 1000;
static {
conf = new HdfsConfiguration();
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setDouble(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_FRACTION_KEY, 1.0);
conf.setTimeDuration(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
OUTLIERS_REPORT_INTERVAL, TimeUnit.MILLISECONDS);
}
@Before
public void setup() {
timer = new FakeTimer();
tracker = new SlowDiskTracker(conf, timer);
reportValidityMs = tracker.getReportValidityMs();
}
@Test
public void testDataNodeHeartbeatSlowDiskReport() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
try {
DataNode dn1 = cluster.getDataNodes().get(0);
DataNode dn2 = cluster.getDataNodes().get(1);
NameNode nn = cluster.getNameNode(0);
DatanodeManager datanodeManager = nn.getNamesystem().getBlockManager()
.getDatanodeManager();
SlowDiskTracker slowDiskTracker = datanodeManager.getSlowDiskTracker();
slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 100);
dn1.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
DiskOp.WRITE, 1.3));
dn1.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
DiskOp.READ, 1.6, DiskOp.WRITE, 1.1));
dn2.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
DiskOp.METADATA, 0.8));
dn2.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
DiskOp.WRITE, 1.3));
String dn1ID = dn1.getDatanodeId().getIpcAddr(false);
String dn2ID = dn2.getDatanodeId().getIpcAddr(false);
// Sleep for one reporting interval so the DataNodes send their slow disk reports.
Thread.sleep(OUTLIERS_REPORT_INTERVAL);
// Wait for NN to receive reports from all DNs
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (slowDiskTracker.getSlowDisksReport().size() == 4);
}
}, 1000, 100000);
Map<String, DiskLatency> slowDisksReport = getSlowDisksReportForTesting(
slowDiskTracker);
assertThat(slowDisksReport.size(), is(4));
assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk1")
.getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
.getLatency(DiskOp.READ) - 1.6) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
.getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk1")
.getLatency(DiskOp.METADATA) - 0.8) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk2")
.getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
// Test the slow disk report JSON string
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
slowDiskTracker.getSlowDiskReportAsJsonString());
assertThat(jsonReport.size(), is(4));
assertTrue(isDiskInReports(jsonReport, dn1ID, "disk1", DiskOp.WRITE, 1.3));
assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.READ, 1.6));
assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.WRITE, 1.1));
assertTrue(isDiskInReports(jsonReport, dn2ID, "disk1", DiskOp.METADATA,
0.8));
assertTrue(isDiskInReports(jsonReport, dn2ID, "disk2", DiskOp.WRITE, 1.3));
} finally {
cluster.shutdown();
}
}
/**
* Edge case, there are no reports to retrieve.
*/
@Test
public void testEmptyReports() {
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
assertTrue(getSlowDisksReportForTesting(tracker).isEmpty());
}
@Test
public void testReportsAreRetrieved() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.READ, 1.1));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(3));
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.READ) - 1.8) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk2")
.getLatency(DiskOp.READ) - 1.3) < 0.0000001);
assertTrue(Math.abs(reports.get("dn2:disk2")
.getLatency(DiskOp.READ) - 1.1) < 0.0000001);
}
/**
* Test that when all reports are expired, we get back nothing.
*/
@Test
public void testAllReportsAreExpired() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.WRITE, 1.1));
// No reports should expire after 1ms.
timer.advance(1);
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(3));
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.READ) - 1.8) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk2")
.getLatency(DiskOp.READ) - 1.3) < 0.0000001);
assertTrue(Math.abs(reports.get("dn2:disk2")
.getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
// All reports should expire after REPORT_VALIDITY_MS.
timer.advance(reportValidityMs);
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDisksReport().isEmpty();
}
}, 500, 3000);
reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(0));
}
/**
* Test the case when a subset of reports has expired.
* Ensure that we only get back non-expired reports.
*/
@Test
public void testSomeReportsAreExpired() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
timer.advance(reportValidityMs);
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.WRITE, 1.1));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(1));
assertTrue(Math.abs(reports.get("dn2:disk2")
.getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
}
/**
* Test the case when an expired report is replaced by a valid one.
*/
@Test
public void testReplacement() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
timer.advance(reportValidityMs);
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.READ, 1.4));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(1));
assertTrue(reports.get("dn1:disk1").getLatency(DiskOp.METADATA) == null);
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.READ) - 1.4) < 0.0000001);
}
@Test
public void testGetJson() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.WRITE, 1.1));
addSlowDiskForTesting("dn3", "disk1",
ImmutableMap.of(DiskOp.WRITE, 1.1));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDiskReportAsJsonString() != null;
}
}, 500, 5000);
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
tracker.getSlowDiskReportAsJsonString());
// And ensure its contents are what we expect.
assertThat(jsonReport.size(), is(4));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.METADATA,
1.1));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.8));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.3));
assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.WRITE, 1.1));
assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.1));
}
@Test
public void testGetJsonSizeIsLimited() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.READ, 1.1));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.2));
addSlowDiskForTesting("dn1", "disk3",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk1",
ImmutableMap.of(DiskOp.READ, 1.4));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.READ, 1.5));
addSlowDiskForTesting("dn3", "disk1",
ImmutableMap.of(DiskOp.WRITE, 1.6));
addSlowDiskForTesting("dn3", "disk2",
ImmutableMap.of(DiskOp.READ, 1.7));
addSlowDiskForTesting("dn3", "disk3",
ImmutableMap.of(DiskOp.READ, 1.2));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDiskReportAsJsonString() != null;
}
}, 500, 5000);
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
tracker.getSlowDiskReportAsJsonString());
// Ensure that only the top 5 highest latencies are in the report.
assertThat(jsonReport.size(), is(5));
assertTrue(isDiskInReports(jsonReport, "dn3", "disk2", DiskOp.READ, 1.7));
assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.6));
assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.READ, 1.5));
assertTrue(isDiskInReports(jsonReport, "dn2", "disk1", DiskOp.READ, 1.4));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk3", DiskOp.READ, 1.3));
// The remaining, lower-latency disks should not be in the report.
assertFalse(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.1));
assertFalse(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.2));
assertFalse(isDiskInReports(jsonReport, "dn3", "disk3", DiskOp.READ, 1.2));
}
@Test
public void testEmptyReport() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.READ, 1.1));
timer.advance(reportValidityMs);
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDiskReportAsJsonString() != null;
}
}, 500, 5000);
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
tracker.getSlowDiskReportAsJsonString());
assertTrue(jsonReport.isEmpty());
}
private boolean isDiskInReports(ArrayList<DiskLatency> reports,
String dataNodeID, String disk, DiskOp diskOp, double latency) {
String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk);
for (DiskLatency diskLatency : reports) {
if (diskLatency.getSlowDiskID().equals(diskID)) {
if (diskLatency.getLatency(diskOp) == null) {
return false;
}
if (Math.abs(diskLatency.getLatency(diskOp) - latency) < 0.0000001) {
return true;
}
}
}
return false;
}
private ArrayList<DiskLatency> getAndDeserializeJson(
final String json) throws IOException {
return (new ObjectMapper()).readValue(json,
new TypeReference<ArrayList<DiskLatency>>() {});
}
private void addSlowDiskForTesting(String dnID, String disk,
Map<DiskOp, Double> latencies) {
Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap();
slowDisk.put(disk, latencies);
SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk);
tracker.addSlowDiskReport(dnID, slowDiskReport);
}
Map<String, DiskLatency> getSlowDisksReportForTesting(
SlowDiskTracker slowDiskTracker) {
Map<String, DiskLatency> slowDisksMap = Maps.newHashMap();
for (DiskLatency diskLatency : slowDiskTracker.getSlowDisksReport()) {
slowDisksMap.put(diskLatency.getSlowDiskID(), diskLatency);
}
return slowDisksMap;
}
}
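Illustration (not part of this patch): outside the tests, the JSON report can be read back into DiskLatency objects the same way the getAndDeserializeJson() helper above does. A sketch reusing the imports already present in the test above, and assuming the caller lives in the same org.apache.hadoop.hdfs.server.blockmanagement package (the DiskLatency accessors are package-private) with tracker being an initialized SlowDiskTracker:
// Deserialize the tracker's JSON report and print each disk's worst latency.
static void printSlowDisks(SlowDiskTracker tracker) throws IOException {
  String json = tracker.getSlowDiskReportAsJsonString();
  if (json == null) {
    return;   // nothing to report, or serialization failed
  }
  ArrayList<DiskLatency> report = new ObjectMapper().readValue(json,
      new TypeReference<ArrayList<DiskLatency>>() {});
  for (DiskLatency disk : report) {
    System.out.println(disk.getSlowDiskID() + " -> " + disk.getMaxLatency());
  }
}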