+ for (final Map.Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+ : averages.entrySet()) {
+ final String name = entry.getKey();
+ double totalSum = 0;
+ long totalCount = 0;
+
+ for (final SumAndCount sumAndCount : entry.getValue()) {
+ totalCount += sumAndCount.getCount();
+ totalSum += sumAndCount.getSum();
+ }
+
+ if (totalCount > minSamples) {
+ stats.put(name, totalSum / totalCount);
+ }
+ }
+ return stats;
+ }
}
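
The getStats() method added above folds each metric's per-window (sum, count) pairs into one average and drops metrics that have not accumulated more than minSamples samples. The following is a minimal, self-contained sketch of that cut-off logic, assuming hypothetical per-peer numbers; the GetStatsSketch class and its data are invented for illustration only.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetStatsSketch {
  public static void main(String[] args) {
    // Hypothetical per-peer (sumOfLatenciesMs, sampleCount) pairs, one pair per window.
    Map<String, List<long[]>> averages = new HashMap<>();
    averages.put("dn1", Arrays.asList(new long[]{5000, 900}, new long[]{7000, 800}));
    averages.put("dn2", Arrays.asList(new long[]{40, 3}));     // too few samples

    final long minSamples = 1000;
    Map<String, Double> stats = new HashMap<>();
    for (Map.Entry<String, List<long[]>> entry : averages.entrySet()) {
      double totalSum = 0;
      long totalCount = 0;
      for (long[] sumAndCount : entry.getValue()) {
        totalSum += sumAndCount[0];
        totalCount += sumAndCount[1];
      }
      if (totalCount > minSamples) {             // dn2 is filtered out here
        stats.put(entry.getKey(), totalSum / totalCount);
      }
    }
    System.out.println(stats);                   // {dn1=7.0588...}
  }
}
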
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
index 899d98c4b64..44202e788e0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
@@ -42,7 +42,8 @@ public class TestRollingAverages {
public void testRollingAveragesEmptyRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
/* 5s interval and 2 windows */
- try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+ try (final RollingAverages rollingAverages =
+ new RollingAverages(5000, 2)) {
/* Check it initially */
rollingAverages.snapshot(rb, true);
verify(rb, never()).addGauge(
@@ -74,10 +75,10 @@ public class TestRollingAverages {
public void testRollingAveragesRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
final String name = "foo2";
- final int windowSize = 5; // 5s roll over interval
+ final int windowSizeMs = 5000; // 5s roll over interval
final int numWindows = 2;
final int numOpsPerIteration = 1000;
- try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+ try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
numWindows)) {
/* Push values for three intervals */
@@ -92,7 +93,7 @@ public class TestRollingAverages {
* Sleep until 1s after the next windowSize seconds interval, to let the
* metrics roll over
*/
- final long sleep = (start + (windowSize * 1000 * i) + 1000)
+ final long sleep = (start + (windowSizeMs * i) + 1000)
- Time.monotonicNow();
Thread.sleep(sleep);
@@ -110,12 +111,12 @@ public class TestRollingAverages {
final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
: numOpsPerIteration;
verify(rb).addGauge(
- info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+ info("[Foo2]RollingAvgTime", "Rolling average time for foo2"),
rollingSum / rollingTotal);
/* Verify the metrics were added the right number of times */
verify(rb, times(i)).addGauge(
- eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+ eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")),
anyDouble());
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java
new file mode 100644
index 00000000000..f4e3b1372af
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * A class that allows a DataNode to communicate information about all
+ * its peer DataNodes that appear to be slow.
+ *
+ * The wire representation of this structure is a list of
+ * SlowPeerReportProto messages.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class SlowPeerReports {
+ /**
+ * A map from the DataNode's DataNodeUUID to its aggregate latency
+ * as seen by the reporting node.
+ *
+ * The exact choice of the aggregate is opaque to the NameNode but it
+ * should be chosen consistently by all DataNodes in the cluster.
+ * Examples of aggregates are 90th percentile (good) and mean (not so
+ * good).
+ *
+ * The NameNode must not attempt to interpret the aggregate latencies
+ * beyond exposing them as a diagnostic, e.g. via metrics. Also, comparing
+ * latencies across reports from different DataNodes may not be
+ * meaningful and must be avoided.
+ */
+ @Nonnull
+ private final Map<String, Double> slowPeers;
+
+ /**
+ * An object representing a SlowPeerReports with no entries. Should
+ * be used instead of null or creating new objects when there are
+ * no slow peers to report.
+ */
+ public static final SlowPeerReports EMPTY_REPORT =
+ new SlowPeerReports(ImmutableMap.of());
+
+ private SlowPeerReports(Map<String, Double> slowPeers) {
+ this.slowPeers = slowPeers;
+ }
+
+ public static SlowPeerReports create(
+ @Nullable Map<String, Double> slowPeers) {
+ if (slowPeers == null || slowPeers.isEmpty()) {
+ return EMPTY_REPORT;
+ }
+ return new SlowPeerReports(slowPeers);
+ }
+
+ public Map<String, Double> getSlowPeers() {
+ return slowPeers;
+ }
+
+ public boolean haveSlowPeers() {
+ return slowPeers.size() > 0;
+ }
+
+ /**
+ * Return true if the two objects represent the same set of slow peer
+ * entries. Primarily for unit testing convenience.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof SlowPeerReports)) {
+ return false;
+ }
+
+ SlowPeerReports that = (SlowPeerReports) o;
+
+ return slowPeers.equals(that.slowPeers);
+ }
+
+ @Override
+ public int hashCode() {
+ return slowPeers.hashCode();
+ }
+}
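
A brief usage sketch of the new class follows, assuming a DataNode-side caller has already computed aggregate latencies; the peer ID and latency value below are invented for illustration.

import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;

import java.util.HashMap;
import java.util.Map;

public class SlowPeerReportsSketch {
  public static void main(String[] args) {
    Map<String, Double> latencies = new HashMap<>();
    latencies.put("dn-3.example.com:9866", 310.5);   // aggregate latency in ms

    SlowPeerReports reports = SlowPeerReports.create(latencies);
    System.out.println(reports.haveSlowPeers());     // true

    // A null or empty map collapses to the shared EMPTY_REPORT instance,
    // so callers never need to pass or check for null reports.
    System.out.println(
        SlowPeerReports.create(null) == SlowPeerReports.EMPTY_REPORT);  // true
  }
}
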
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 86848ce0284..699738e19a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -432,14 +432,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_SESSION_ID_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
- public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
- "dfs.metrics.rolling.average.window.size";
- public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
- 3600;
- public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
- "dfs.metrics.rolling.average.window.numbers";
- public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
- 48;
+
+ // The following setting is not meant to be changed by administrators.
+ public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
+ "dfs.metrics.rolling.averages.window.length";
+ public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
+ 300 * 1000;
+
+ // The following setting is not meant to be changed by administrators.
+ public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
+ "dfs.metrics.rolling.average.num.windows";
+ public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
+ 36;
+
public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
"dfs.datanode.peer.stats.enabled";
public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
@@ -628,6 +633,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
+ public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY =
+ "dfs.datanode.slow.peers.report.interval";
+ public static final int DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT =
+ 1800 * 1000;
+
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;
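
The new keys are read through Configuration, so peer-stats tracking can also be switched on programmatically rather than only via hdfs-site.xml. A hedged sketch with an arbitrary shortened report interval; the class name and values are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class PeerStatsConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Slow peer tracking is off by default; enable it explicitly.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
    // Report slow peers every 5 minutes instead of the 30-minute default.
    // The key is read via getTimeDuration(), so suffixed values like "5m" also work.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
        5 * 60 * 1000);
    System.out.println(conf.get(
        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY));
  }
}
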
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 16caea563b6..1794cbdef9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -71,6 +72,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import javax.annotation.Nonnull;
+
/**
* This class is the client side translator to translate the requests made on
* {@link DatanodeProtocol} interfaces to the RPC server implementing
@@ -132,7 +135,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
- boolean requestFullBlockReportLease) throws IOException {
+ boolean requestFullBlockReportLease,
+ @Nonnull SlowPeerReports slowPeers) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -149,6 +153,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
volumeFailureSummary));
}
+ if (slowPeers.haveSlowPeers()) {
+ builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
+ }
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 871b4fde2f0..cdfcf4cb6b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -120,7 +120,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
- volumeFailureSummary, request.getRequestFullBlockReportLease());
+ volumeFailureSummary, request.getRequestFullBlockReportLease(),
+ PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
} catch (IOException e) {
throw new ServiceException(e);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index d0db9f4ad74..5ee7847113b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -19,7 +19,11 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import com.google.protobuf.ByteString;
@@ -42,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -99,12 +104,13 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
/**
* Utilities for converting protobuf classes to and from implementation classes
* and other helper utilities to help in dealing with protobuf.
- *
+ *
* Note that when converting from an internal type to protobuf type, the
* converter never return null for protobuf type. The check for internal type
* being null must be done before calling the convert() method.
@@ -113,7 +119,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
* and to protobuf, see {@link PBHelperClient}.
*/
public class PBHelper {
- private static final RegisterCommandProto REG_CMD_PROTO =
+ private static final RegisterCommandProto REG_CMD_PROTO =
RegisterCommandProto.newBuilder().build();
private static final RegisterCommand REG_CMD = new RegisterCommand();
@@ -387,7 +393,7 @@ public class PBHelper {
return ReplicaStateProto.FINALIZED;
}
}
-
+
public static DatanodeRegistrationProto convert(
DatanodeRegistration registration) {
DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
@@ -424,7 +430,7 @@ public class PBHelper {
return null;
}
}
-
+
public static BalancerBandwidthCommandProto convert(
BalancerBandwidthCommand bbCmd) {
return BalancerBandwidthCommandProto.newBuilder()
@@ -569,7 +575,7 @@ public class PBHelper {
List<RecoveringBlockProto> list = recoveryCmd.getBlocksList();
List<RecoveringBlock> recoveringBlocks = new ArrayList<RecoveringBlock>(
list.size());
-
+
for (RecoveringBlockProto rbp : list) {
recoveringBlocks.add(PBHelper.convert(rbp));
}
@@ -654,9 +660,9 @@ public class PBHelper {
public static ReceivedDeletedBlockInfoProto convert(
ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
- ReceivedDeletedBlockInfoProto.Builder builder =
+ ReceivedDeletedBlockInfoProto.Builder builder =
ReceivedDeletedBlockInfoProto.newBuilder();
-
+
ReceivedDeletedBlockInfoProto.BlockStatus status;
switch (receivedDeletedBlockInfo.getStatus()) {
case RECEIVING_BLOCK:
@@ -673,7 +679,7 @@ public class PBHelper {
receivedDeletedBlockInfo.getStatus());
}
builder.setStatus(status);
-
+
if (receivedDeletedBlockInfo.getDelHints() != null) {
builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
}
@@ -700,7 +706,7 @@ public class PBHelper {
status,
proto.hasDeleteHint() ? proto.getDeleteHint() : null);
}
-
+
public static NamespaceInfoProto convert(NamespaceInfo info) {
NamespaceInfoProto.Builder builder = NamespaceInfoProto.newBuilder();
builder.setBlockPoolID(info.getBlockPoolID())
@@ -787,6 +793,45 @@ public class PBHelper {
return builder.build();
}
+ public static List<SlowPeerReportProto> convertSlowPeerInfo(
+ SlowPeerReports slowPeers) {
+ if (slowPeers.getSlowPeers().size() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<SlowPeerReportProto> slowPeerInfoProtos =
+ new ArrayList<>(slowPeers.getSlowPeers().size());
+ for (Map.Entry<String, Double> entry :
+ slowPeers.getSlowPeers().entrySet()) {
+ slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder()
+ .setDataNodeId(entry.getKey())
+ .setAggregateLatency(entry.getValue())
+ .build());
+ }
+ return slowPeerInfoProtos;
+ }
+
+ public static SlowPeerReports convertSlowPeerInfo(
+ List<SlowPeerReportProto> slowPeerProtos) {
+
+ // No slow peers, or possibly an older DataNode.
+ if (slowPeerProtos == null || slowPeerProtos.size() == 0) {
+ return SlowPeerReports.EMPTY_REPORT;
+ }
+
+ Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size());
+ for (SlowPeerReportProto proto : slowPeerProtos) {
+ if (!proto.hasDataNodeId()) {
+ // The DataNodeId should be reported.
+ continue;
+ }
+ slowPeersMap.put(
+ proto.getDataNodeId(),
+ proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0);
+ }
+ return SlowPeerReports.create(slowPeersMap);
+ }
+
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 7782946d4b1..7f8afa9ec76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Timer;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
@@ -166,6 +169,14 @@ public class DatanodeManager {
*/
private final HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<>(4, 0.75f);
+
+ /**
+ * True if we should process latency metrics from downstream peers.
+ */
+ private final boolean dataNodePeerStatsEnabled;
+
+ @Nullable
+ private final SlowPeerTracker slowPeerTracker;
/**
* The minimum time between resending caching directives to Datanodes,
@@ -186,6 +197,12 @@ public class DatanodeManager {
this.decomManager = new DecommissionManager(namesystem, blockManager,
heartbeatManager);
this.fsClusterStats = newFSClusterStats();
+ this.dataNodePeerStatsEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+ this.slowPeerTracker = dataNodePeerStatsEnabled ?
+ new SlowPeerTracker(conf, new Timer()) : null;
networktopology = NetworkTopology.getInstance(conf);
@@ -1490,7 +1507,8 @@ public class DatanodeManager {
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
- VolumeFailureSummary volumeFailureSummary) throws IOException {
+ VolumeFailureSummary volumeFailureSummary,
+ @Nonnull SlowPeerReports slowPeers) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
@@ -1549,6 +1567,19 @@ public class DatanodeManager {
nodeinfo.setBalancerBandwidth(0);
}
+ if (slowPeerTracker != null) {
+ final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
+ if (!slowPeersMap.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
+ slowPeersMap);
+ }
+ for (String slowNodeId : slowPeersMap.keySet()) {
+ slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
+ }
+ }
+ }
+
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@@ -1751,5 +1782,14 @@ public class DatanodeManager {
this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
}
+
+ /**
+ * Retrieve information about slow peers as JSON.
+ * Returns null if we are not tracking slow peers.
+ * @return serialized report of slow peers, or null if tracking is disabled.
+ */
+ public String getSlowPeersReport() {
+ return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
new file mode 100644
index 00000000000..a1ffd20bc4e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This class aggregates information from {@link SlowPeerReports} received via
+ * heartbeats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowPeerTracker {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SlowPeerTracker.class);
+
+ /**
+ * Time duration after which a report is considered stale. This is
+ * set to DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY * 3, i.e. reports
+ * are retained for at least two successive reporting intervals.
+ */
+ private final long reportValidityMs;
+
+ /**
+ * Timer object for querying the current time. Separated out for
+ * unit testing.
+ */
+ private final Timer timer;
+
+ /**
+ * Number of nodes to include in JSON report. We will return nodes with
+ * the highest number of votes from peers.
+ */
+ private static final int MAX_NODES_TO_REPORT = 5;
+
+ /**
+ * Information about peers that have reported a node as being slow.
+ * Each outer map entry is a map of (DatanodeId) -> (timestamp),
+ * mapping reporting nodes to the timestamp of the last report from
+ * that node.
+ *
+ * DatanodeId could be the DataNodeId or its address. We
+ * don't care as long as the caller uses it consistently.
+ *
+ * Stale reports are not evicted proactively and can potentially
+ * hang around forever.
+ */
+ private final ConcurrentMap<String, ConcurrentMap<String, Long>>
+ allReports;
+
+ public SlowPeerTracker(Configuration conf, Timer timer) {
+ this.timer = timer;
+ this.allReports = new ConcurrentHashMap<>();
+ this.reportValidityMs = conf.getTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS) * 3;
+ }
+
+ /**
+ * Add a new report. DatanodeIds can be the DataNodeIds or addresses.
+ * We don't care as long as the caller is consistent.
+ *
+ * @param reportingNode DataNodeId of the node reporting on its peer.
+ * @param slowNode DataNodeId of the peer suspected to be slow.
+ */
+ public void addReport(String slowNode,
+ String reportingNode) {
+ ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
+
+ if (nodeEntries == null) {
+ // putIfAbsent guards against multiple writers.
+ allReports.putIfAbsent(
+ slowNode, new ConcurrentHashMap<String, Long>());
+ nodeEntries = allReports.get(slowNode);
+ }
+
+ // Replace the existing entry from this node, if any.
+ nodeEntries.put(reportingNode, timer.monotonicNow());
+ }
+
+ /**
+ * Retrieve the non-expired reports that mark a given DataNode
+ * as slow. Stale reports are excluded.
+ *
+ * @param slowNode target node Id.
+ * @return set of reports which implicate the target node as being slow.
+ */
+ public Set<String> getReportsForNode(String slowNode) {
+ final ConcurrentMap<String, Long> nodeEntries =
+ allReports.get(slowNode);
+
+ if (nodeEntries == null || nodeEntries.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ return filterNodeReports(nodeEntries, timer.monotonicNow());
+ }
+
+ /**
+ * Retrieve all reports for all nodes. Stale reports are excluded.
+ *
+ * @return map from SlowNodeId -> (set of nodes reporting peers).
+ */
+ public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+ if (allReports.isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
+ final long now = timer.monotonicNow();
+
+ for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
+ allReports.entrySet()) {
+ SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
+ if (!validReports.isEmpty()) {
+ allNodesValidReports.put(entry.getKey(), validReports);
+ }
+ }
+ return allNodesValidReports;
+ }
+
+ /**
+ * Filter the given reports to return just the valid ones.
+ *
+ * @param reports map from reporting node to the time of its last report.
+ * @param now current monotonic time in milliseconds.
+ * @return set of reporting nodes whose reports have not expired.
+ */
+ private SortedSet<String> filterNodeReports(
+ ConcurrentMap<String, Long> reports, long now) {
+ final SortedSet<String> validReports = new TreeSet<>();
+
+ for (Map.Entry<String, Long> entry : reports.entrySet()) {
+ if (now - entry.getValue() < reportValidityMs) {
+ validReports.add(entry.getKey());
+ }
+ }
+ return validReports;
+ }
+
+ /**
+ * Retrieve all valid reports as a JSON string.
+ * @return serialized representation of valid reports. null if
+ * serialization failed.
+ */
+ public String getJson() {
+ Collection<ReportForJson> validReports = getJsonReports(
+ MAX_NODES_TO_REPORT);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ return objectMapper.writeValueAsString(validReports);
+ } catch (JsonProcessingException e) {
+ // Failed to serialize. Don't log the exception call stack.
+ LOG.debug("Failed to serialize statistics" + e);
+ return null;
+ }
+ }
+
+ /**
+ * This structure is a thin wrapper over reports to make Json
+ * [de]serialization easy.
+ */
+ public static class ReportForJson {
+ @JsonProperty("SlowNode")
+ final private String slowNode;
+
+ @JsonProperty("ReportingNodes")
+ final private SortedSet<String> reportingNodes;
+
+ public ReportForJson(
+ @JsonProperty("SlowNode") String slowNode,
+ @JsonProperty("ReportingNodes") SortedSet reportingNodes) {
+ this.slowNode = slowNode;
+ this.reportingNodes = reportingNodes;
+ }
+
+ public String getSlowNode() {
+ return slowNode;
+ }
+
+ public SortedSet<String> getReportingNodes() {
+ return reportingNodes;
+ }
+ }
+
+ /**
+ * Retrieve reports in a structure for generating JSON, limiting the
+ * output to the top numNodes nodes, i.e. nodes with the most reports.
+ * @param numNodes number of nodes to return. This is to limit the
+ * size of the generated JSON.
+ */
+ private Collection<ReportForJson> getJsonReports(int numNodes) {
+ if (allReports.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final PriorityQueue<ReportForJson> topNReports =
+ new PriorityQueue<>(allReports.size(),
+ new Comparator<ReportForJson>() {
+ @Override
+ public int compare(ReportForJson o1, ReportForJson o2) {
+ return Ints.compare(o1.reportingNodes.size(),
+ o2.reportingNodes.size());
+ }
+ });
+
+ final long now = timer.monotonicNow();
+
+ for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
+ allReports.entrySet()) {
+ SortedSet<String> validReports = filterNodeReports(
+ entry.getValue(), now);
+ if (!validReports.isEmpty()) {
+ if (topNReports.size() < numNodes) {
+ topNReports.add(new ReportForJson(entry.getKey(), validReports));
+ } else if (topNReports.peek().getReportingNodes().size() <
+ validReports.size()){
+ // Remove the lowest element
+ topNReports.poll();
+ topNReports.add(new ReportForJson(entry.getKey(), validReports));
+ }
+ }
+ }
+ return topNReports;
+ }
+
+ @VisibleForTesting
+ long getReportValidityMs() {
+ return reportValidityMs;
+ }
+}
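
A small sketch of how the tracker accumulates "votes" against a peer across heartbeats; the constructor arguments mirror the DatanodeManager usage above, and the node names are invented.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker;
import org.apache.hadoop.util.Timer;

public class SlowPeerTrackerSketch {
  public static void main(String[] args) {
    SlowPeerTracker tracker =
        new SlowPeerTracker(new Configuration(), new Timer());

    // Two different DataNodes report the same downstream peer as slow.
    tracker.addReport("dn-slow:9866", "dn-reporter-1:9866");
    tracker.addReport("dn-slow:9866", "dn-reporter-2:9866");

    // Non-stale reports implicating dn-slow (both reporters, in this case).
    System.out.println(tracker.getReportsForNode("dn-slow:9866"));

    // JSON summary of the most-reported nodes (at most MAX_NODES_TO_REPORT).
    System.out.println(tracker.getJson());
  }
}
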
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c7e54d7f809..5bfdaecf8c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -82,13 +83,13 @@ import com.google.common.base.Joiner;
*/
@InterfaceAudience.Private
class BPServiceActor implements Runnable {
-
+
static final Logger LOG = DataNode.LOG;
final InetSocketAddress nnAddr;
HAServiceState state;
final BPOfferService bpos;
-
+
volatile long lastCacheReport = 0;
private final Scheduler scheduler;
@@ -111,7 +112,7 @@ class BPServiceActor implements Runnable {
private final IncrementalBlockReportManager ibrManager;
private DatanodeRegistration bpRegistration;
- final LinkedList<BPServiceActorAction> bpThreadQueue
+ final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
@@ -127,7 +128,8 @@ class BPServiceActor implements Runnable {
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
prevBlockReportId = ThreadLocalRandom.current().nextLong();
scheduler = new Scheduler(dnConf.heartBeatInterval,
- dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
+ dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
+ dnConf.slowPeersReportIntervalMs);
// get the value of maxDataLength.
this.maxDataLength = dnConf.getMaxDataLength();
}
@@ -156,7 +158,7 @@ class BPServiceActor implements Runnable {
public String toString() {
return bpos.toString() + " service to " + nnAddr;
}
-
+
InetSocketAddress getNNSocketAddress() {
return nnAddr;
}
@@ -214,7 +216,7 @@ class BPServiceActor implements Runnable {
* This calls versionRequest to determine the NN's
* namespace and version info. It automatically retries until
* the NN responds or the DN is shutting down.
- *
+ *
* @return the NamespaceInfo
*/
@VisibleForTesting
@@ -230,11 +232,11 @@ class BPServiceActor implements Runnable {
} catch(IOException e ) { // namenode is not available
LOG.warn("Problem connecting to server: " + nnAddr);
}
-
+
// try again in a second
sleepAndLogInterrupts(5000, "requesting version info from NN");
}
-
+
if (nsInfo != null) {
checkNNVersion(nsInfo);
} else {
@@ -274,7 +276,7 @@ class BPServiceActor implements Runnable {
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
-
+
// Second phase of the handshake with the NN.
register(nsInfo);
}
@@ -299,7 +301,7 @@ class BPServiceActor implements Runnable {
}
}
}
-
+
@VisibleForTesting
void triggerHeartbeatForTests() {
synchronized (ibrManager) {
@@ -488,13 +490,19 @@ class BPServiceActor implements Runnable {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
-
- scheduler.updateLastHeartbeatTime(monotonicNow());
+
+ final long now = monotonicNow();
+ scheduler.updateLastHeartbeatTime(now);
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
- return bpNamenode.sendHeartbeat(bpRegistration,
+ final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
+ final SlowPeerReports slowPeers =
+ slowPeersReportDue && dn.getPeerMetrics() != null ?
+ SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
+ SlowPeerReports.EMPTY_REPORT;
+ HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
@@ -502,7 +510,14 @@ class BPServiceActor implements Runnable {
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary,
- requestBlockReportLease);
+ requestBlockReportLease,
+ slowPeers);
+
+ if (slowPeersReportDue) {
+ // If the report was due and successfully sent, schedule the next one.
+ scheduler.scheduleNextSlowPeerReport();
+ }
+ return response;
}
@VisibleForTesting
@@ -524,14 +539,14 @@ class BPServiceActor implements Runnable {
lifelineSender.start();
}
}
-
+
private String formatThreadName(String action, InetSocketAddress addr) {
Collection<StorageLocation> dataDirs =
DataNode.getStorageLocations(dn.getConf());
return "DataNode: [" + dataDirs.toString() + "] " +
action + " to " + addr;
}
-
+
//This must be called only by blockPoolManager.
void stop() {
shouldServiceRun = false;
@@ -542,7 +557,7 @@ class BPServiceActor implements Runnable {
bpThread.interrupt();
}
}
-
+
//This must be called only by blockPoolManager
void join() {
try {
@@ -554,10 +569,10 @@ class BPServiceActor implements Runnable {
}
} catch (InterruptedException ie) { }
}
-
+
//Cleanup method to be called by current thread before exiting.
private synchronized void cleanUp() {
-
+
shouldServiceRun = false;
IOUtils.cleanup(null, bpNamenode);
IOUtils.cleanup(null, lifelineSender);
@@ -682,7 +697,7 @@ class BPServiceActor implements Runnable {
scheduler.monotonicNow() - startTime);
}
- // There is no work to do; sleep until hearbeat timer elapses,
+ // There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
} catch(RemoteException re) {
@@ -717,11 +732,11 @@ class BPServiceActor implements Runnable {
* Register one bp with the corresponding NameNode
*
* The bpDatanode needs to register with the namenode on startup in order
- * 1) to report which storage it is serving now and
+ * 1) to report which storage it is serving now and
* 2) to receive a registrationID
- *
+ *
* issued by the namenode to recognize registered datanodes.
- *
+ *
* @param nsInfo current NamespaceInfo
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
@@ -749,7 +764,7 @@ class BPServiceActor implements Runnable {
sleepAndLogInterrupts(1000, "connecting to server");
}
}
-
+
LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
@@ -835,9 +850,9 @@ class BPServiceActor implements Runnable {
/**
* Process an array of datanode commands
- *
+ *
* @param cmds an array of datanode commands
- * @return true if further processing may be required or false otherwise.
+ * @return true if further processing may be required or false otherwise.
*/
boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
@@ -860,7 +875,7 @@ class BPServiceActor implements Runnable {
*/
void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
throws IOException {
- LocatedBlock lb = new LocatedBlock(block,
+ LocatedBlock lb = new LocatedBlock(block,
new DatanodeInfo[] {dnInfo});
bpNamenode.reportBadBlocks(new LocatedBlock[] {lb});
}
@@ -893,7 +908,7 @@ class BPServiceActor implements Runnable {
}
}
}
-
+
public void bpThreadEnqueue(BPServiceActorAction action) {
synchronized (bpThreadQueue) {
if (!bpThreadQueue.contains(action)) {
@@ -1079,18 +1094,23 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
boolean resetBlockReportTime = true;
+ @VisibleForTesting
+ volatile long nextSlowPeersReportTime = monotonicNow();
+
private final AtomicBoolean forceFullBlockReport =
new AtomicBoolean(false);
private final long heartbeatIntervalMs;
private final long lifelineIntervalMs;
private final long blockReportIntervalMs;
+ private final long slowPeersReportIntervalMs;
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
- long blockReportIntervalMs) {
+ long blockReportIntervalMs, long slowPeersReportIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.lifelineIntervalMs = lifelineIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
+ this.slowPeersReportIntervalMs = slowPeersReportIntervalMs;
}
// This is useful to make sure NN gets Heartbeat before Blockreport
@@ -1122,6 +1142,10 @@ class BPServiceActor implements Runnable {
lastBlockReportTime = blockReportTime;
}
+ void scheduleNextSlowPeerReport() {
+ nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
+ }
+
long getLastHearbeatTime() {
return (monotonicNow() - lastHeartbeatTime)/1000;
}
@@ -1148,6 +1172,10 @@ class BPServiceActor implements Runnable {
return nextBlockReportTime - curTime <= 0;
}
+ boolean isSlowPeersReportDue(long curTime) {
+ return nextSlowPeersReportTime - curTime <= 0;
+ }
+
void forceFullBlockReportNow() {
forceFullBlockReport.set(true);
resetBlockReportTime = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 1b4bc2e927e..4b257321072 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -93,7 +94,7 @@ class BlockReceiver implements Closeable {
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
- private String bracketedMirrorAddr;
+ private String mirrorNameForMetrics;
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
@@ -843,10 +844,9 @@ class BlockReceiver implements Closeable {
*
*/
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
- if (isPenultimateNode && mirrorAddr != null) {
- datanode.getPeerMetrics().addSendPacketDownstream(
- bracketedMirrorAddr,
- elapsedMs);
+ final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
+ if (peerMetrics != null && isPenultimateNode) {
+ peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
}
}
@@ -927,8 +927,13 @@ class BlockReceiver implements Closeable {
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
- bracketedMirrorAddr = "[" + mirrAddr + "]";
isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+ if (isPenultimateNode) {
+ mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
+ downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
+ LOG.debug("Will collect peer metrics for downstream node {}",
+ mirrorNameForMetrics);
+ }
throttler = throttlerArg;
this.replyOut = replyOut;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 9ba1be09230..229cd7e0202 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@@ -65,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.security.SaslPropertiesResolver;
+import java.util.concurrent.TimeUnit;
+
/**
* Simple class encapsulating all of the configuration that the DataNode
* loads at startup time.
@@ -91,6 +95,8 @@ public class DNConf {
private final long lifelineIntervalMs;
final long blockReportInterval;
final long blockReportSplitThreshold;
+ final boolean peerStatsEnabled;
+ final long slowPeersReportIntervalMs;
final long ibrInterval;
final long initialBlockReportDelayMs;
final long cacheReportInterval;
@@ -168,6 +174,13 @@ public class DNConf {
this.blockReportInterval = getConf().getLong(
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+ this.peerStatsEnabled = getConf().getBoolean(
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+ this.slowPeersReportIntervalMs = getConf().getTimeDuration(
+ DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
+ DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
this.ibrInterval = getConf().getLong(
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index fa3e0993187..6683a0e15e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -334,6 +334,7 @@ public class DataNode extends ReconfigurableBase
private int infoSecurePort;
DataNodeMetrics metrics;
+ @Nullable
private DataNodePeerMetrics peerMetrics;
private InetSocketAddress streamingAddr;
@@ -419,6 +420,7 @@ public class DataNode extends ReconfigurableBase
this.connectToDnViaHostname = false;
this.getHdfsBlockLocationsEnabled = false;
this.pipelineSupportECN = false;
+ this.dnConf = new DNConf(this);
initOOBTimeout();
storageLocationChecker = null;
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -1330,7 +1332,8 @@ public class DataNode extends ReconfigurableBase
initIpcServer();
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
- peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
+ peerMetrics = dnConf.peerStatsEnabled ?
+ DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
blockRecoveryWorker = new BlockRecoveryWorker(this);
@@ -3305,6 +3308,7 @@ public class DataNode extends ReconfigurableBase
@Override // DataNodeMXBean
public String getSendPacketDownstreamAvgInfo() {
- return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+ return peerMetrics != null ?
+ peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 345ff6135b0..7e0c2bceff5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -339,7 +339,9 @@ class DataXceiver extends Receiver implements Runnable {
* the thread dies away.
*/
private void collectThreadLocalStates() {
- datanode.getPeerMetrics().collectThreadLocalStates();
+ if (datanode.getPeerMetrics() != null) {
+ datanode.getPeerMetrics().collectThreadLocalStates();
+ }
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index 9344d1b5a5a..5241c78c019 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -18,40 +18,59 @@
package org.apache.hadoop.hdfs.server.datanode.metrics;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.RollingAverages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
/**
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
* various peer operations.
*/
@InterfaceAudience.Private
+@InterfaceStability.Unstable
public class DataNodePeerMetrics {
- static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+ public static final Logger LOG = LoggerFactory.getLogger(
+ DataNodePeerMetrics.class);
private final RollingAverages sendPacketDownstreamRollingAvgerages;
private final String name;
- private final boolean peerStatsEnabled;
+
+ /**
+ * Threshold in milliseconds below which a DataNode is definitely not slow.
+ */
+ private static final long LOW_THRESHOLD_MS = 5;
+
+ private final SlowNodeDetector slowNodeDetector;
+
+ /**
+ * Minimum number of packet send samples which are required to qualify
+ * for outlier detection. If the number of samples is below this then
+ * outlier detection is skipped.
+ */
+ @VisibleForTesting
+ static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
public DataNodePeerMetrics(
final String name,
- final int windowSize,
- final int numWindows,
- final boolean peerStatsEnabled) {
+ final long windowSizeMs,
+ final int numWindows) {
this.name = name;
- this.peerStatsEnabled = peerStatsEnabled;
+ this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS);
sendPacketDownstreamRollingAvgerages = new RollingAverages(
- windowSize,
- numWindows);
+ windowSizeMs, numWindows);
}
public String name() {
@@ -66,21 +85,18 @@ public class DataNodePeerMetrics {
? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
: dnName.replace(':', '-'));
- final int windowSize = conf.getInt(
- DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
- DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+ final long windowSizeMs = conf.getTimeDuration(
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
+ TimeUnit.MILLISECONDS);
final int numWindows = conf.getInt(
- DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
- DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
- final boolean peerStatsEnabled = conf.getBoolean(
- DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
- DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
return new DataNodePeerMetrics(
name,
- windowSize,
- numWindows,
- peerStatsEnabled);
+ windowSizeMs,
+ numWindows);
}
/**
@@ -94,9 +110,7 @@ public class DataNodePeerMetrics {
public void addSendPacketDownstream(
final String peerAddr,
final long elapsedMs) {
- if (peerStatsEnabled) {
- sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
- }
+ sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
}
/**
@@ -114,4 +128,19 @@ public class DataNodePeerMetrics {
public void collectThreadLocalStates() {
sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
}
+
+ /**
+ * Retrieve the set of dataNodes that look significantly slower
+ * than their peers.
+ */
+ public Map<String, Double> getOutliers() {
+ // This maps the metric name to the aggregate latency.
+ // The metric name is the datanode ID.
+ final Map<String, Double> stats =
+ sendPacketDownstreamRollingAvgerages.getStats(
+ MIN_OUTLIER_DETECTION_SAMPLES);
+ LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
+
+ return slowNodeDetector.getOutliers(stats);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
new file mode 100644
index 00000000000..b6278cee655
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A utility class to help detect nodes whose aggregate latency
+ * is an outlier within a given set.
+ *
+ * We use the median absolute deviation for outlier detection as
+ * described in the following publication:
+ *
+ * Leys, C., et al., Detecting outliers: Do not use standard deviation
+ * around the mean, use absolute deviation around the median.
+ * http://dx.doi.org/10.1016/j.jesp.2013.03.013
+ *
+ * We augment the above scheme with the following heuristics to be even
+ * more conservative:
+ *
+ * 1. Skip outlier detection if the sample size is too small.
+ * 2. Never flag nodes whose aggregate latency is below a low threshold.
+ * 3. Never flag nodes whose aggregate latency is less than a small
+ * multiple of the median.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowNodeDetector {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SlowNodeDetector.class);
+
+ /**
+ * Minimum number of peers to run outlier detection.
+ */
+ private static long minOutlierDetectionPeers = 10;
+
+ /**
+ * The multiplier is from Leys, C. et al.
+ */
+ private static final double MAD_MULTIPLIER = (double) 1.4826;
+
+ /**
+ * Threshold in milliseconds below which a DataNode is definitely not slow.
+ */
+ private final long lowThresholdMs;
+
+ /**
+ * Deviation multiplier. A sample is considered to be an outlier if it
+ * exceeds the median by (multiplier * median abs. deviation). 3 is a
+ * conservative choice.
+ */
+ private static final int DEVIATION_MULTIPLIER = 3;
+
+ /**
+ * If most of the samples are clustered together, the MAD can be
+ * low. The median multiplier introduces another safeguard to avoid
+ * overaggressive outlier detection.
+ */
+ @VisibleForTesting
+ static final int MEDIAN_MULTIPLIER = 3;
+
+ public SlowNodeDetector(long lowThresholdMs) {
+ this.lowThresholdMs = lowThresholdMs;
+ }
+
+ /**
+ * Return a set of DataNodes whose latency is much higher than
+ * their peers. The input is a map of (node -> aggregate latency)
+ * entries.
+ *
+ * The aggregate may be an arithmetic mean or a percentile e.g.
+ * 90th percentile. Percentiles are a better choice than median
+ * since latency is usually not a normal distribution.
+ *
+ * This method allocates temporary memory O(n) and
+ * has run time O(n log n), where n = stats.size().
+ *
+ * @return map of (node -> aggregate latency) for each outlier node.
+ */
+ public Map<String, Double> getOutliers(Map<String, Double> stats) {
+ if (stats.size() < minOutlierDetectionPeers) {
+ LOG.debug("Skipping statistical outlier detection as we don't have " +
+ "latency data for enough peers. Have {}, need at least {}",
+ stats.size(), minOutlierDetectionPeers);
+ return ImmutableMap.of();
+ }
+ // Compute the median absolute deviation of the aggregates.
+ final List<Double> sorted = new ArrayList<>(stats.values());
+ Collections.sort(sorted);
+ final Double median = computeMedian(sorted);
+ final Double mad = computeMad(sorted);
+ Double upperLimitLatency = Math.max(
+ lowThresholdMs, median * MEDIAN_MULTIPLIER);
+ upperLimitLatency = Math.max(
+ upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
+
+ final Map<String, Double> slowNodes = new HashMap<>();
+
+ LOG.trace("getOutliers: List={}, MedianLatency={}, " +
+ "MedianAbsoluteDeviation={}, upperLimitLatency={}",
+ sorted, median, mad, upperLimitLatency);
+
+ // Find nodes whose latency exceeds the threshold.
+ for (Map.Entry<String, Double> entry : stats.entrySet()) {
+ if (entry.getValue() > upperLimitLatency) {
+ slowNodes.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ return slowNodes;
+ }
+
+ /**
+ * Compute the Median Absolute Deviation of a sorted list.
+ */
+ public static Double computeMad(List<Double> sortedValues) {
+ if (sortedValues.size() == 0) {
+ throw new IllegalArgumentException(
+ "Cannot compute the Median Absolute Deviation " +
+ "of an empty list.");
+ }
+
+ // First get the median of the values.
+ Double median = computeMedian(sortedValues);
+ List<Double> deviations = new ArrayList<>(sortedValues);
+
+ // Then update the list to store deviation from the median.
+ for (int i = 0; i < sortedValues.size(); ++i) {
+ deviations.set(i, Math.abs(sortedValues.get(i) - median));
+ }
+
+ // Finally get the median absolute deviation.
+ Collections.sort(deviations);
+ return computeMedian(deviations) * MAD_MULTIPLIER;
+ }
+
+ /**
+ * Compute the median of a sorted list.
+ */
+ public static Double computeMedian(List<Double> sortedValues) {
+ if (sortedValues.size() == 0) {
+ throw new IllegalArgumentException(
+ "Cannot compute the median of an empty list.");
+ }
+
+ Double median = sortedValues.get(sortedValues.size() / 2);
+ if (sortedValues.size() % 2 == 0) {
+ median += sortedValues.get((sortedValues.size() / 2) - 1);
+ median /= 2;
+ }
+ return median;
+ }
+
+ /**
+ * This method *must not* be used outside of unit tests.
+ */
+ @VisibleForTesting
+ static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) {
+ SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers;
+ }
+
+ @VisibleForTesting
+ static long getMinOutlierDetectionPeers() {
+ return minOutlierDetectionPeers;
+ }
+}
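The detector added above can be exercised on its own. The following sketch is not part of the patch; it assumes the class exactly as shown and feeds it ten peers, one of which sits far above the median, so that only that peer clears the max(lowThreshold, 3 * median, median + 3 * MAD) limit.

    package org.apache.hadoop.hdfs.server.datanode.metrics;

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    public class SlowNodeDetectorExample {
      public static void main(String[] args) {
        // Low threshold of 5ms; an arbitrary example value.
        SlowNodeDetector detector = new SlowNodeDetector(5);
        // Ten peers: the default minimum for outlier detection to run.
        Map<String, Double> stats = ImmutableMap.<String, Double>builder()
            .put("dn1", 10.0).put("dn2", 11.0).put("dn3", 12.0)
            .put("dn4", 10.0).put("dn5", 11.0).put("dn6", 12.0)
            .put("dn7", 10.0).put("dn8", 11.0).put("dn9", 12.0)
            .put("dn10", 500.0)  // well above 3 * median and median + 3 * MAD
            .build();
        // Prints {dn10=500.0}: the only entry above the computed limit of 33.0.
        System.out.println(detector.getOutliers(stats));
      }
    }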
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4b722f6c7b9..af1f3e87f51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -132,6 +132,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.annotation.Nonnull;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
@@ -258,6 +259,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -3617,7 +3619,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
- boolean requestFullBlockReportLease) throws IOException {
+ boolean requestFullBlockReportLease,
+ @Nonnull SlowPeerReports slowPeers) throws IOException {
readLock();
try {
//get datanode commands
@@ -3625,7 +3628,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
- xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
+ xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
+ slowPeers);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index f2f3d8818b0..f04f65bc990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -1834,6 +1834,12 @@ public class NameNode extends ReconfigurableBase implements
return getNamesystem().getBytesInFuture();
}
+ @Override
+ public String getSlowPeersReport() {
+ return namesystem.getBlockManager().getDatanodeManager()
+ .getSlowPeersReport();
+ }
+
/**
* Shutdown the NN immediately in an ungraceful way. Used when it would be
* unsafe for the NN to continue operating, e.g. during a failed HA state
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 8a113b27d42..59a741e1293 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -152,6 +152,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -203,6 +204,8 @@ import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
+import javax.annotation.Nonnull;
+
/**
* This class is responsible for handling all of the RPC calls to the NameNode.
* It is created, started, and stopped by {@link NameNode}.
@@ -1412,12 +1415,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
- boolean requestFullBlockReportLease) throws IOException {
+ boolean requestFullBlockReportLease,
+ @Nonnull SlowPeerReports slowPeers) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
- failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
+ failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
+ slowPeers);
}
@Override // DatanodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
index 7b373723ed2..f46b9ae927e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
@@ -69,4 +69,10 @@ public interface NameNodeStatusMXBean {
* @return number of bytes that can be deleted if exited from safe mode.
*/
long getBytesWithFutureGenerationStamps();
+
+ /**
+ * Retrieves information about slow DataNodes, if the feature is
+ * enabled. The report is returned in JSON format.
+ */
+ String getSlowPeersReport();
}
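The new getter is exposed as a JMX attribute. A minimal sketch of reading it follows; the ObjectName is an assumption based on the usual registration of NameNodeStatusMXBean (verify it against your NameNode's /jmx output), and the code must run inside the NameNode JVM or be adapted to use a JMXConnector.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class SlowPeersReportJmxExample {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed ObjectName; check the NameNode's /jmx endpoint if it differs.
        ObjectName name =
            new ObjectName("Hadoop:service=NameNode,name=NameNodeStatus");
        // MXBean getters appear as attributes without the "get" prefix.
        String report = (String) mbs.getAttribute(name, "SlowPeersReport");
        System.out.println(report);
      }
    }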
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index dfe081382ca..a15aea81e55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
+import javax.annotation.Nonnull;
+
/**********************************************************************
* Protocol that a DFS datanode uses to communicate with the NameNode.
* It's used to upload current load information and block reports.
@@ -104,6 +106,9 @@ public interface DatanodeProtocol {
* @param volumeFailureSummary info about volume failures
* @param requestFullBlockReportLease whether to request a full block
* report lease.
+ * @param slowPeers Details of peer DataNodes that were detected as being
+ * slow to respond to packet writes. Empty report if no
+ * slow peers were detected by the DataNode.
* @throws IOException on error
*/
@Idempotent
@@ -115,7 +120,8 @@ public interface DatanodeProtocol {
int xceiverCount,
int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
- boolean requestFullBlockReportLease)
+ boolean requestFullBlockReportLease,
+ @Nonnull SlowPeerReports slowPeers)
throws IOException;
/**
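The new parameter is annotated @Nonnull, so a DataNode with nothing to report passes SlowPeerReports.EMPTY_REPORT rather than null, while a reporting DataNode wraps its peer latency map with SlowPeerReports.create(). A small sketch of that calling convention; the map variable is hypothetical, and both factory members appear in the tests further below.

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;

    final class SlowPeerReportsHelper {
      // slowPeerLatencies is a hypothetical map of peer name to aggregate
      // latency, e.g. produced by the DataNode's peer metrics.
      static SlowPeerReports toReports(Map<String, Double> slowPeerLatencies) {
        return slowPeerLatencies.isEmpty()
            ? SlowPeerReports.EMPTY_REPORT
            : SlowPeerReports.create(ImmutableMap.copyOf(slowPeerLatencies));
      }
    }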
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 6bcb59ab3cd..393cc43116a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -185,6 +185,7 @@ message VolumeFailureSummaryProto {
* cacheCapacity - total cache capacity available at the datanode
* cacheUsed - amount of cache used
* volumeFailureSummary - info about volume failures
+ * slowPeers - info about peer DataNodes that are suspected to be slow.
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -196,6 +197,7 @@ message HeartbeatRequestProto {
optional uint64 cacheUsed = 7 [default = 0 ];
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
optional bool requestFullBlockReportLease = 9 [ default = false ];
+ repeated SlowPeerReportProto slowPeers = 10;
}
/**
@@ -372,6 +374,24 @@ message CommitBlockSynchronizationRequestProto {
message CommitBlockSynchronizationResponseProto {
}
+/**
+ * Information about a single slow peer that may be reported by
+ * the DataNode to the NameNode as part of the heartbeat request.
+ * The message includes the peer's DataNodeId and its
+ * aggregate packet latency as observed by the reporting DataNode.
+ * (DataNodeId must be transmitted as a string for protocol compatibility
+ * with earlier versions of Hadoop).
+ *
+ * The exact choice of the aggregate is opaque to the NameNode but it
+ * _should_ be chosen consistently by all DataNodes in the cluster.
+ * Examples of aggregates are 90th percentile (good) and mean (not so
+ * good).
+ */
+message SlowPeerReportProto {
+ optional string dataNodeId = 1;
+ optional double aggregateLatency = 2;
+}
+
/**
* Protocol used from datanode to the namenode
* See the request and response for details of rpc call.
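For reference, a sketch of building the new message with the protobuf-generated builder. The outer class name (DatanodeProtocolProtos) follows the usual Hadoop convention for DatanodeProtocol.proto and is an assumption here.

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;

    public class SlowPeerReportProtoExample {
      public static void main(String[] args) {
        SlowPeerReportProto proto = SlowPeerReportProto.newBuilder()
            .setDataNodeId("dn1.example.com:9866") // plain string, per the comment above
            .setAggregateLatency(42.5)             // e.g. a 90th percentile in milliseconds
            .build();
        System.out.println(proto);
      }
    }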
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 73e6f5ca46b..281753b0c52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1930,19 +1930,15 @@
 <property>
-  <name>dfs.metrics.rolling.average.window.size</name>
-  <value>3600</value>
+  <name>dfs.datanode.slow.peers.report.interval</name>
+  <value>30m</value>
   <description>
-    The number of seconds of each window for which sub set of samples are gathered
-    to compute the rolling average, A.K.A. roll over interval.
-  </description>
-</property>
-
-<property>
-  <name>dfs.metrics.rolling.average.window.numbers</name>
-  <value>48</value>
-  <description>
-    The number of windows maintained to compute the rolling average.
+    This setting controls how frequently DataNodes will report their peer
+    latencies to the NameNode via heartbeats. This setting supports
+    multiple time unit suffixes as described in dfs.heartbeat.interval.
+    If no suffix is specified then milliseconds is assumed.
+
+    It is ignored if dfs.datanode.peer.stats.enabled is false.
   </description>
 </property>
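To override the new property in code rather than in hdfs-site.xml, the standard Configuration time-duration accessors apply. A sketch using the raw key strings from this hunk, since the corresponding DFSConfigKeys constants are not visible here:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SlowPeerReportIntervalExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // The report is only sent when peer stats collection is enabled.
        conf.setBoolean("dfs.datanode.peer.stats.enabled", true);
        // Override the 30m default from above with a 10 minute interval.
        conf.setTimeDuration("dfs.datanode.slow.peers.report.interval",
            10, TimeUnit.MINUTES);
        System.out.println(conf.getTimeDuration(
            "dfs.datanode.slow.peers.report.interval",
            30 * 60 * 1000, TimeUnit.MILLISECONDS) + " ms");
      }
    }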
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 63f11b72866..3280563c254 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
@@ -165,7 +167,7 @@ public class TestPBHelper {
DatanodeID dn2 = PBHelperClient.convert(dnProto);
compare(dn, dn2);
}
-
+
void compare(DatanodeID dn, DatanodeID dn2) {
assertEquals(dn.getIpAddr(), dn2.getIpAddr());
assertEquals(dn.getHostName(), dn2.getHostName());
@@ -253,7 +255,7 @@ public class TestPBHelper {
ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
compare(expKeys, expKeys1);
}
-
+
void compare(ExportedBlockKeys expKeys, ExportedBlockKeys expKeys1) {
BlockKey[] allKeys = expKeys.getAllKeys();
BlockKey[] allKeys1 = expKeys1.getAllKeys();
@@ -282,12 +284,12 @@ public class TestPBHelper {
s1.getMostRecentCheckpointTxId());
assertEquals(s.getNamespaceID(), s1.getNamespaceID());
}
-
+
private static void compare(RemoteEditLog l1, RemoteEditLog l2) {
assertEquals(l1.getEndTxId(), l2.getEndTxId());
assertEquals(l1.getStartTxId(), l2.getStartTxId());
}
-
+
@Test
public void testConvertRemoteEditLog() {
RemoteEditLog l = new RemoteEditLog(1, 100);
@@ -295,7 +297,7 @@ public class TestPBHelper {
RemoteEditLog l1 = PBHelper.convert(lProto);
compare(l, l1);
}
-
+
@Test
public void testConvertRemoteEditLogManifest() {
List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
@@ -304,7 +306,7 @@ public class TestPBHelper {
RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
RemoteEditLogManifestProto mProto = PBHelper.convert(m);
RemoteEditLogManifest m1 = PBHelper.convert(mProto);
-
+
List<RemoteEditLog> logs1 = m1.getLogs();
assertEquals(logs.size(), logs1.size());
for (int i = 0; i < logs.size(); i++) {
@@ -314,15 +316,15 @@ public class TestPBHelper {
public ExtendedBlock getExtendedBlock() {
return getExtendedBlock(1);
}
-
+
public ExtendedBlock getExtendedBlock(long blkid) {
return new ExtendedBlock("bpid", blkid, 100, 2);
}
-
+
private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {
assertEquals(dn1.getAdminState(), dn2.getAdminState());
assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed());
- assertEquals(dn1.getBlockPoolUsedPercent(),
+ assertEquals(dn1.getBlockPoolUsedPercent(),
dn2.getBlockPoolUsedPercent(), DELTA);
assertEquals(dn1.getCapacity(), dn2.getCapacity());
assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport());
@@ -336,20 +338,20 @@ public class TestPBHelper {
assertEquals(dn1.getLevel(), dn2.getLevel());
assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation());
}
-
+
@Test
public void testConvertExtendedBlock() {
ExtendedBlock b = getExtendedBlock();
ExtendedBlockProto bProto = PBHelperClient.convert(b);
ExtendedBlock b1 = PBHelperClient.convert(bProto);
assertEquals(b, b1);
-
+
b.setBlockId(-1);
bProto = PBHelperClient.convert(b);
b1 = PBHelperClient.convert(bProto);
assertEquals(b, b1);
}
-
+
@Test
public void testConvertRecoveringBlock() {
DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo();
@@ -365,7 +367,7 @@ public class TestPBHelper {
compare(dnInfo[0], dnInfo1[0]);
}
}
-
+
@Test
public void testConvertBlockRecoveryCommand() {
DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo();
@@ -376,14 +378,14 @@ public class TestPBHelper {
new RecoveringBlock(getExtendedBlock(1), dnInfo, 3),
new RecoveringBlock(getExtendedBlock(2), dnInfo, 3)
);
-
+
BlockRecoveryCommand cmd = new BlockRecoveryCommand(blks);
BlockRecoveryCommandProto proto = PBHelper.convert(cmd);
assertEquals(1, proto.getBlocks(0).getBlock().getB().getBlockId());
assertEquals(2, proto.getBlocks(1).getBlock().getB().getBlockId());
-
+
BlockRecoveryCommand cmd2 = PBHelper.convert(proto);
-
+
List<RecoveringBlock> cmd2Blks = Lists.newArrayList(
cmd2.getRecoveringBlocks());
assertEquals(blks.get(0).getBlock(), cmd2Blks.get(0).getBlock());
@@ -391,8 +393,8 @@ public class TestPBHelper {
assertEquals(Joiner.on(",").join(blks), Joiner.on(",").join(cmd2Blks));
assertEquals(cmd.toString(), cmd2.toString());
}
-
-
+
+
@Test
public void testConvertText() {
Text t = new Text("abc".getBytes());
@@ -400,7 +402,7 @@ public class TestPBHelper {
Text t1 = new Text(s);
assertEquals(t, t1);
}
-
+
@Test
public void testConvertBlockToken() {
Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
@@ -410,7 +412,7 @@ public class TestPBHelper {
Token<BlockTokenIdentifier> token2 = PBHelperClient.convert(tokenProto);
compare(token, token2);
}
-
+
@Test
public void testConvertNamespaceInfo() {
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300);
@@ -455,7 +457,7 @@ public class TestPBHelper {
AdminStates.DECOMMISSION_INPROGRESS),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
AdminStates.DECOMMISSIONED),
- DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
+ DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
AdminStates.NORMAL),
@@ -523,7 +525,7 @@ public class TestPBHelper {
compare(lbl.get(i), lbl2.get(2));
}
}
-
+
@Test
public void testConvertLocatedBlockArray() {
LocatedBlock [] lbl = new LocatedBlock[3];
@@ -563,7 +565,7 @@ public class TestPBHelper {
DatanodeStorage dns2 = PBHelperClient.convert(proto);
compare(dns1, dns2);
}
-
+
@Test
public void testConvertBlockCommand() {
Block[] blocks = new Block[] { new Block(21), new Block(22) };
@@ -596,7 +598,7 @@ public class TestPBHelper {
}
}
}
-
+
@Test
public void testChecksumTypeProto() {
assertEquals(DataChecksum.Type.NULL,
@@ -678,4 +680,24 @@ public class TestPBHelper {
DatanodeInfo dnInfos3 = PBHelperClient.convert(b.build());
assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed());
}
+
+ @Test
+ public void testSlowPeerInfoPBHelper() {
+ // Test with a map that has a few slow peer entries.
+ final SlowPeerReports slowPeers = SlowPeerReports.create(
+ ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0));
+ SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo(
+ PBHelper.convertSlowPeerInfo(slowPeers));
+ assertTrue(
+ "Expected map:" + slowPeers + ", got map:" +
+ slowPeersConverted1.getSlowPeers(),
+ slowPeersConverted1.equals(slowPeers));
+
+ // Test with an empty map.
+ SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo(
+ PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT));
+ assertTrue(
+ "Expected empty map:" + ", got map:" + slowPeersConverted2,
+ slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
index ab607eaa8d4..f12f6f59f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
@@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import org.mockito.Mockito;
/**
* Test if FSNamesystem handles heartbeat right
*/
public class TestHeartbeatHandling {
+
+
+ /**
+ * Set a timeout for every test case.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(300_000);
+
/**
* Test if
* {@link FSNamesystem#handleHeartbeat}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 5214af3115a..fbfccbe74a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import java.util.ArrayList;
import java.util.Collection;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@@ -111,7 +113,7 @@ public class TestNameNodePrunesMissingStorages {
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
- 0, null, true);
+ 0, null, true, SlowPeerReports.EMPTY_REPORT);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
new file mode 100644
index 00000000000..15eb3a511e7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link SlowPeerTracker}.
+ */
+public class TestSlowPeerTracker {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestSlowPeerTracker.class);
+
+ /**
+ * Set a timeout for every test case.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(300_000);
+
+ private Configuration conf;
+ private SlowPeerTracker tracker;
+ private FakeTimer timer;
+ private long reportValidityMs;
+
+ @Before
+ public void setup() {
+ conf = new HdfsConfiguration();
+ timer = new FakeTimer();
+ tracker = new SlowPeerTracker(conf, timer);
+ reportValidityMs = tracker.getReportValidityMs();
+ }
+
+ /**
+ * Edge case, there are no reports to retrieve.
+ */
+ @Test
+ public void testEmptyReports() {
+ assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+ assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
+ }
+
+ @Test
+ public void testReportsAreRetrieved() {
+ tracker.addReport("node2", "node1");
+ tracker.addReport("node3", "node1");
+ tracker.addReport("node3", "node2");
+
+ assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
+ assertThat(tracker.getReportsForNode("node2").size(), is(1));
+ assertThat(tracker.getReportsForNode("node3").size(), is(2));
+ assertThat(tracker.getReportsForNode("node1").size(), is(0));
+ }
+
+ /**
+ * Test that when all reports are expired, we get back nothing.
+ */
+ @Test
+ public void testAllReportsAreExpired() {
+ tracker.addReport("node2", "node1");
+ tracker.addReport("node3", "node2");
+ tracker.addReport("node1", "node3");
+
+ // No reports should expire after 1ms.
+ timer.advance(1);
+ assertThat(tracker.getReportsForAllDataNodes().size(), is(3));
+
+ // All reports should expire after REPORT_VALIDITY_MS.
+ timer.advance(reportValidityMs);
+ assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+ assertTrue(tracker.getReportsForNode("node1").isEmpty());
+ assertTrue(tracker.getReportsForNode("node2").isEmpty());
+ assertTrue(tracker.getReportsForNode("node3").isEmpty());
+ }
+
+ /**
+ * Test the case when a subset of reports has expired.
+ * Ensure that we only get back non-expired reports.
+ */
+ @Test
+ public void testSomeReportsAreExpired() {
+ tracker.addReport("node3", "node1");
+ tracker.addReport("node3", "node2");
+ timer.advance(reportValidityMs);
+ tracker.addReport("node3", "node4");
+ assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+ assertThat(tracker.getReportsForNode("node3").size(), is(1));
+ assertTrue(tracker.getReportsForNode("node3").contains("node4"));
+ }
+
+ /**
+ * Test the case when an expired report is replaced by a valid one.
+ */
+ @Test
+ public void testReplacement() {
+ tracker.addReport("node2", "node1");
+ timer.advance(reportValidityMs); // Expire the report.
+ assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
+
+ // This should replace the expired report with a newer valid one.
+ tracker.addReport("node2", "node1");
+ assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+ assertThat(tracker.getReportsForNode("node2").size(), is(1));
+ }
+
+ @Test
+ public void testGetJson() throws IOException {
+ tracker.addReport("node1", "node2");
+ tracker.addReport("node2", "node3");
+ tracker.addReport("node2", "node1");
+ tracker.addReport("node4", "node1");
+
+ final Set<ReportForJson> reports = getAndDeserializeJson();
+
+ // And ensure its contents are what we expect.
+ assertThat(reports.size(), is(3));
+ assertTrue(isNodeInReports(reports, "node1"));
+ assertTrue(isNodeInReports(reports, "node2"));
+ assertTrue(isNodeInReports(reports, "node4"));
+
+ assertFalse(isNodeInReports(reports, "node3"));
+ }
+
+ @Test
+ public void testGetJsonSizeIsLimited() throws IOException {
+ tracker.addReport("node1", "node2");
+ tracker.addReport("node1", "node3");
+ tracker.addReport("node2", "node3");
+ tracker.addReport("node2", "node4");
+ tracker.addReport("node3", "node4");
+ tracker.addReport("node3", "node5");
+ tracker.addReport("node4", "node6");
+ tracker.addReport("node5", "node6");
+ tracker.addReport("node5", "node7");
+ tracker.addReport("node6", "node7");
+ tracker.addReport("node6", "node8");
+
+ final Set<ReportForJson> reports = getAndDeserializeJson();
+
+ // Ensure that node4 is not in the list since it was
+ // tagged by just one peer and we already have 5 other nodes.
+ assertFalse(isNodeInReports(reports, "node4"));
+
+ // Remaining nodes should be in the list.
+ assertTrue(isNodeInReports(reports, "node1"));
+ assertTrue(isNodeInReports(reports, "node2"));
+ assertTrue(isNodeInReports(reports, "node3"));
+ assertTrue(isNodeInReports(reports, "node5"));
+ assertTrue(isNodeInReports(reports, "node6"));
+ }
+
+ @Test
+ public void testLowRankedElementsIgnored() throws IOException {
+ // Insert 5 nodes with 2 peer reports each.
+ for (int i = 0; i < 5; ++i) {
+ tracker.addReport("node" + i, "reporter1");
+ tracker.addReport("node" + i, "reporter2");
+ }
+
+ // Insert 10 nodes with 1 peer report each.
+ for (int i = 10; i < 20; ++i) {
+ tracker.addReport("node" + i, "reporter1");
+ }
+
+ final Set<ReportForJson> reports = getAndDeserializeJson();
+
+ // Ensure that only the first 5 nodes with two reports each were
+ // included in the JSON.
+ for (int i = 0; i < 5; ++i) {
+ assertTrue(isNodeInReports(reports, "node" + i));
+ }
+ }
+
+ private boolean isNodeInReports(
+ Set<ReportForJson> reports, String node) {
+ for (ReportForJson report : reports) {
+ if (report.getSlowNode().equalsIgnoreCase(node)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Set<ReportForJson> getAndDeserializeJson()
+ throws IOException {
+ final String json = tracker.getJson();
+ LOG.info("Got JSON: {}", json);
+ return (new ObjectMapper()).readValue(
+ json, new TypeReference<Set<ReportForJson>>() {});
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index de856e609f5..cf43fd0fddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.junit.Assert;
@@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils {
Mockito.any(StorageReport[].class), Mockito.anyLong(),
Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
- Mockito.anyBoolean())).thenReturn(
+ Mockito.anyBoolean(),
+ Mockito.any(SlowPeerReports.class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index b7b89667ef8..c6b38eea7db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -119,7 +120,7 @@ public class TestBPOfferService {
Mockito.doReturn(conf).when(mockDn).getConf();
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
- .when(mockDn).getMetrics();
+ .when(mockDn).getMetrics();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -152,7 +153,8 @@ public class TestBPOfferService {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
- Mockito.anyBoolean());
+ Mockito.anyBoolean(),
+ Mockito.any(SlowPeerReports.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 346250bcbd0..fecddc57886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
@@ -186,7 +188,8 @@ public class TestBlockRecovery {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
- Mockito.anyBoolean()))
+ Mockito.anyBoolean(),
+ Mockito.any(SlowPeerReports.class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
@@ -252,15 +255,15 @@ public class TestBlockRecovery {
}
/** Sync two replicas */
- private void testSyncReplicas(ReplicaRecoveryInfo replica1,
+ private void testSyncReplicas(ReplicaRecoveryInfo replica1,
ReplicaRecoveryInfo replica2,
InterDatanodeProtocol dn1,
InterDatanodeProtocol dn2,
long expectLen) throws IOException {
-
+
DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
- RecoveringBlock rBlock = new RecoveringBlock(block,
+ RecoveringBlock rBlock = new RecoveringBlock(block,
locs, RECOVERY_ID);
ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
BlockRecord record1 = new BlockRecord(
@@ -269,7 +272,7 @@ public class TestBlockRecovery {
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
syncList.add(record1);
syncList.add(record2);
-
+
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong(), anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
@@ -279,7 +282,7 @@ public class TestBlockRecovery {
recoveryWorker.new RecoveryTaskContiguous(rBlock);
RecoveryTaskContiguous.syncBlock(syncList);
}
-
+
/**
* BlockRecovery_02.8.
* Two replicas are in Finalized state
@@ -290,9 +293,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
- ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -305,9 +308,9 @@ public class TestBlockRecovery {
REPLICA_LEN1);
// two finalized replicas have different length
- replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
- replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
try {
@@ -318,10 +321,10 @@ public class TestBlockRecovery {
"Inconsistent size of finalized replicas. "));
}
}
-
+
/**
* BlockRecovery_02.9.
- * One replica is Finalized and another is RBW.
+ * One replica is Finalized and another is RBW.
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@@ -329,11 +332,11 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
-
+
// rbw and finalized replicas have the same length
- ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
- ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -344,11 +347,11 @@ public class TestBlockRecovery {
REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
-
+
// rbw replica has a different length from the finalized one
- replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
- replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
dn1 = mock(InterDatanodeProtocol.class);
@@ -359,10 +362,10 @@ public class TestBlockRecovery {
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
-
+
/**
* BlockRecovery_02.10.
- * One replica is Finalized and another is RWR.
+ * One replica is Finalized and another is RWR.
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@@ -370,11 +373,11 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
-
+
// rbw and finalized replicas have the same length
- ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
- ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -385,11 +388,11 @@ public class TestBlockRecovery {
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
-
+
// rbw replica has a different length from the finalized one
- replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
- replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
dn1 = mock(InterDatanodeProtocol.class);
@@ -401,7 +404,7 @@ public class TestBlockRecovery {
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
-
+
/**
* BlockRecovery_02.11.
* Two replicas are RBW.
@@ -412,9 +415,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
- ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -425,10 +428,10 @@ public class TestBlockRecovery {
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
}
-
+
/**
* BlockRecovery_02.12.
- * One replica is RBW and another is RWR.
+ * One replica is RBW and another is RWR.
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@@ -436,9 +439,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
- ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -450,9 +453,9 @@ public class TestBlockRecovery {
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
-
+
/**
- * BlockRecovery_02.13.
+ * BlockRecovery_02.13.
* Two replicas are RWR.
* @throws IOException in case of an error
*/
@@ -461,9 +464,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
- ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@@ -471,10 +474,10 @@ public class TestBlockRecovery {
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
-
+
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
- }
+ }
private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
@@ -661,10 +664,10 @@ public class TestBlockRecovery {
streams.close();
}
}
-
+
/**
* Test to verify the race between finalizeBlock and Lease recovery
- *
+ *
* @throws Exception
*/
@Test(timeout = 20000)
@@ -682,11 +685,11 @@ public class TestBlockRecovery {
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
-
+
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
-
+
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread() {
@Override
@@ -716,7 +719,7 @@ public class TestBlockRecovery {
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
-
+
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 76885e417ba..6435d4d1a5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
@@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
-import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
/**
@@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler {
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
+ private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000; // 10 seconds
private final Random random = new Random(System.nanoTime());
@Test
@@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler {
}
}
+ @Test
+ public void testSlowPeerReportScheduling() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ assertTrue(scheduler.isSlowPeersReportDue(now));
+ scheduler.scheduleNextSlowPeerReport();
+ assertFalse(scheduler.isSlowPeersReportDue(now));
+ assertFalse(scheduler.isSlowPeersReportDue(now + 1));
+ assertTrue(scheduler.isSlowPeersReportDue(
+ now + SLOW_PEER_REPORT_INTERVAL_MS));
+ }
+ }
+
private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
- Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
- LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+ Scheduler mockScheduler = spy(new Scheduler(
+ HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
+ BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.nextHeartbeatTime = now;
+ mockScheduler.nextSlowPeersReportTime = now;
return mockScheduler;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index df2fe5aa75c..8a9f0b8da1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
@@ -167,7 +168,8 @@ public class TestDataNodeLifeline {
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
- anyBoolean());
+ anyBoolean(),
+ any(SlowPeerReports.class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer(lifelinesSent))
@@ -230,7 +232,8 @@ public class TestDataNodeLifeline {
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
- anyBoolean());
+ anyBoolean(),
+ any(SlowPeerReports.class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
index 5af54a4cbbb..b18ff2a2a94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics {
final int numOpsPerIteration = 1000;
final Configuration conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
- windowSize);
- conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+ conf.setTimeDuration(
+ DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+ windowSize, TimeUnit.SECONDS);
+ conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
numWindows);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d447a76f0d2..c94f74ecc52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
- Mockito.anyBoolean());
+ Mockito.anyBoolean(),
+ Mockito.any(SlowPeerReports.class));
dn = new DataNode(conf, locations, null, null) {
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 6557055f783..eb015c03573 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
@@ -172,7 +173,7 @@ public class TestFsDatasetCache {
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
- anyBoolean());
+ anyBoolean(), any(SlowPeerReports.class));
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index d8418d46b6a..2b793e9caaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.junit.After;
@@ -106,7 +107,8 @@ public class TestStorageReport {
any(DatanodeRegistration.class),
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
- Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
+ Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
+ Mockito.any(SlowPeerReports.class));
StorageReport[] reports = captor.getValue();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
new file mode 100644
index 00000000000..34e15e546b0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test that the {@link DataNodePeerMetrics} class is able to detect
+ * outliers, i.e. slow nodes, via the metrics it maintains.
+ */
+public class TestDataNodeOutlierDetectionViaMetrics {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class);
+
+ /**
+ * Set a timeout for every test case.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(300_000);
+
+ // A few constants to keep the test run time short.
+ private static final int WINDOW_INTERVAL_SECONDS = 3;
+ private static final int ROLLING_AVERAGE_WINDOWS = 10;
+ private static final int SLOW_NODE_LATENCY_MS = 20_000;
+ private static final int FAST_NODE_MAX_LATENCY_MS = 5;
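+ // The slow-node latency is several thousand times the fast-node
+ // ceiling, so it should be flagged as an outlier unambiguously.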
+
+ private Random random = new Random(System.currentTimeMillis());
+
+ @Before
+ public void setup() {
+ GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+ }
+
+ /**
+ * Test that a very slow peer is detected as an outlier.
+ */
+ @Test
+ public void testOutlierIsDetected() throws Exception {
+ final String slowNodeName = "SlowNode";
+
+ DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+ "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+ ROLLING_AVERAGE_WINDOWS);
+
+ injectFastNodesSamples(peerMetrics);
+ injectSlowNodeSamples(peerMetrics, slowNodeName);
+
+ // Trigger a snapshot.
+ peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+ final Map<String, Double> outliers = peerMetrics.getOutliers();
+ LOG.info("Got back outlier nodes: {}", outliers);
+ assertThat(outliers.size(), is(1));
+ assertTrue(outliers.containsKey(slowNodeName));
+ }
+
+ /**
+ * Test that when there are no outliers, we get back nothing.
+ */
+ @Test
+ public void testWithNoOutliers() throws Exception {
+ DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+ "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+ ROLLING_AVERAGE_WINDOWS);
+
+ injectFastNodesSamples(peerMetrics);
+
+ // Trigger a snapshot.
+ peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+ // Ensure that no outliers are reported.
+ assertTrue(peerMetrics.getOutliers().isEmpty());
+ }
+
+ /**
+ * Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes.
+ *
+ * @param peerMetrics the peer metrics instance to populate with samples.
+ */
+ public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
+ for (int nodeIndex = 0;
+ nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
+ ++nodeIndex) {
+ final String nodeName = "FastNode-" + nodeIndex;
+ LOG.info("Generating stats for node {}", nodeName);
+ for (int i = 0;
+ i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+ ++i) {
+ peerMetrics.addSendPacketDownstream(
+ nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS));
+ }
+ }
+ }
+
+ /**
+ * Inject fake stats for one extremely slow node.
+ */
+ public void injectSlowNodeSamples(
+ DataNodePeerMetrics peerMetrics, String slowNodeName)
+ throws InterruptedException {
+
+ // Inject samples for the one slow node.
+ for (int i = 0;
+ i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+ ++i) {
+ peerMetrics.addSendPacketDownstream(
+ slowNodeName, SLOW_NODE_LATENCY_MS);
+ }
+ }
+}
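The two test classes above exercise a median/MAD style rule for spotting slow peers: a peer is flagged when its average latency sits far above the median latency of all peers, measured in units of the median absolute deviation. The following is a rough, self-contained sketch of that idea; the class name, the 3x deviation multiplier and the 1000 ms floor are assumptions for illustration, not the actual SlowNodeDetector internals.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    /**
     * Illustrative median/MAD outlier rule: a value is an outlier if it
     * exceeds median + DEVIATION_MULTIPLIER * scaled MAD, subject to a
     * low-latency floor so tightly clustered fast peers are never flagged.
     */
    public final class MedianMadOutlierSketch {
      // Scales the raw MAD into a standard-deviation-like quantity.
      private static final double MAD_SCALE = 1.4826;
      // Assumed multiplier and floor; the real detector may use other values.
      private static final double DEVIATION_MULTIPLIER = 3.0;
      private static final double LOW_THRESHOLD_MS = 1000.0;

      private MedianMadOutlierSketch() {
      }

      /** Returns the names whose average latency is an outlier. */
      public static Set<String> findOutliers(Map<String, Double> avgLatencyMs) {
        final List<Double> sorted = new ArrayList<>(avgLatencyMs.values());
        Collections.sort(sorted);
        final double median = medianOfSorted(sorted);

        final List<Double> deviations = new ArrayList<>();
        for (double value : sorted) {
          deviations.add(Math.abs(value - median));
        }
        Collections.sort(deviations);
        final double scaledMad = medianOfSorted(deviations) * MAD_SCALE;

        final double upperLimit =
            Math.max(LOW_THRESHOLD_MS, median + DEVIATION_MULTIPLIER * scaledMad);
        final Set<String> outliers = new HashSet<>();
        for (Map.Entry<String, Double> entry : avgLatencyMs.entrySet()) {
          if (entry.getValue() > upperLimit) {
            outliers.add(entry.getKey());
          }
        }
        return outliers;
      }

      // Assumes a non-empty, pre-sorted list.
      private static double medianOfSorted(List<Double> sorted) {
        final int n = sorted.size();
        return n % 2 == 1
            ? sorted.get(n / 2)
            : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
      }
    }

With the sample data used above (fast peers averaging under 5 ms and one peer at 20 seconds), the computed upper limit is dominated by the 1000 ms floor, so only the slow peer exceeds it.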
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
new file mode 100644
index 00000000000..6107d639cfe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SlowNodeDetector}.
+ */
+public class TestSlowNodeDetector {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestSlowNodeDetector.class);
+
+ /**
+ * Set a timeout for every test case.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(300_000);
+
+ private final static double LOW_THRESHOLD = 1000;
+ private final static long MIN_OUTLIER_DETECTION_PEERS = 3;
+
+ // Randomly generated test cases for median and MAD. The first entry
+ // in each pair is the expected median and the second entry is the
+ // expected Median Absolute Deviation. The small sets of size 1 and 2
+ // exist to test the edge cases; however, in practice the MAD of a
+ // very small set is not useful.
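+ // The expected deviations below correspond to the raw median absolute
+ // deviation scaled by ~1.4826, the usual constant that makes MAD a
+ // consistent estimator of the standard deviation. For example, for the
+ // two-element set below the median is 6.7545, the raw MAD is 5.0328,
+ // and 5.0328 * 1.4826 = 7.4616.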
+ private Map<List<Double>, Pair<Double, Double>> medianTestMatrix =
+ new ImmutableMap.Builder<List<Double>, Pair<Double, Double>>()
+ // Single element.
+ .put(new ImmutableList.Builder<Double>()
+ .add(9.6502431302).build(),
+ Pair.of(9.6502431302, 0.0))
+
+ // Two elements.
+ .put(new ImmutableList.Builder<Double>()
+ .add(1.72168104625)
+ .add(11.7872544459).build(),
+ Pair.of(6.75446774606, 7.4616095611))
+
+ // The remaining lists were randomly generated with sizes 3-10.
+ .put(new ImmutableList.Builder<Double>()
+ .add(76.2635686249)
+ .add(27.0652018553)
+ .add(1.3868476443)
+ .add(49.7194624164)
+ .add(47.385680883)
+ .add(57.8721199173).build(),
+ Pair.of(48.5525716497, 22.837202532))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(86.0573389581)
+ .add(93.2399572424)
+ .add(64.9545429122)
+ .add(35.8509730085)
+ .add(1.6534313654).build(),
+ Pair.of(64.9545429122, 41.9360180373))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(5.00127007366)
+ .add(37.9790589127)
+ .add(67.5784746266).build(),
+ Pair.of(37.9790589127, 43.8841594039))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(1.43442932944)
+ .add(70.6769829947)
+ .add(37.47579656)
+ .add(51.1126141394)
+ .add(72.2465914419)
+ .add(32.2930549225)
+ .add(39.677459781).build(),
+ Pair.of(39.677459781, 16.9537852208))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(26.7913745214)
+ .add(68.9833706658)
+ .add(29.3882180746)
+ .add(68.3455244453)
+ .add(74.9277265022)
+ .add(12.1469972942)
+ .add(72.5395402683)
+ .add(7.87917492506)
+ .add(33.3253447774)
+ .add(72.2753759125).build(),
+ Pair.of(50.8354346113, 31.9881230079))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(38.6482290705)
+ .add(88.0690746319)
+ .add(50.6673611649)
+ .add(64.5329814115)
+ .add(25.2580979294)
+ .add(59.6709630711)
+ .add(71.5406993741)
+ .add(81.3073035091)
+ .add(20.5549547284).build(),
+ Pair.of(59.6709630711, 31.1683520683))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(87.352734249)
+ .add(65.4760359094)
+ .add(28.9206803169)
+ .add(36.5908574008)
+ .add(87.7407653175)
+ .add(99.3704511335)
+ .add(41.3227434076)
+ .add(46.2713494909)
+ .add(3.49940920921).build(),
+ Pair.of(46.2713494909, 28.4729106898))
+
+ .put(new ImmutableList.Builder<Double>()
+ .add(95.3251533286)
+ .add(27.2777870437)
+ .add(43.73477168).build(),
+ Pair.of(43.73477168, 24.3991619317))
+
+ .build();
+
+ // A test matrix that maps inputs to the expected output list of
+ // slow nodes, i.e. outliers.
+ private Map