HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)

Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: Tao Li <tomscut@apache.org>
This commit is contained in:
Viraj Jasani 2022-05-20 18:28:59 -07:00 committed by GitHub
parent 8dd3ef1f08
commit 93a13202d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 322 additions and 123 deletions

View File

@ -1901,8 +1901,9 @@ public class DatanodeManager {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
}
for (String slowNodeId : slowPeersMap.keySet()) {
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) {
slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false),
slowNodeId.getValue());
}
}
}

View File

@ -58,20 +58,20 @@ public class SlowPeerDisabledTracker extends SlowPeerTracker {
}
@Override
public void addReport(String slowNode, String reportingNode) {
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
}
@Override
public Set<String> getReportsForNode(String slowNode) {
public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
LOG.trace("Retrieval of slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return ImmutableSet.of();
}
@Override
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
LOG.trace("Retrieval of slow peer report for all nodes is disabled. "
+ "To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.SortedSet;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This structure is a thin wrapper over slow peer reports to make Json
* [de]serialization easy.
*/
@InterfaceAudience.Private
final class SlowPeerJsonReport {
@JsonProperty("SlowNode")
private final String slowNode;
@JsonProperty("SlowPeerLatencyWithReportingNodes")
private final SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes;
SlowPeerJsonReport(
@JsonProperty("SlowNode")
String slowNode,
@JsonProperty("SlowPeerLatencyWithReportingNodes")
SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes) {
this.slowNode = slowNode;
this.slowPeerLatencyWithReportingNodes = slowPeerLatencyWithReportingNodes;
}
public String getSlowNode() {
return slowNode;
}
public SortedSet<SlowPeerLatencyWithReportingNode> getSlowPeerLatencyWithReportingNodes() {
return slowPeerLatencyWithReportingNodes;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SlowPeerJsonReport that = (SlowPeerJsonReport) o;
return new EqualsBuilder()
.append(slowNode, that.slowNode)
.append(slowPeerLatencyWithReportingNodes, that.slowPeerLatencyWithReportingNodes)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(slowNode)
.append(slowPeerLatencyWithReportingNodes)
.toHashCode();
}
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This class represents the reporting node and the slow node's latency as observed by the
* reporting node. This class is used by SlowPeerJsonReport class.
*/
@InterfaceAudience.Private
final class SlowPeerLatencyWithReportingNode
implements Comparable<SlowPeerLatencyWithReportingNode> {
@JsonProperty("ReportingNode")
private final String reportingNode;
@JsonProperty("ReportedLatency")
private final Double reportedLatency;
SlowPeerLatencyWithReportingNode(
@JsonProperty("ReportingNode")
String reportingNode,
@JsonProperty("ReportedLatency")
Double reportedLatency) {
this.reportingNode = reportingNode;
this.reportedLatency = reportedLatency;
}
public String getReportingNode() {
return reportingNode;
}
public Double getReportedLatency() {
return reportedLatency;
}
@Override
public int compareTo(SlowPeerLatencyWithReportingNode o) {
return this.reportingNode.compareTo(o.getReportingNode());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SlowPeerLatencyWithReportingNode that = (SlowPeerLatencyWithReportingNode) o;
return new EqualsBuilder()
.append(reportingNode, that.reportingNode)
.append(reportedLatency, that.reportedLatency)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(reportingNode)
.append(reportedLatency)
.toHashCode();
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
@ -37,7 +36,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -95,7 +93,7 @@ public class SlowPeerTracker {
* Stale reports are not evicted proactively and can potentially
* hang around forever.
*/
private final ConcurrentMap<String, ConcurrentMap<String, Long>>
private final ConcurrentMap<String, ConcurrentMap<String, LatencyWithLastReportTime>>
allReports;
public SlowPeerTracker(Configuration conf, Timer timer) {
@ -123,12 +121,12 @@ public class SlowPeerTracker {
* Add a new report. DatanodeIds can be the DataNodeIds or addresses
* We don't care as long as the caller is consistent.
*
* @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNode DataNodeId of the peer suspected to be slow.
* @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node.
*/
public void addReport(String slowNode,
String reportingNode) {
ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);
if (nodeEntries == null) {
// putIfAbsent guards against multiple writers.
@ -137,7 +135,8 @@ public class SlowPeerTracker {
}
// Replace the existing entry from this node, if any.
nodeEntries.put(reportingNode, timer.monotonicNow());
nodeEntries.put(reportingNode,
new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency));
}
/**
@ -147,8 +146,8 @@ public class SlowPeerTracker {
* @param slowNode target node Id.
* @return set of reports which implicate the target node as being slow.
*/
public Set<String> getReportsForNode(String slowNode) {
final ConcurrentMap<String, Long> nodeEntries =
public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
final ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries =
allReports.get(slowNode);
if (nodeEntries == null || nodeEntries.isEmpty()) {
@ -163,17 +162,19 @@ public class SlowPeerTracker {
*
* @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
*/
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
if (allReports.isEmpty()) {
return ImmutableMap.of();
}
final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
final Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> allNodesValidReports =
new HashMap<>();
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
: allReports.entrySet()) {
SortedSet<SlowPeerLatencyWithReportingNode> validReports =
filterNodeReports(entry.getValue(), now);
if (!validReports.isEmpty()) {
allNodesValidReports.put(entry.getKey(), validReports);
}
@ -184,17 +185,18 @@ public class SlowPeerTracker {
/**
* Filter the given reports to return just the valid ones.
*
* @param reports
* @param now
* @return
* @param reports Current set of reports.
* @param now Current time.
* @return Set of valid reports that were created within last reportValidityMs millis.
*/
private SortedSet<String> filterNodeReports(
ConcurrentMap<String, Long> reports, long now) {
final SortedSet<String> validReports = new TreeSet<>();
private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(
ConcurrentMap<String, LatencyWithLastReportTime> reports, long now) {
final SortedSet<SlowPeerLatencyWithReportingNode> validReports = new TreeSet<>();
for (Map.Entry<String, Long> entry : reports.entrySet()) {
if (now - entry.getValue() < reportValidityMs) {
validReports.add(entry.getKey());
for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
if (now - entry.getValue().getTime() < reportValidityMs) {
validReports.add(
new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency()));
}
}
return validReports;
@ -206,7 +208,7 @@ public class SlowPeerTracker {
* serialization failed.
*/
public String getJson() {
Collection<ReportForJson> validReports = getJsonReports(
Collection<SlowPeerJsonReport> validReports = getJsonReports(
maxNodesToReport);
try {
return WRITER.writeValueAsString(validReports);
@ -217,42 +219,15 @@ public class SlowPeerTracker {
}
}
/**
* This structure is a thin wrapper over reports to make Json
* [de]serialization easy.
*/
public static class ReportForJson {
@JsonProperty("SlowNode")
final private String slowNode;
@JsonProperty("ReportingNodes")
final private SortedSet<String> reportingNodes;
public ReportForJson(
@JsonProperty("SlowNode") String slowNode,
@JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
this.slowNode = slowNode;
this.reportingNodes = reportingNodes;
}
public String getSlowNode() {
return slowNode;
}
public SortedSet<String> getReportingNodes() {
return reportingNodes;
}
}
/**
* Returns all tracking slow peers.
* @param numNodes
* @return
*/
public List<String> getSlowNodes(int numNodes) {
Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
Collection<SlowPeerJsonReport> jsonReports = getJsonReports(numNodes);
ArrayList<String> slowNodes = new ArrayList<>();
for (ReportForJson jsonReport : jsonReports) {
for (SlowPeerJsonReport jsonReport : jsonReports) {
slowNodes.add(jsonReport.getSlowNode());
}
if (!slowNodes.isEmpty()) {
@ -267,35 +242,30 @@ public class SlowPeerTracker {
* @param numNodes number of nodes to return. This is to limit the
* size of the generated JSON.
*/
private Collection<ReportForJson> getJsonReports(int numNodes) {
private Collection<SlowPeerJsonReport> getJsonReports(int numNodes) {
if (allReports.isEmpty()) {
return Collections.emptyList();
}
final PriorityQueue<ReportForJson> topNReports =
new PriorityQueue<>(allReports.size(),
new Comparator<ReportForJson>() {
@Override
public int compare(ReportForJson o1, ReportForJson o2) {
return Ints.compare(o1.reportingNodes.size(),
o2.reportingNodes.size());
}
});
final PriorityQueue<SlowPeerJsonReport> topNReports = new PriorityQueue<>(allReports.size(),
(o1, o2) -> Ints.compare(o1.getSlowPeerLatencyWithReportingNodes().size(),
o2.getSlowPeerLatencyWithReportingNodes().size()));
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
SortedSet<String> validReports = filterNodeReports(
entry.getValue(), now);
for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
: allReports.entrySet()) {
SortedSet<SlowPeerLatencyWithReportingNode> validReports =
filterNodeReports(entry.getValue(), now);
if (!validReports.isEmpty()) {
if (topNReports.size() < numNodes) {
topNReports.add(new ReportForJson(entry.getKey(), validReports));
} else if (topNReports.peek().getReportingNodes().size() <
validReports.size()){
topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
} else if (topNReports.peek() != null
&& topNReports.peek().getSlowPeerLatencyWithReportingNodes().size()
< validReports.size()) {
// Remove the lowest element
topNReports.poll();
topNReports.add(new ReportForJson(entry.getKey(), validReports));
topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
}
}
}
@ -306,4 +276,23 @@ public class SlowPeerTracker {
long getReportValidityMs() {
return reportValidityMs;
}
private static class LatencyWithLastReportTime {
private final Long time;
private final Double latency;
LatencyWithLastReportTime(Long time, Double latency) {
this.time = time;
this.latency = latency;
}
public Long getTime() {
return time;
}
public Double getLatency() {
return latency;
}
}
}

View File

@ -86,13 +86,18 @@ public class TestSlowDatanodeReport {
return false;
}
}, 2000, 180000, "Slow nodes could not be detected");
LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
Assert.assertTrue(
cluster.getNameNode().getSlowPeersReport().contains(slowNode.getDatanodeHostname()));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("15.5"));
}
@Test
public void testMultiNodesReport() throws Exception {
List<DataNode> dataNodes = cluster.getDataNodes();
dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5));
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 14.5));
dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
@ -107,6 +112,14 @@ public class TestSlowDatanodeReport {
return false;
}
}, 2000, 200000, "Slow nodes could not be detected");
LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
.contains(dataNodes.get(1).getDatanodeHostname()));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
.contains(dataNodes.get(2).getDatanodeHostname()));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("14.5"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("18.7"));
}
}

View File

@ -88,12 +88,12 @@ public class TestReplicationPolicyExcludeSlowNodes
// mock slow nodes
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), 1.29463);
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), 2.9576);
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), 3.59674);
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), 4.238456);
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), 5.18375);
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), 6.39576);
// waiting for slow nodes collector run
Thread.sleep(3000);

