HDFS-16582. Expose aggregate latency of slow node as perceived by the reporting node (#4323)
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org> Signed-off-by: Tao Li <tomscut@apache.org>
This commit is contained in:
parent
57fe613299
commit
ab3a9cedc9
|
@ -1886,8 +1886,9 @@ public class DatanodeManager {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
|
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
|
||||||
}
|
}
|
||||||
for (String slowNodeId : slowPeersMap.keySet()) {
|
for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) {
|
||||||
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
|
slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false),
|
||||||
|
slowNodeId.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,20 +58,20 @@ public class SlowPeerDisabledTracker extends SlowPeerTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addReport(String slowNode, String reportingNode) {
|
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
|
||||||
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
|
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
|
||||||
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
|
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getReportsForNode(String slowNode) {
|
public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
|
||||||
LOG.trace("Retrieval of slow peer report is disabled. To enable it, please enable config {}.",
|
LOG.trace("Retrieval of slow peer report is disabled. To enable it, please enable config {}.",
|
||||||
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
|
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
|
||||||
return ImmutableSet.of();
|
return ImmutableSet.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
|
public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
|
||||||
LOG.trace("Retrieval of slow peer report for all nodes is disabled. "
|
LOG.trace("Retrieval of slow peer report for all nodes is disabled. "
|
||||||
+ "To enable it, please enable config {}.",
|
+ "To enable it, please enable config {}.",
|
||||||
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
|
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.util.SortedSet;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||||
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This structure is a thin wrapper over slow peer reports to make Json
|
||||||
|
* [de]serialization easy.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
final class SlowPeerJsonReport {
|
||||||
|
|
||||||
|
@JsonProperty("SlowNode")
|
||||||
|
private final String slowNode;
|
||||||
|
|
||||||
|
@JsonProperty("SlowPeerLatencyWithReportingNodes")
|
||||||
|
private final SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes;
|
||||||
|
|
||||||
|
SlowPeerJsonReport(
|
||||||
|
@JsonProperty("SlowNode")
|
||||||
|
String slowNode,
|
||||||
|
@JsonProperty("SlowPeerLatencyWithReportingNodes")
|
||||||
|
SortedSet<SlowPeerLatencyWithReportingNode> slowPeerLatencyWithReportingNodes) {
|
||||||
|
this.slowNode = slowNode;
|
||||||
|
this.slowPeerLatencyWithReportingNodes = slowPeerLatencyWithReportingNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSlowNode() {
|
||||||
|
return slowNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SortedSet<SlowPeerLatencyWithReportingNode> getSlowPeerLatencyWithReportingNodes() {
|
||||||
|
return slowPeerLatencyWithReportingNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SlowPeerJsonReport that = (SlowPeerJsonReport) o;
|
||||||
|
|
||||||
|
return new EqualsBuilder()
|
||||||
|
.append(slowNode, that.slowNode)
|
||||||
|
.append(slowPeerLatencyWithReportingNodes, that.slowPeerLatencyWithReportingNodes)
|
||||||
|
.isEquals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return new HashCodeBuilder(17, 37)
|
||||||
|
.append(slowNode)
|
||||||
|
.append(slowPeerLatencyWithReportingNodes)
|
||||||
|
.toHashCode();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||||
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents the reporting node and the slow node's latency as observed by the
|
||||||
|
* reporting node. This class is used by SlowPeerJsonReport class.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
final class SlowPeerLatencyWithReportingNode
|
||||||
|
implements Comparable<SlowPeerLatencyWithReportingNode> {
|
||||||
|
|
||||||
|
@JsonProperty("ReportingNode")
|
||||||
|
private final String reportingNode;
|
||||||
|
|
||||||
|
@JsonProperty("ReportedLatency")
|
||||||
|
private final Double reportedLatency;
|
||||||
|
|
||||||
|
SlowPeerLatencyWithReportingNode(
|
||||||
|
@JsonProperty("ReportingNode")
|
||||||
|
String reportingNode,
|
||||||
|
@JsonProperty("ReportedLatency")
|
||||||
|
Double reportedLatency) {
|
||||||
|
this.reportingNode = reportingNode;
|
||||||
|
this.reportedLatency = reportedLatency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getReportingNode() {
|
||||||
|
return reportingNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Double getReportedLatency() {
|
||||||
|
return reportedLatency;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(SlowPeerLatencyWithReportingNode o) {
|
||||||
|
return this.reportingNode.compareTo(o.getReportingNode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SlowPeerLatencyWithReportingNode that = (SlowPeerLatencyWithReportingNode) o;
|
||||||
|
|
||||||
|
return new EqualsBuilder()
|
||||||
|
.append(reportingNode, that.reportingNode)
|
||||||
|
.append(reportedLatency, that.reportedLatency)
|
||||||
|
.isEquals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return new HashCodeBuilder(17, 37)
|
||||||
|
.append(reportingNode)
|
||||||
|
.append(reportedLatency)
|
||||||
|
.toHashCode();
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||||
|
@ -37,7 +36,6 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -95,7 +93,7 @@ public class SlowPeerTracker {
|
||||||
* Stale reports are not evicted proactively and can potentially
|
* Stale reports are not evicted proactively and can potentially
|
||||||
* hang around forever.
|
* hang around forever.
|
||||||
*/
|
*/
|
||||||
private final ConcurrentMap<String, ConcurrentMap<String, Long>>
|
private final ConcurrentMap<String, ConcurrentMap<String, LatencyWithLastReportTime>>
|
||||||
allReports;
|
allReports;
|
||||||
|
|
||||||
public SlowPeerTracker(Configuration conf, Timer timer) {
|
public SlowPeerTracker(Configuration conf, Timer timer) {
|
||||||
|
@ -123,12 +121,12 @@ public class SlowPeerTracker {
|
||||||
* Add a new report. DatanodeIds can be the DataNodeIds or addresses
|
* Add a new report. DatanodeIds can be the DataNodeIds or addresses
|
||||||
* We don't care as long as the caller is consistent.
|
* We don't care as long as the caller is consistent.
|
||||||
*
|
*
|
||||||
* @param reportingNode DataNodeId of the node reporting on its peer.
|
|
||||||
* @param slowNode DataNodeId of the peer suspected to be slow.
|
* @param slowNode DataNodeId of the peer suspected to be slow.
|
||||||
|
* @param reportingNode DataNodeId of the node reporting on its peer.
|
||||||
|
* @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node.
|
||||||
*/
|
*/
|
||||||
public void addReport(String slowNode,
|
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
|
||||||
String reportingNode) {
|
ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);
|
||||||
ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
|
|
||||||
|
|
||||||
if (nodeEntries == null) {
|
if (nodeEntries == null) {
|
||||||
// putIfAbsent guards against multiple writers.
|
// putIfAbsent guards against multiple writers.
|
||||||
|
@ -137,7 +135,8 @@ public class SlowPeerTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replace the existing entry from this node, if any.
|
// Replace the existing entry from this node, if any.
|
||||||
nodeEntries.put(reportingNode, timer.monotonicNow());
|
nodeEntries.put(reportingNode,
|
||||||
|
new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -147,8 +146,8 @@ public class SlowPeerTracker {
|
||||||
* @param slowNode target node Id.
|
* @param slowNode target node Id.
|
||||||
* @return set of reports which implicate the target node as being slow.
|
* @return set of reports which implicate the target node as being slow.
|
||||||
*/
|
*/
|
||||||
public Set<String> getReportsForNode(String slowNode) {
|
public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
|
||||||
final ConcurrentMap<String, Long> nodeEntries =
|
final ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries =
|
||||||
allReports.get(slowNode);
|
allReports.get(slowNode);
|
||||||
|
|
||||||
if (nodeEntries == null || nodeEntries.isEmpty()) {
|
if (nodeEntries == null || nodeEntries.isEmpty()) {
|
||||||
|
@ -163,17 +162,19 @@ public class SlowPeerTracker {
|
||||||
*
|
*
|
||||||
* @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
|
* @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
|
||||||
*/
|
*/
|
||||||
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
|
public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
|
||||||
if (allReports.isEmpty()) {
|
if (allReports.isEmpty()) {
|
||||||
return ImmutableMap.of();
|
return ImmutableMap.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
|
final Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> allNodesValidReports =
|
||||||
|
new HashMap<>();
|
||||||
final long now = timer.monotonicNow();
|
final long now = timer.monotonicNow();
|
||||||
|
|
||||||
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
|
for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
|
||||||
allReports.entrySet()) {
|
: allReports.entrySet()) {
|
||||||
SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
|
SortedSet<SlowPeerLatencyWithReportingNode> validReports =
|
||||||
|
filterNodeReports(entry.getValue(), now);
|
||||||
if (!validReports.isEmpty()) {
|
if (!validReports.isEmpty()) {
|
||||||
allNodesValidReports.put(entry.getKey(), validReports);
|
allNodesValidReports.put(entry.getKey(), validReports);
|
||||||
}
|
}
|
||||||
|
@ -184,17 +185,18 @@ public class SlowPeerTracker {
|
||||||
/**
|
/**
|
||||||
* Filter the given reports to return just the valid ones.
|
* Filter the given reports to return just the valid ones.
|
||||||
*
|
*
|
||||||
* @param reports
|
* @param reports Current set of reports.
|
||||||
* @param now
|
* @param now Current time.
|
||||||
* @return
|
* @return Set of valid reports that were created within last reportValidityMs millis.
|
||||||
*/
|
*/
|
||||||
private SortedSet<String> filterNodeReports(
|
private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(
|
||||||
ConcurrentMap<String, Long> reports, long now) {
|
ConcurrentMap<String, LatencyWithLastReportTime> reports, long now) {
|
||||||
final SortedSet<String> validReports = new TreeSet<>();
|
final SortedSet<SlowPeerLatencyWithReportingNode> validReports = new TreeSet<>();
|
||||||
|
|
||||||
for (Map.Entry<String, Long> entry : reports.entrySet()) {
|
for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
|
||||||
if (now - entry.getValue() < reportValidityMs) {
|
if (now - entry.getValue().getTime() < reportValidityMs) {
|
||||||
validReports.add(entry.getKey());
|
validReports.add(
|
||||||
|
new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return validReports;
|
return validReports;
|
||||||
|
@ -206,7 +208,7 @@ public class SlowPeerTracker {
|
||||||
* serialization failed.
|
* serialization failed.
|
||||||
*/
|
*/
|
||||||
public String getJson() {
|
public String getJson() {
|
||||||
Collection<ReportForJson> validReports = getJsonReports(
|
Collection<SlowPeerJsonReport> validReports = getJsonReports(
|
||||||
maxNodesToReport);
|
maxNodesToReport);
|
||||||
try {
|
try {
|
||||||
return WRITER.writeValueAsString(validReports);
|
return WRITER.writeValueAsString(validReports);
|
||||||
|
@ -217,42 +219,15 @@ public class SlowPeerTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This structure is a thin wrapper over reports to make Json
|
|
||||||
* [de]serialization easy.
|
|
||||||
*/
|
|
||||||
public static class ReportForJson {
|
|
||||||
@JsonProperty("SlowNode")
|
|
||||||
final private String slowNode;
|
|
||||||
|
|
||||||
@JsonProperty("ReportingNodes")
|
|
||||||
final private SortedSet<String> reportingNodes;
|
|
||||||
|
|
||||||
public ReportForJson(
|
|
||||||
@JsonProperty("SlowNode") String slowNode,
|
|
||||||
@JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
|
|
||||||
this.slowNode = slowNode;
|
|
||||||
this.reportingNodes = reportingNodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getSlowNode() {
|
|
||||||
return slowNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SortedSet<String> getReportingNodes() {
|
|
||||||
return reportingNodes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns all tracking slow peers.
|
* Returns all tracking slow peers.
|
||||||
* @param numNodes
|
* @param numNodes
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public List<String> getSlowNodes(int numNodes) {
|
public List<String> getSlowNodes(int numNodes) {
|
||||||
Collection<ReportForJson> jsonReports = getJsonReports(numNodes);
|
Collection<SlowPeerJsonReport> jsonReports = getJsonReports(numNodes);
|
||||||
ArrayList<String> slowNodes = new ArrayList<>();
|
ArrayList<String> slowNodes = new ArrayList<>();
|
||||||
for (ReportForJson jsonReport : jsonReports) {
|
for (SlowPeerJsonReport jsonReport : jsonReports) {
|
||||||
slowNodes.add(jsonReport.getSlowNode());
|
slowNodes.add(jsonReport.getSlowNode());
|
||||||
}
|
}
|
||||||
if (!slowNodes.isEmpty()) {
|
if (!slowNodes.isEmpty()) {
|
||||||
|
@ -267,35 +242,30 @@ public class SlowPeerTracker {
|
||||||
* @param numNodes number of nodes to return. This is to limit the
|
* @param numNodes number of nodes to return. This is to limit the
|
||||||
* size of the generated JSON.
|
* size of the generated JSON.
|
||||||
*/
|
*/
|
||||||
private Collection<ReportForJson> getJsonReports(int numNodes) {
|
private Collection<SlowPeerJsonReport> getJsonReports(int numNodes) {
|
||||||
if (allReports.isEmpty()) {
|
if (allReports.isEmpty()) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
final PriorityQueue<ReportForJson> topNReports =
|
final PriorityQueue<SlowPeerJsonReport> topNReports = new PriorityQueue<>(allReports.size(),
|
||||||
new PriorityQueue<>(allReports.size(),
|
(o1, o2) -> Ints.compare(o1.getSlowPeerLatencyWithReportingNodes().size(),
|
||||||
new Comparator<ReportForJson>() {
|
o2.getSlowPeerLatencyWithReportingNodes().size()));
|
||||||
@Override
|
|
||||||
public int compare(ReportForJson o1, ReportForJson o2) {
|
|
||||||
return Ints.compare(o1.reportingNodes.size(),
|
|
||||||
o2.reportingNodes.size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final long now = timer.monotonicNow();
|
final long now = timer.monotonicNow();
|
||||||
|
|
||||||
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
|
for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
|
||||||
allReports.entrySet()) {
|
: allReports.entrySet()) {
|
||||||
SortedSet<String> validReports = filterNodeReports(
|
SortedSet<SlowPeerLatencyWithReportingNode> validReports =
|
||||||
entry.getValue(), now);
|
filterNodeReports(entry.getValue(), now);
|
||||||
if (!validReports.isEmpty()) {
|
if (!validReports.isEmpty()) {
|
||||||
if (topNReports.size() < numNodes) {
|
if (topNReports.size() < numNodes) {
|
||||||
topNReports.add(new ReportForJson(entry.getKey(), validReports));
|
topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
|
||||||
} else if (topNReports.peek().getReportingNodes().size() <
|
} else if (topNReports.peek() != null
|
||||||
validReports.size()){
|
&& topNReports.peek().getSlowPeerLatencyWithReportingNodes().size()
|
||||||
|
< validReports.size()) {
|
||||||
// Remove the lowest element
|
// Remove the lowest element
|
||||||
topNReports.poll();
|
topNReports.poll();
|
||||||
topNReports.add(new ReportForJson(entry.getKey(), validReports));
|
topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -306,4 +276,23 @@ public class SlowPeerTracker {
|
||||||
long getReportValidityMs() {
|
long getReportValidityMs() {
|
||||||
return reportValidityMs;
|
return reportValidityMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class LatencyWithLastReportTime {
|
||||||
|
private final Long time;
|
||||||
|
private final Double latency;
|
||||||
|
|
||||||
|
LatencyWithLastReportTime(Long time, Double latency) {
|
||||||
|
this.time = time;
|
||||||
|
this.latency = latency;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getTime() {
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Double getLatency() {
|
||||||
|
return latency;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,13 +86,18 @@ public class TestSlowDatanodeReport {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}, 2000, 180000, "Slow nodes could not be detected");
|
}, 2000, 180000, "Slow nodes could not be detected");
|
||||||
|
LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
|
||||||
|
Assert.assertTrue(
|
||||||
|
cluster.getNameNode().getSlowPeersReport().contains(slowNode.getDatanodeHostname()));
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("15.5"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiNodesReport() throws Exception {
|
public void testMultiNodesReport() throws Exception {
|
||||||
List<DataNode> dataNodes = cluster.getDataNodes();
|
List<DataNode> dataNodes = cluster.getDataNodes();
|
||||||
dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
|
dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
|
||||||
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5));
|
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 14.5));
|
||||||
dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
|
dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
|
||||||
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
|
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
|
||||||
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
|
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
|
||||||
|
@ -107,6 +112,14 @@ public class TestSlowDatanodeReport {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}, 2000, 200000, "Slow nodes could not be detected");
|
}, 2000, 200000, "Slow nodes could not be detected");
|
||||||
|
LOG.info("Slow peer report: {}", cluster.getNameNode().getSlowPeersReport());
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().length() > 0);
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
|
||||||
|
.contains(dataNodes.get(1).getDatanodeHostname()));
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport()
|
||||||
|
.contains(dataNodes.get(2).getDatanodeHostname()));
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("14.5"));
|
||||||
|
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("18.7"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,12 +87,12 @@ public class TestReplicationPolicyExcludeSlowNodes
|
||||||
|
|
||||||
// mock slow nodes
|
// mock slow nodes
|
||||||
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
|
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
|
||||||
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr());
|
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), 1.29463);
|
||||||
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr());
|
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), 2.9576);
|
||||||
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr());
|
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), 3.59674);
|
||||||
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr());
|
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), 4.238456);
|
||||||
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr());
|
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), 5.18375);
|
||||||
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr());
|
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), 6.39576);
|
||||||
|
|
||||||
// waiting for slow nodes collector run
|
// waiting for slow nodes collector run
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
|
|
|
@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.ObjectReader;
|
import com.fasterxml.jackson.databind.ObjectReader;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker
|
|
||||||
.ReportForJson;
|
|
||||||
import org.apache.hadoop.util.FakeTimer;
|
import org.apache.hadoop.util.FakeTimer;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
|
@ -37,6 +35,7 @@ import java.io.IOException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.hamcrest.core.Is.is;
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -59,7 +58,7 @@ public class TestSlowPeerTracker {
|
||||||
private FakeTimer timer;
|
private FakeTimer timer;
|
||||||
private long reportValidityMs;
|
private long reportValidityMs;
|
||||||
private static final ObjectReader READER =
|
private static final ObjectReader READER =
|
||||||
new ObjectMapper().readerFor(new TypeReference<Set<ReportForJson>>() {});
|
new ObjectMapper().readerFor(new TypeReference<Set<SlowPeerJsonReport>>() {});
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
@ -80,9 +79,9 @@ public class TestSlowPeerTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReportsAreRetrieved() {
|
public void testReportsAreRetrieved() {
|
||||||
tracker.addReport("node2", "node1");
|
tracker.addReport("node2", "node1", 1.2);
|
||||||
tracker.addReport("node3", "node1");
|
tracker.addReport("node3", "node1", 2.1);
|
||||||
tracker.addReport("node3", "node2");
|
tracker.addReport("node3", "node2", 1.22);
|
||||||
|
|
||||||
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
|
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
|
||||||
assertThat(tracker.getReportsForNode("node2").size(), is(1));
|
assertThat(tracker.getReportsForNode("node2").size(), is(1));
|
||||||
|
@ -95,9 +94,9 @@ public class TestSlowPeerTracker {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testAllReportsAreExpired() {
|
public void testAllReportsAreExpired() {
|
||||||
tracker.addReport("node2", "node1");
|
tracker.addReport("node2", "node1", 0.123);
|
||||||
tracker.addReport("node3", "node2");
|
tracker.addReport("node3", "node2", 0.2334);
|
||||||
tracker.addReport("node1", "node3");
|
tracker.addReport("node1", "node3", 1.234);
|
||||||
|
|
||||||
// No reports should expire after 1ms.
|
// No reports should expire after 1ms.
|
||||||
timer.advance(1);
|
timer.advance(1);
|
||||||
|
@ -117,13 +116,14 @@ public class TestSlowPeerTracker {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSomeReportsAreExpired() {
|
public void testSomeReportsAreExpired() {
|
||||||
tracker.addReport("node3", "node1");
|
tracker.addReport("node3", "node1", 1.234);
|
||||||
tracker.addReport("node3", "node2");
|
tracker.addReport("node3", "node2", 1.222);
|
||||||
timer.advance(reportValidityMs);
|
timer.advance(reportValidityMs);
|
||||||
tracker.addReport("node3", "node4");
|
tracker.addReport("node3", "node4", 1.20);
|
||||||
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
|
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
|
||||||
assertThat(tracker.getReportsForNode("node3").size(), is(1));
|
assertThat(tracker.getReportsForNode("node3").size(), is(1));
|
||||||
assertTrue(tracker.getReportsForNode("node3").contains("node4"));
|
assertEquals(1, tracker.getReportsForNode("node3").stream()
|
||||||
|
.filter(e -> e.getReportingNode().equals("node4")).count());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -131,24 +131,24 @@ public class TestSlowPeerTracker {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testReplacement() {
|
public void testReplacement() {
|
||||||
tracker.addReport("node2", "node1");
|
tracker.addReport("node2", "node1", 2.1);
|
||||||
timer.advance(reportValidityMs); // Expire the report.
|
timer.advance(reportValidityMs); // Expire the report.
|
||||||
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
|
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
|
||||||
|
|
||||||
// This should replace the expired report with a newer valid one.
|
// This should replace the expired report with a newer valid one.
|
||||||
tracker.addReport("node2", "node1");
|
tracker.addReport("node2", "node1", 0.001);
|
||||||
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
|
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
|
||||||
assertThat(tracker.getReportsForNode("node2").size(), is(1));
|
assertThat(tracker.getReportsForNode("node2").size(), is(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetJson() throws IOException {
|
public void testGetJson() throws IOException {
|
||||||
tracker.addReport("node1", "node2");
|
tracker.addReport("node1", "node2", 1.1);
|
||||||
tracker.addReport("node2", "node3");
|
tracker.addReport("node2", "node3", 1.23);
|
||||||
tracker.addReport("node2", "node1");
|
tracker.addReport("node2", "node1", 2.13);
|
||||||
tracker.addReport("node4", "node1");
|
tracker.addReport("node4", "node1", 1.244);
|
||||||
|
|
||||||
final Set<ReportForJson> reports = getAndDeserializeJson();
|
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
|
||||||
|
|
||||||
// And ensure its contents are what we expect.
|
// And ensure its contents are what we expect.
|
||||||
assertThat(reports.size(), is(3));
|
assertThat(reports.size(), is(3));
|
||||||
|
@ -161,19 +161,19 @@ public class TestSlowPeerTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetJsonSizeIsLimited() throws IOException {
|
public void testGetJsonSizeIsLimited() throws IOException {
|
||||||
tracker.addReport("node1", "node2");
|
tracker.addReport("node1", "node2", 1.634);
|
||||||
tracker.addReport("node1", "node3");
|
tracker.addReport("node1", "node3", 2.3566);
|
||||||
tracker.addReport("node2", "node3");
|
tracker.addReport("node2", "node3", 3.869);
|
||||||
tracker.addReport("node2", "node4");
|
tracker.addReport("node2", "node4", 4.1356);
|
||||||
tracker.addReport("node3", "node4");
|
tracker.addReport("node3", "node4", 1.73057);
|
||||||
tracker.addReport("node3", "node5");
|
tracker.addReport("node3", "node5", 2.4956730);
|
||||||
tracker.addReport("node4", "node6");
|
tracker.addReport("node4", "node6", 3.29847);
|
||||||
tracker.addReport("node5", "node6");
|
tracker.addReport("node5", "node6", 4.13444);
|
||||||
tracker.addReport("node5", "node7");
|
tracker.addReport("node5", "node7", 5.10845);
|
||||||
tracker.addReport("node6", "node7");
|
tracker.addReport("node6", "node8", 2.37464);
|
||||||
tracker.addReport("node6", "node8");
|
tracker.addReport("node6", "node7", 1.29475656);
|
||||||
|
|
||||||
final Set<ReportForJson> reports = getAndDeserializeJson();
|
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
|
||||||
|
|
||||||
// Ensure that node4 is not in the list since it was
|
// Ensure that node4 is not in the list since it was
|
||||||
// tagged by just one peer and we already have 5 other nodes.
|
// tagged by just one peer and we already have 5 other nodes.
|
||||||
|
@ -185,22 +185,46 @@ public class TestSlowPeerTracker {
|
||||||
assertTrue(isNodeInReports(reports, "node3"));
|
assertTrue(isNodeInReports(reports, "node3"));
|
||||||
assertTrue(isNodeInReports(reports, "node5"));
|
assertTrue(isNodeInReports(reports, "node5"));
|
||||||
assertTrue(isNodeInReports(reports, "node6"));
|
assertTrue(isNodeInReports(reports, "node6"));
|
||||||
|
|
||||||
|
assertEquals(1, reports.stream().filter(
|
||||||
|
e -> e.getSlowNode().equals("node1") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.634)
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(2.3566))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
assertEquals(1, reports.stream().filter(
|
||||||
|
e -> e.getSlowNode().equals("node2") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(3.869)
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(4.1356))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
assertEquals(1, reports.stream().filter(
|
||||||
|
e -> e.getSlowNode().equals("node3") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.73057)
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency()
|
||||||
|
.equals(2.4956730)).count());
|
||||||
|
|
||||||
|
assertEquals(1, reports.stream().filter(
|
||||||
|
e -> e.getSlowNode().equals("node6") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
|
||||||
|
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency()
|
||||||
|
.equals(1.29475656) && e.getSlowPeerLatencyWithReportingNodes().last()
|
||||||
|
.getReportedLatency().equals(2.37464)).count());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLowRankedElementsIgnored() throws IOException {
|
public void testLowRankedElementsIgnored() throws IOException {
|
||||||
// Insert 5 nodes with 2 peer reports each.
|
// Insert 5 nodes with 2 peer reports each.
|
||||||
for (int i = 0; i < 5; ++i) {
|
for (int i = 0; i < 5; ++i) {
|
||||||
tracker.addReport("node" + i, "reporter1");
|
tracker.addReport("node" + i, "reporter1", 1.295673);
|
||||||
tracker.addReport("node" + i, "reporter2");
|
tracker.addReport("node" + i, "reporter2", 2.38560);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert 10 nodes with 1 peer report each.
|
// Insert 10 nodes with 1 peer report each.
|
||||||
for (int i = 10; i < 20; ++i) {
|
for (int i = 10; i < 20; ++i) {
|
||||||
tracker.addReport("node" + i, "reporter1");
|
tracker.addReport("node" + i, "reporter1", 3.4957);
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<ReportForJson> reports = getAndDeserializeJson();
|
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
|
||||||
|
|
||||||
// Ensure that only the first 5 nodes with two reports each were
|
// Ensure that only the first 5 nodes with two reports each were
|
||||||
// included in the JSON.
|
// included in the JSON.
|
||||||
|
@ -210,8 +234,8 @@ public class TestSlowPeerTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isNodeInReports(
|
private boolean isNodeInReports(
|
||||||
Set<ReportForJson> reports, String node) {
|
Set<SlowPeerJsonReport> reports, String node) {
|
||||||
for (ReportForJson report : reports) {
|
for (SlowPeerJsonReport report : reports) {
|
||||||
if (report.getSlowNode().equalsIgnoreCase(node)) {
|
if (report.getSlowNode().equalsIgnoreCase(node)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -219,7 +243,7 @@ public class TestSlowPeerTracker {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<ReportForJson> getAndDeserializeJson()
|
private Set<SlowPeerJsonReport> getAndDeserializeJson()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final String json = tracker.getJson();
|
final String json = tracker.getJson();
|
||||||
LOG.info("Got JSON: {}", json);
|
LOG.info("Got JSON: {}", json);
|
||||||
|
|
Loading…
Reference in New Issue