View File

@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker
.ReportForJson;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
@ -37,6 +35,7 @@ import java.io.IOException;
import java.util.Set;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -59,7 +58,7 @@ public class TestSlowPeerTracker {
private FakeTimer timer;
private long reportValidityMs;
private static final ObjectReader READER =
new ObjectMapper().readerFor(new TypeReference<Set<ReportForJson>>() {});
new ObjectMapper().readerFor(new TypeReference<Set<SlowPeerJsonReport>>() {});
@Before
public void setup() {
@ -80,9 +79,9 @@ public class TestSlowPeerTracker {
@Test
public void testReportsAreRetrieved() {
tracker.addReport("node2", "node1");
tracker.addReport("node3", "node1");
tracker.addReport("node3", "node2");
tracker.addReport("node2", "node1", 1.2);
tracker.addReport("node3", "node1", 2.1);
tracker.addReport("node3", "node2", 1.22);
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
@ -95,9 +94,9 @@ public class TestSlowPeerTracker {
*/
@Test
public void testAllReportsAreExpired() {
tracker.addReport("node2", "node1");
tracker.addReport("node3", "node2");
tracker.addReport("node1", "node3");
tracker.addReport("node2", "node1", 0.123);
tracker.addReport("node3", "node2", 0.2334);
tracker.addReport("node1", "node3", 1.234);
// No reports should expire after 1ms.
timer.advance(1);
@ -117,13 +116,14 @@ public class TestSlowPeerTracker {
*/
@Test
public void testSomeReportsAreExpired() {
tracker.addReport("node3", "node1");
tracker.addReport("node3", "node2");
tracker.addReport("node3", "node1", 1.234);
tracker.addReport("node3", "node2", 1.222);
timer.advance(reportValidityMs);
tracker.addReport("node3", "node4");
tracker.addReport("node3", "node4", 1.20);
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(1));
assertTrue(tracker.getReportsForNode("node3").contains("node4"));
assertEquals(1, tracker.getReportsForNode("node3").stream()
.filter(e -> e.getReportingNode().equals("node4")).count());
}
/**
@ -131,24 +131,24 @@ public class TestSlowPeerTracker {
*/
@Test
public void testReplacement() {
tracker.addReport("node2", "node1");
tracker.addReport("node2", "node1", 2.1);
timer.advance(reportValidityMs); // Expire the report.
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
// This should replace the expired report with a newer valid one.
tracker.addReport("node2", "node1");
tracker.addReport("node2", "node1", 0.001);
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
}
@Test
public void testGetJson() throws IOException {
tracker.addReport("node1", "node2");
tracker.addReport("node2", "node3");
tracker.addReport("node2", "node1");
tracker.addReport("node4", "node1");
tracker.addReport("node1", "node2", 1.1);
tracker.addReport("node2", "node3", 1.23);
tracker.addReport("node2", "node1", 2.13);
tracker.addReport("node4", "node1", 1.244);
final Set<ReportForJson> reports = getAndDeserializeJson();
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// And ensure its contents are what we expect.
assertThat(reports.size(), is(3));
@ -161,19 +161,19 @@ public class TestSlowPeerTracker {
@Test
public void testGetJsonSizeIsLimited() throws IOException {
tracker.addReport("node1", "node2");
tracker.addReport("node1", "node3");
tracker.addReport("node2", "node3");
tracker.addReport("node2", "node4");
tracker.addReport("node3", "node4");
tracker.addReport("node3", "node5");
tracker.addReport("node4", "node6");
tracker.addReport("node5", "node6");
tracker.addReport("node5", "node7");
tracker.addReport("node6", "node7");
tracker.addReport("node6", "node8");
tracker.addReport("node1", "node2", 1.634);
tracker.addReport("node1", "node3", 2.3566);
tracker.addReport("node2", "node3", 3.869);
tracker.addReport("node2", "node4", 4.1356);
tracker.addReport("node3", "node4", 1.73057);
tracker.addReport("node3", "node5", 2.4956730);
tracker.addReport("node4", "node6", 3.29847);
tracker.addReport("node5", "node6", 4.13444);
tracker.addReport("node5", "node7", 5.10845);
tracker.addReport("node6", "node8", 2.37464);
tracker.addReport("node6", "node7", 1.29475656);
final Set<ReportForJson> reports = getAndDeserializeJson();
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that node4 is not in the list since it was
// tagged by just one peer and we already have 5 other nodes.
@ -185,22 +185,46 @@ public class TestSlowPeerTracker {
assertTrue(isNodeInReports(reports, "node3"));
assertTrue(isNodeInReports(reports, "node5"));
assertTrue(isNodeInReports(reports, "node6"));
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node1") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.634)
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(2.3566))
.count());
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node2") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(3.869)
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(4.1356))
.count());
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node3") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.73057)
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency()
.equals(2.4956730)).count());
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node6") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency()
.equals(1.29475656) && e.getSlowPeerLatencyWithReportingNodes().last()
.getReportedLatency().equals(2.37464)).count());
}
@Test
public void testLowRankedElementsIgnored() throws IOException {
// Insert 5 nodes with 2 peer reports each.
for (int i = 0; i < 5; ++i) {
tracker.addReport("node" + i, "reporter1");
tracker.addReport("node" + i, "reporter2");
tracker.addReport("node" + i, "reporter1", 1.295673);
tracker.addReport("node" + i, "reporter2", 2.38560);
}
// Insert 10 nodes with 1 peer report each.
for (int i = 10; i < 20; ++i) {
tracker.addReport("node" + i, "reporter1");
tracker.addReport("node" + i, "reporter1", 3.4957);
}
final Set<ReportForJson> reports = getAndDeserializeJson();
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that only the first 5 nodes with two reports each were
// included in the JSON.
@ -210,8 +234,8 @@ public class TestSlowPeerTracker {
}
private boolean isNodeInReports(
Set<ReportForJson> reports, String node) {
for (ReportForJson report : reports) {
Set<SlowPeerJsonReport> reports, String node) {
for (SlowPeerJsonReport report : reports) {
if (report.getSlowNode().equalsIgnoreCase(node)) {
return true;
}
@ -219,7 +243,7 @@ public class TestSlowPeerTracker {
return false;
}
private Set<ReportForJson> getAndDeserializeJson()
private Set<SlowPeerJsonReport> getAndDeserializeJson()
throws IOException {
final String json = tracker.getJson();
LOG.info("Got JSON: {}", json);