HDFS-11194. Maintain aggregated peer performance metrics on NameNode.

Arpit Agarwal 2017-01-24 16:58:20 -08:00
parent 8027c3e8b9
commit b4078e1d0e
42 changed files with 1847 additions and 207 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.metrics2.lib;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@ -39,6 +40,9 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.annotation.Nullable;
import static org.apache.hadoop.metrics2.lib.Interns.*;
/**
@ -63,7 +67,10 @@ public class RollingAverages extends MutableMetric implements Closeable {
.setNameFormat("RollingAverages-%d").build());
private ScheduledFuture<?> scheduledTask = null;
@Nullable
private Map<String, MutableRate> currentSnapshot;
private final int numWindows;
private final String avgInfoNameTemplate;
private final String avgInfoDescTemplate;
@ -100,31 +107,31 @@ public class RollingAverages extends MutableMetric implements Closeable {
/**
* Constructor of {@link RollingAverages}.
* @param windowSize
* The number of seconds of each window for which sub set of samples
* are gathered to compute the rolling average, A.K.A. roll over
* interval.
* @param windowSizeMs
* The number of milliseconds of each window for which a subset
* of samples is gathered to compute the rolling average, a.k.a. the
* roll over interval.
* @param numWindows
* The number of windows maintained to compute the rolling average.
* @param valueName
* of the metric (e.g. "Time", "Latency")
*/
public RollingAverages(
final int windowSize,
final long windowSizeMs,
final int numWindows,
final String valueName) {
String uvName = StringUtils.capitalize(valueName);
String lvName = StringUtils.uncapitalize(valueName);
avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName;
avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName;
avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
this.numWindows = numWindows;
scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
windowSize, windowSize, TimeUnit.SECONDS);
windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS);
}
/**
* Constructor of {@link RollingAverages}.
* @param windowSize
* @param windowSizeMs
* The number of milliseconds of each window for which a subset of
* samples is gathered to compute the rolling average, a.k.a. the
* roll over interval.
@ -133,9 +140,9 @@ public class RollingAverages extends MutableMetric implements Closeable {
* average of the rolling averages.
*/
public RollingAverages(
final int windowSize,
final long windowSizeMs,
final int numWindows) {
this(windowSize, numWindows, "Time");
this(windowSizeMs, numWindows, "Time");
}
@Override
@ -216,7 +223,7 @@ public class RollingAverages extends MutableMetric implements Closeable {
* This function is not thread safe, callers should ensure thread safety.
* </p>
*/
private void rollOverAvgs() {
private synchronized void rollOverAvgs() {
if (currentSnapshot == null) {
return;
}
@ -250,4 +257,32 @@ public class RollingAverages extends MutableMetric implements Closeable {
}
scheduledTask = null;
}
/**
* Retrieve a map of metric name -> aggregate latency. Only entries
* with more than minSamples samples are included.
*
* @param minSamples minimum number of samples; entries at or below
* this count are filtered out.
* @return a map of peer DataNode Id to the average latency to that
* node seen over the measurement period.
*/
public synchronized Map<String, Double> getStats(long minSamples) {
final Map<String, Double> stats = new HashMap<>();
for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
: averages.entrySet()) {
final String name = entry.getKey();
double totalSum = 0;
long totalCount = 0;
for (final SumAndCount sumAndCount : entry.getValue()) {
totalCount += sumAndCount.getCount();
totalSum += sumAndCount.getSum();
}
if (totalCount > minSamples) {
stats.put(name, totalSum / totalCount);
}
}
return stats;
}
}
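A minimal usage sketch of the reworked API (illustrative only; the
metric name, sample values, and the minSamples threshold below are
invented):

  // 5-second windows, 2 windows retained per metric name.
  try (RollingAverages avgs = new RollingAverages(5000, 2)) {
    avgs.add("peer1", 37);  // record a 37ms latency sample for "peer1"
    avgs.add("peer1", 41);
    // ... after at least one window rolls over ...
    // Only names with more than 1000 accumulated samples are returned.
    Map<String, Double> stats = avgs.getStats(1000);
  }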

View File

@ -42,7 +42,8 @@ public class TestRollingAverages {
public void testRollingAveragesEmptyRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
/* 5s interval and 2 windows */
try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
try (final RollingAverages rollingAverages =
new RollingAverages(5000, 2)) {
/* Check it initially */
rollingAverages.snapshot(rb, true);
verify(rb, never()).addGauge(
@ -74,10 +75,10 @@ public class TestRollingAverages {
public void testRollingAveragesRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
final String name = "foo2";
final int windowSize = 5; // 5s roll over interval
final int windowSizeMs = 5000; // 5s roll over interval
final int numWindows = 2;
final int numOpsPerIteration = 1000;
try (RollingAverages rollingAverages = new RollingAverages(windowSize,
try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
numWindows)) {
/* Push values for three intervals */
@ -92,7 +93,7 @@ public class TestRollingAverages {
* Sleep until 1s after the next windowSizeMs interval, to let the
* metrics roll over
*/
final long sleep = (start + (windowSize * 1000 * i) + 1000)
final long sleep = (start + (windowSizeMs * i) + 1000)
- Time.monotonicNow();
Thread.sleep(sleep);
@ -110,12 +111,12 @@ public class TestRollingAverages {
final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
: numOpsPerIteration;
verify(rb).addGauge(
info("Foo2RollingAvgTime", "Rolling average time for foo2"),
info("[Foo2]RollingAvgTime", "Rolling average time for foo2"),
rollingSum / rollingTotal);
/* Verify the metrics were added the right number of times */
verify(rb, times(i)).addGauge(
eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")),
anyDouble());
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
/**
* A class that allows a DataNode to communicate information about all
* its peer DataNodes that appear to be slow.
*
* The wire representation of this structure is a list of
* SlowPeerReportProto messages.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class SlowPeerReports {
/**
* A map from the DataNode's DataNodeUUID to its aggregate latency
* as seen by the reporting node.
*
* The exact choice of the aggregate is opaque to the NameNode but it
* should be chosen consistently by all DataNodes in the cluster.
* Examples of aggregates are 90th percentile (good) and mean (not so
* good).
*
* The NameNode must not attempt to interpret the aggregate latencies
* beyond exposing them as a diagnostic, e.g. via metrics. Also,
* comparing latencies across reports from different DataNodes may not
* be meaningful and must be avoided.
*/
@Nonnull
private final Map<String, Double> slowPeers;
/**
* An object representing a SlowPeerReports with no entries. Should
* be used instead of null or creating new objects when there are
* no slow peers to report.
*/
public static final SlowPeerReports EMPTY_REPORT =
new SlowPeerReports(ImmutableMap.<String, Double>of());
private SlowPeerReports(Map<String, Double> slowPeers) {
this.slowPeers = slowPeers;
}
public static SlowPeerReports create(
@Nullable Map<String, Double> slowPeers) {
if (slowPeers == null || slowPeers.isEmpty()) {
return EMPTY_REPORT;
}
return new SlowPeerReports(slowPeers);
}
public Map<String, Double> getSlowPeers() {
return slowPeers;
}
public boolean haveSlowPeers() {
return slowPeers.size() > 0;
}
/**
* Return true if the two objects represent the same set of slow peer
* entries. Primarily for unit testing convenience.
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SlowPeerReports)) {
return false;
}
SlowPeerReports that = (SlowPeerReports) o;
return slowPeers.equals(that.slowPeers);
}
@Override
public int hashCode() {
return slowPeers.hashCode();
}
}
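A short usage sketch (the DataNode UUID and latency value here are
invented for illustration):

  Map<String, Double> peers = new HashMap<>();
  peers.put("dn-uuid-1", 83.0);  // aggregate latency in milliseconds
  SlowPeerReports reports = SlowPeerReports.create(peers);
  assert reports.haveSlowPeers();
  // Null or empty input always maps to the shared empty instance.
  assert SlowPeerReports.create(null) == SlowPeerReports.EMPTY_REPORT;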

View File

@ -432,14 +432,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_SESSION_ID_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
"dfs.metrics.rolling.average.window.size";
public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
3600;
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
"dfs.metrics.rolling.average.window.numbers";
public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
48;
// The following setting is not meant to be changed by administrators.
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
"dfs.metrics.rolling.averages.window.length";
public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
300 * 1000;
// The following setting is not meant to be changed by administrators.
public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
"dfs.metrics.rolling.average.num.windows";
public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
36;
public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
"dfs.datanode.peer.stats.enabled";
public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
@ -628,6 +633,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY =
"dfs.datanode.slow.peers.report.interval";
public static final int DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT =
1800 * 1000;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;
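For illustration, the new settings could be overridden in code as below
(the values are arbitrary; the report interval is interpreted in
milliseconds):

  Configuration conf = new HdfsConfiguration();
  conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
  conf.setLong(DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
      600 * 1000); // report every 10 minutes instead of the 30-minute default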

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -71,6 +72,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import javax.annotation.Nonnull;
/**
* This class is the client side translator to translate the requests made on
* {@link DatanodeProtocol} interfaces to the RPC server implementing
@ -132,7 +135,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@ -149,6 +153,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
volumeFailureSummary));
}
if (slowPeers.haveSlowPeers()) {
builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
}
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

View File

@ -120,7 +120,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease());
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -19,7 +19,11 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.protobuf.ByteString;
@ -42,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -99,12 +104,13 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
/**
* Utilities for converting protobuf classes to and from implementation classes
* and other helper utilities to help in dealing with protobuf.
*
* Note that when converting from an internal type to a protobuf type, the
* converter never returns null for the protobuf type. The check for the
* internal type being null must be done before calling the convert() method.
@ -113,7 +119,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
* and to protobuf, see {@link PBHelperClient}.
*/
public class PBHelper {
private static final RegisterCommandProto REG_CMD_PROTO =
RegisterCommandProto.newBuilder().build();
private static final RegisterCommand REG_CMD = new RegisterCommand();
@ -387,7 +393,7 @@ public class PBHelper {
return ReplicaStateProto.FINALIZED;
}
}
public static DatanodeRegistrationProto convert(
DatanodeRegistration registration) {
DatanodeRegistrationProto.Builder builder = DatanodeRegistrationProto
@ -424,7 +430,7 @@ public class PBHelper {
return null;
}
}
public static BalancerBandwidthCommandProto convert(
BalancerBandwidthCommand bbCmd) {
return BalancerBandwidthCommandProto.newBuilder()
@ -569,7 +575,7 @@ public class PBHelper {
List<RecoveringBlockProto> list = recoveryCmd.getBlocksList();
List<RecoveringBlock> recoveringBlocks = new ArrayList<RecoveringBlock>(
list.size());
for (RecoveringBlockProto rbp : list) {
recoveringBlocks.add(PBHelper.convert(rbp));
}
@ -654,9 +660,9 @@ public class PBHelper {
public static ReceivedDeletedBlockInfoProto convert(
ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
ReceivedDeletedBlockInfoProto.Builder builder =
ReceivedDeletedBlockInfoProto.newBuilder();
ReceivedDeletedBlockInfoProto.BlockStatus status;
switch (receivedDeletedBlockInfo.getStatus()) {
case RECEIVING_BLOCK:
@ -673,7 +679,7 @@ public class PBHelper {
receivedDeletedBlockInfo.getStatus());
}
builder.setStatus(status);
if (receivedDeletedBlockInfo.getDelHints() != null) {
builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
}
@ -700,7 +706,7 @@ public class PBHelper {
status,
proto.hasDeleteHint() ? proto.getDeleteHint() : null);
}
public static NamespaceInfoProto convert(NamespaceInfo info) {
NamespaceInfoProto.Builder builder = NamespaceInfoProto.newBuilder();
builder.setBlockPoolID(info.getBlockPoolID())
@ -787,6 +793,45 @@ public class PBHelper {
return builder.build();
}
public static List<SlowPeerReportProto> convertSlowPeerInfo(
SlowPeerReports slowPeers) {
if (slowPeers.getSlowPeers().size() == 0) {
return Collections.emptyList();
}
List<SlowPeerReportProto> slowPeerInfoProtos =
new ArrayList<>(slowPeers.getSlowPeers().size());
for (Map.Entry<String, Double> entry :
slowPeers.getSlowPeers().entrySet()) {
slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder()
.setDataNodeId(entry.getKey())
.setAggregateLatency(entry.getValue())
.build());
}
return slowPeerInfoProtos;
}
public static SlowPeerReports convertSlowPeerInfo(
List<SlowPeerReportProto> slowPeerProtos) {
// No slow peers, or possibly an older DataNode.
if (slowPeerProtos == null || slowPeerProtos.size() == 0) {
return SlowPeerReports.EMPTY_REPORT;
}
Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size());
for (SlowPeerReportProto proto : slowPeerProtos) {
if (!proto.hasDataNodeId()) {
// The DataNodeId should be reported.
continue;
}
slowPeersMap.put(
proto.getDataNodeId(),
proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0);
}
return SlowPeerReports.create(slowPeersMap);
}
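An illustrative round trip through the two converters above (the UUID
and latency are invented):

  SlowPeerReports sent = SlowPeerReports.create(
      ImmutableMap.of("dn-uuid-1", 92.5));
  List<SlowPeerReportProto> protos = PBHelper.convertSlowPeerInfo(sent);
  SlowPeerReports received = PBHelper.convertSlowPeerInfo(protos);
  assert sent.equals(received);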
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

View File

@ -45,7 +45,10 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Timer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
@ -166,6 +169,14 @@ public class DatanodeManager {
*/
private final HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<>(4, 0.75f);
/**
* True if we should process latency metrics from downstream peers.
*/
private final boolean dataNodePeerStatsEnabled;
@Nullable
private final SlowPeerTracker slowPeerTracker;
/**
* The minimum time between resending caching directives to Datanodes,
@ -186,6 +197,12 @@ public class DatanodeManager {
this.decomManager = new DecommissionManager(namesystem, blockManager,
heartbeatManager);
this.fsClusterStats = newFSClusterStats();
this.dataNodePeerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
this.slowPeerTracker = dataNodePeerStatsEnabled ?
new SlowPeerTracker(conf, new Timer()) : null;
networktopology = NetworkTopology.getInstance(conf);
@ -1490,7 +1507,8 @@ public class DatanodeManager {
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
@ -1549,6 +1567,19 @@ public class DatanodeManager {
nodeinfo.setBalancerBandwidth(0);
}
if (slowPeerTracker != null) {
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
slowPeersMap);
}
for (String slowNodeId : slowPeersMap.keySet()) {
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
}
}
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@ -1751,5 +1782,14 @@ public class DatanodeManager {
this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
}
/**
* Retrieve information about slow peers as JSON.
* @return the report serialized as JSON, or null if we are not
* tracking slow peers.
*/
public String getSlowPeersReport() {
return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
}
}

View File

@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* This class aggregates information from {@link SlowPeerReports} received via
* heartbeats.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SlowPeerTracker {
public static final Logger LOG =
LoggerFactory.getLogger(SlowPeerTracker.class);
/**
* Time duration after which a report is considered stale. This is
* set to DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY * 3, i.e. a
* report is retained across at least two successive report intervals.
*/
private final long reportValidityMs;
/**
* Timer object for querying the current time. Separated out for
* unit testing.
*/
private final Timer timer;
/**
* Number of nodes to include in JSON report. We will return nodes with
* the highest number of votes from peers.
*/
private static final int MAX_NODES_TO_REPORT = 5;
/**
* Information about peers that have reported a node as being slow.
* Each outer map entry maps a slow node's id to an inner map of
* (reporting DatanodeId) -> (timestamp), i.e. each reporting node
* and the timestamp of its last report.
*
* DatanodeId could be the DataNodeId or its address. We
* don't care as long as the caller uses it consistently.
*
* Stale reports are not evicted proactively and can potentially
* hang around forever.
*/
private final ConcurrentMap<String, ConcurrentMap<String, Long>>
allReports;
public SlowPeerTracker(Configuration conf, Timer timer) {
this.timer = timer;
this.allReports = new ConcurrentHashMap<>();
this.reportValidityMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS) * 3;
}
/**
* Add a new report. DatanodeIds can be the DataNodeIds or addresses.
* We don't care as long as the caller is consistent.
*
* @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNode DataNodeId of the peer suspected to be slow.
*/
public void addReport(String slowNode,
String reportingNode) {
ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
if (nodeEntries == null) {
// putIfAbsent guards against multiple writers.
allReports.putIfAbsent(
slowNode, new ConcurrentHashMap<String, Long>());
nodeEntries = allReports.get(slowNode);
}
// Replace the existing entry from this node, if any.
nodeEntries.put(reportingNode, timer.monotonicNow());
}
/**
* Retrieve the non-expired reports that mark a given DataNode
* as slow. Stale reports are excluded.
*
* @param slowNode target node Id.
* @return set of reports which implicate the target node as being slow.
*/
public Set<String> getReportsForNode(String slowNode) {
final ConcurrentMap<String, Long> nodeEntries =
allReports.get(slowNode);
if (nodeEntries == null || nodeEntries.isEmpty()) {
return Collections.emptySet();
}
return filterNodeReports(nodeEntries, timer.monotonicNow());
}
/**
* Retrieve all reports for all nodes. Stale reports are excluded.
*
* @return map from SlowNodeId -> (set of nodes that reported it as slow).
*/
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
if (allReports.isEmpty()) {
return ImmutableMap.of();
}
final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
if (!validReports.isEmpty()) {
allNodesValidReports.put(entry.getKey(), validReports);
}
}
return allNodesValidReports;
}
/**
* Filter the given reports to return just the valid ones.
*
* @param reports map from each reporting node to the timestamp of
* its last report.
* @param now the current monotonic time.
* @return sorted set of reporting nodes whose reports have not expired.
*/
private SortedSet<String> filterNodeReports(
ConcurrentMap<String, Long> reports, long now) {
final SortedSet<String> validReports = new TreeSet<>();
for (Map.Entry<String, Long> entry : reports.entrySet()) {
if (now - entry.getValue() < reportValidityMs) {
validReports.add(entry.getKey());
}
}
return validReports;
}
/**
* Retrieve all valid reports as a JSON string.
* @return serialized representation of valid reports. null if
* serialization failed.
*/
public String getJson() {
Collection<ReportForJson> validReports = getJsonReports(
MAX_NODES_TO_REPORT);
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(validReports);
} catch (JsonProcessingException e) {
// Failed to serialize. Don't log the exception call stack.
LOG.debug("Failed to serialize statistics" + e);
return null;
}
}
/**
* This structure is a thin wrapper over reports to make Json
* [de]serialization easy.
*/
public static class ReportForJson {
@JsonProperty("SlowNode")
final private String slowNode;
@JsonProperty("ReportingNodes")
final private SortedSet<String> reportingNodes;
public ReportForJson(
@JsonProperty("SlowNode") String slowNode,
@JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
this.slowNode = slowNode;
this.reportingNodes = reportingNodes;
}
public String getSlowNode() {
return slowNode;
}
public SortedSet<String> getReportingNodes() {
return reportingNodes;
}
}
/**
* Retrieve reports in a structure for generating JSON, limiting the
* output to the top numNodes nodes, i.e. the nodes with the most reports.
* @param numNodes number of nodes to return. This is to limit the
* size of the generated JSON.
*/
private Collection<ReportForJson> getJsonReports(int numNodes) {
if (allReports.isEmpty()) {
return Collections.emptyList();
}
final PriorityQueue<ReportForJson> topNReports =
new PriorityQueue<>(allReports.size(),
new Comparator<ReportForJson>() {
@Override
public int compare(ReportForJson o1, ReportForJson o2) {
return Ints.compare(o1.reportingNodes.size(),
o2.reportingNodes.size());
}
});
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
SortedSet<String> validReports = filterNodeReports(
entry.getValue(), now);
if (!validReports.isEmpty()) {
if (topNReports.size() < numNodes) {
topNReports.add(new ReportForJson(entry.getKey(), validReports));
} else if (topNReports.peek().getReportingNodes().size() <
validReports.size()){
// Remove the lowest element
topNReports.poll();
topNReports.add(new ReportForJson(entry.getKey(), validReports));
}
}
}
return topNReports;
}
@VisibleForTesting
long getReportValidityMs() {
return reportValidityMs;
}
}
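A brief sketch of the tracker lifecycle (node names invented):

  SlowPeerTracker tracker = new SlowPeerTracker(conf, new Timer());
  tracker.addReport("dn3", "dn1");  // dn1 reports dn3 as slow
  tracker.addReport("dn3", "dn2");  // dn2 agrees
  Set<String> reporters = tracker.getReportsForNode("dn3"); // [dn1, dn2]
  // Serializes to something like:
  // [{"SlowNode":"dn3","ReportingNodes":["dn1","dn2"]}]
  String json = tracker.getJson();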

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -82,13 +83,13 @@ import com.google.common.base.Joiner;
*/
@InterfaceAudience.Private
class BPServiceActor implements Runnable {
static final Logger LOG = DataNode.LOG;
final InetSocketAddress nnAddr;
HAServiceState state;
final BPOfferService bpos;
volatile long lastCacheReport = 0;
private final Scheduler scheduler;
@ -111,7 +112,7 @@ class BPServiceActor implements Runnable {
private final IncrementalBlockReportManager ibrManager;
private DatanodeRegistration bpRegistration;
final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
@ -127,7 +128,8 @@ class BPServiceActor implements Runnable {
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
prevBlockReportId = ThreadLocalRandom.current().nextLong();
scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
dnConf.slowPeersReportIntervalMs);
// get the value of maxDataLength.
this.maxDataLength = dnConf.getMaxDataLength();
}
@ -156,7 +158,7 @@ class BPServiceActor implements Runnable {
public String toString() {
return bpos.toString() + " service to " + nnAddr;
}
InetSocketAddress getNNSocketAddress() {
return nnAddr;
}
@ -214,7 +216,7 @@ class BPServiceActor implements Runnable {
* This calls <code>versionRequest</code> to determine the NN's
* namespace and version info. It automatically retries until
* the NN responds or the DN is shutting down.
*
* @return the NamespaceInfo
*/
@VisibleForTesting
@ -230,11 +232,11 @@ class BPServiceActor implements Runnable {
} catch(IOException e ) { // namenode is not available
LOG.warn("Problem connecting to server: " + nnAddr);
}
// try again in a second
sleepAndLogInterrupts(5000, "requesting version info from NN");
}
if (nsInfo != null) {
checkNNVersion(nsInfo);
} else {
@ -274,7 +276,7 @@ class BPServiceActor implements Runnable {
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
// Second phase of the handshake with the NN.
register(nsInfo);
}
@ -299,7 +301,7 @@ class BPServiceActor implements Runnable {
}
}
}
@VisibleForTesting
void triggerHeartbeatForTests() {
synchronized (ibrManager) {
@ -488,13 +490,19 @@ class BPServiceActor implements Runnable {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
scheduler.updateLastHeartbeatTime(monotonicNow());
final long now = monotonicNow();
scheduler.updateLastHeartbeatTime(now);
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
return bpNamenode.sendHeartbeat(bpRegistration,
final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
final SlowPeerReports slowPeers =
slowPeersReportDue && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
@ -502,7 +510,14 @@ class BPServiceActor implements Runnable {
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease);
requestBlockReportLease,
slowPeers);
if (slowPeersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextSlowPeerReport();
}
return response;
}
@VisibleForTesting
@ -524,14 +539,14 @@ class BPServiceActor implements Runnable {
lifelineSender.start();
}
}
private String formatThreadName(String action, InetSocketAddress addr) {
Collection<StorageLocation> dataDirs =
DataNode.getStorageLocations(dn.getConf());
return "DataNode: [" + dataDirs.toString() + "] " +
action + " to " + addr;
}
//This must be called only by blockPoolManager.
void stop() {
shouldServiceRun = false;
@ -542,7 +557,7 @@ class BPServiceActor implements Runnable {
bpThread.interrupt();
}
}
//This must be called only by blockPoolManager
void join() {
try {
@ -554,10 +569,10 @@ class BPServiceActor implements Runnable {
}
} catch (InterruptedException ie) { }
}
//Cleanup method to be called by current thread before exiting.
private synchronized void cleanUp() {
shouldServiceRun = false;
IOUtils.cleanup(null, bpNamenode);
IOUtils.cleanup(null, lifelineSender);
@ -682,7 +697,7 @@ class BPServiceActor implements Runnable {
scheduler.monotonicNow() - startTime);
}
// There is no work to do; sleep until heartbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
} catch(RemoteException re) {
@ -717,11 +732,11 @@ class BPServiceActor implements Runnable {
* Register one bp with the corresponding NameNode
* <p>
* The bpDatanode needs to register with the namenode on startup in order
* 1) to report which storage it is serving now and
* 2) to receive a registrationID
*
* issued by the namenode to recognize registered datanodes.
*
* @param nsInfo current NamespaceInfo
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
@ -749,7 +764,7 @@ class BPServiceActor implements Runnable {
sleepAndLogInterrupts(1000, "connecting to server");
}
}
LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
@ -835,9 +850,9 @@ class BPServiceActor implements Runnable {
/**
* Process an array of datanode commands
*
* @param cmds an array of datanode commands
* @return true if further processing may be required or false otherwise.
*/
boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
@ -860,7 +875,7 @@ class BPServiceActor implements Runnable {
*/
void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
throws IOException {
LocatedBlock lb = new LocatedBlock(block,
new DatanodeInfo[] {dnInfo});
bpNamenode.reportBadBlocks(new LocatedBlock[] {lb});
}
@ -893,7 +908,7 @@ class BPServiceActor implements Runnable {
}
}
}
public void bpThreadEnqueue(BPServiceActorAction action) {
synchronized (bpThreadQueue) {
if (!bpThreadQueue.contains(action)) {
@ -1079,18 +1094,23 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
boolean resetBlockReportTime = true;
@VisibleForTesting
volatile long nextSlowPeersReportTime = monotonicNow();
private final AtomicBoolean forceFullBlockReport =
new AtomicBoolean(false);
private final long heartbeatIntervalMs;
private final long lifelineIntervalMs;
private final long blockReportIntervalMs;
private final long slowPeersReportIntervalMs;
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
long blockReportIntervalMs) {
long blockReportIntervalMs, long slowPeersReportIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.lifelineIntervalMs = lifelineIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
this.slowPeersReportIntervalMs = slowPeersReportIntervalMs;
}
// This is useful to make sure NN gets Heartbeat before Blockreport
@ -1122,6 +1142,10 @@ class BPServiceActor implements Runnable {
lastBlockReportTime = blockReportTime;
}
void scheduleNextSlowPeerReport() {
nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
}
long getLastHearbeatTime() {
return (monotonicNow() - lastHeartbeatTime)/1000;
}
@ -1148,6 +1172,10 @@ class BPServiceActor implements Runnable {
return nextBlockReportTime - curTime <= 0;
}
boolean isSlowPeersReportDue(long curTime) {
return nextSlowPeersReportTime - curTime <= 0;
}
void forceFullBlockReportNow() {
forceFullBlockReport.set(true);
resetBlockReportTime = true;

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -93,7 +94,7 @@ class BlockReceiver implements Closeable {
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
private String bracketedMirrorAddr;
private String mirrorNameForMetrics;
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
@ -843,10 +844,9 @@ class BlockReceiver implements Closeable {
* </p>
*/
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
if (isPenultimateNode && mirrorAddr != null) {
datanode.getPeerMetrics().addSendPacketDownstream(
bracketedMirrorAddr,
elapsedMs);
final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
if (peerMetrics != null && isPenultimateNode) {
peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
}
}
@ -927,8 +927,13 @@ class BlockReceiver implements Closeable {
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
bracketedMirrorAddr = "[" + mirrAddr + "]";
isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
if (isPenultimateNode) {
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
LOG.debug("Will collect peer metrics for downstream node {}",
mirrorNameForMetrics);
}
throttler = throttlerArg;
this.replyOut = replyOut;

View File

@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@ -65,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.security.SaslPropertiesResolver;
import java.util.concurrent.TimeUnit;
/**
* Simple class encapsulating all of the configuration that the DataNode
* loads at startup time.
@ -91,6 +95,8 @@ public class DNConf {
private final long lifelineIntervalMs;
final long blockReportInterval;
final long blockReportSplitThreshold;
final boolean peerStatsEnabled;
final long slowPeersReportIntervalMs;
final long ibrInterval;
final long initialBlockReportDelayMs;
final long cacheReportInterval;
@ -168,6 +174,13 @@ public class DNConf {
this.blockReportInterval = getConf().getLong(
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.peerStatsEnabled = getConf().getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
this.slowPeersReportIntervalMs = getConf().getTimeDuration(
DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.ibrInterval = getConf().getLong(
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);

View File

@ -334,6 +334,7 @@ public class DataNode extends ReconfigurableBase
private int infoSecurePort;
DataNodeMetrics metrics;
@Nullable
private DataNodePeerMetrics peerMetrics;
private InetSocketAddress streamingAddr;
@ -419,6 +420,7 @@ public class DataNode extends ReconfigurableBase
this.connectToDnViaHostname = false;
this.getHdfsBlockLocationsEnabled = false;
this.pipelineSupportECN = false;
this.dnConf = new DNConf(this);
initOOBTimeout();
storageLocationChecker = null;
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@ -1330,7 +1332,8 @@ public class DataNode extends ReconfigurableBase
initIpcServer();
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
peerMetrics = dnConf.peerStatsEnabled ?
DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
blockRecoveryWorker = new BlockRecoveryWorker(this);
@ -3305,6 +3308,7 @@ public class DataNode extends ReconfigurableBase
@Override // DataNodeMXBean
public String getSendPacketDownstreamAvgInfo() {
return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
return peerMetrics != null ?
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
}
}

View File

@ -339,7 +339,9 @@ class DataXceiver extends Receiver implements Runnable {
* the thread dies away.
*/
private void collectThreadLocalStates() {
datanode.getPeerMetrics().collectThreadLocalStates();
if (datanode.getPeerMetrics() != null) {
datanode.getPeerMetrics().collectThreadLocalStates();
}
}
@Override

View File

@ -18,40 +18,59 @@
package org.apache.hadoop.hdfs.server.datanode.metrics;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.RollingAverages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
* various peer operations.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DataNodePeerMetrics {
static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
public static final Logger LOG = LoggerFactory.getLogger(
DataNodePeerMetrics.class);
private final RollingAverages sendPacketDownstreamRollingAvgerages;
private final String name;
private final boolean peerStatsEnabled;
/**
* Threshold in milliseconds below which a DataNode is definitely not slow.
*/
private static final long LOW_THRESHOLD_MS = 5;
private final SlowNodeDetector slowNodeDetector;
/**
* Minimum number of packet send samples which are required to qualify
* for outlier detection. If the number of samples is below this then
* outlier detection is skipped.
*/
@VisibleForTesting
static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
public DataNodePeerMetrics(
final String name,
final int windowSize,
final int numWindows,
final boolean peerStatsEnabled) {
final long windowSizeMs,
final int numWindows) {
this.name = name;
this.peerStatsEnabled = peerStatsEnabled;
this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS);
sendPacketDownstreamRollingAvgerages = new RollingAverages(
windowSize,
numWindows);
windowSizeMs, numWindows);
}
public String name() {
@ -66,21 +85,18 @@ public class DataNodePeerMetrics {
? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
: dnName.replace(':', '-'));
final int windowSize = conf.getInt(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
final long windowSizeMs = conf.getTimeDuration(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
TimeUnit.MILLISECONDS);
final int numWindows = conf.getInt(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
final boolean peerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
return new DataNodePeerMetrics(
name,
windowSize,
numWindows,
peerStatsEnabled);
windowSizeMs,
numWindows);
}
/**
@ -94,9 +110,7 @@ public class DataNodePeerMetrics {
public void addSendPacketDownstream(
final String peerAddr,
final long elapsedMs) {
if (peerStatsEnabled) {
sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
}
sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
}
/**
@ -114,4 +128,19 @@ public class DataNodePeerMetrics {
public void collectThreadLocalStates() {
sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
}
/**
* Retrieve the set of DataNodes that look significantly slower
* than their peers.
*/
public Map<String, Double> getOutliers() {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAvgerages.getStats(
MIN_OUTLIER_DETECTION_SAMPLES);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
return slowNodeDetector.getOutliers(stats);
}
}
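An end-to-end sketch of how these metrics feed outlier detection (names
and values invented; note that getStats() only sees samples that have
rolled over into a completed window):

  DataNodePeerMetrics metrics =
      new DataNodePeerMetrics("dn1-peer-metrics", 5000, 2);
  metrics.addSendPacketDownstream("dn2:50010", 12);  // 12ms to dn2
  // ... many more samples, then at least one 5s window rollover ...
  Map<String, Double> outliers = metrics.getOutliers();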

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A utility class to help detect nodes whose aggregate latency
* is an outlier within a given set.
*
* We use the median absolute deviation for outlier detection as
* described in the following publication:
*
* Leys, C., et al., Detecting outliers: Do not use standard deviation
* around the mean, use absolute deviation around the median.
* http://dx.doi.org/10.1016/j.jesp.2013.03.013
*
* We augment the above scheme with the following heuristics to be even
* more conservative:
*
* 1. Skip outlier detection if the sample size is too small.
* 2. Never flag nodes whose aggregate latency is below a low threshold.
* 3. Never flag nodes whose aggregate latency is less than a small
* multiple of the median.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SlowNodeDetector {
public static final Logger LOG =
LoggerFactory.getLogger(SlowNodeDetector.class);
/**
* Minimum number of peers to run outlier detection.
*/
private static long minOutlierDetectionPeers = 10;
/**
* The multiplier is from Leys, C. et al.
*/
private static final double MAD_MULTIPLIER = (double) 1.4826;
/**
* Threshold in milliseconds below which a DataNode is definitely not slow.
*/
private final long lowThresholdMs;
/**
* Deviation multiplier. A sample is considered to be an outlier if it
* exceeds the median by (multiplier * median abs. deviation). 3 is a
* conservative choice.
*/
private static final int DEVIATION_MULTIPLIER = 3;
/**
* If most of the samples are clustered together, the MAD can be
* low. The median multiplier introduces another safeguard to avoid
* overaggressive outlier detection.
*/
@VisibleForTesting
static final int MEDIAN_MULTIPLIER = 3;
public SlowNodeDetector(long lowThresholdMs) {
this.lowThresholdMs = lowThresholdMs;
}
/**
* Return a set of DataNodes whose latency is much higher than
* their peers. The input is a map of (node -> aggregate latency)
* entries.
*
* The aggregate may be an arithmetic mean or a percentile e.g.
* 90th percentile. Percentiles are a better choice than median
* since latency is usually not a normal distribution.
*
* This method allocates temporary memory O(n) and
* has run time O(n.log(n)), where n = stats.size().
*
* @return map of (node -> aggregate latency) entries for the outliers.
*/
public Map<String, Double> getOutliers(Map<String, Double> stats) {
if (stats.size() < minOutlierDetectionPeers) {
LOG.debug("Skipping statistical outlier detection as we don't have " +
"latency data for enough peers. Have {}, need at least {}",
stats.size(), minOutlierDetectionPeers);
return ImmutableMap.of();
}
// Compute the median absolute deviation of the aggregates.
final List<Double> sorted = new ArrayList<>(stats.values());
Collections.sort(sorted);
final Double median = computeMedian(sorted);
final Double mad = computeMad(sorted);
Double upperLimitLatency = Math.max(
lowThresholdMs, median * MEDIAN_MULTIPLIER);
upperLimitLatency = Math.max(
upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
final Map<String, Double> slowNodes = new HashMap<>();
LOG.trace("getOutliers: List={}, MedianLatency={}, " +
"MedianAbsoluteDeviation={}, upperLimitLatency={}",
sorted, median, mad, upperLimitLatency);
// Find nodes whose latency exceeds the threshold.
for (Map.Entry<String, Double> entry : stats.entrySet()) {
if (entry.getValue() > upperLimitLatency) {
slowNodes.put(entry.getKey(), entry.getValue());
}
}
return slowNodes;
}
/**
* Compute the Median Absolute Deviation of a sorted list.
*/
public static Double computeMad(List<Double> sortedValues) {
if (sortedValues.size() == 0) {
throw new IllegalArgumentException(
"Cannot compute the Median Absolute Deviation " +
"of an empty list.");
}
// First get the median of the values.
Double median = computeMedian(sortedValues);
List<Double> deviations = new ArrayList<>(sortedValues);
// Then update the list to store deviation from the median.
for (int i = 0; i < sortedValues.size(); ++i) {
deviations.set(i, Math.abs(sortedValues.get(i) - median));
}
// Finally get the median absolute deviation.
Collections.sort(deviations);
return computeMedian(deviations) * MAD_MULTIPLIER;
}
/**
* Compute the median of a sorted list.
*/
public static Double computeMedian(List<Double> sortedValues) {
if (sortedValues.size() == 0) {
throw new IllegalArgumentException(
"Cannot compute the median of an empty list.");
}
Double median = sortedValues.get(sortedValues.size() / 2);
if (sortedValues.size() % 2 == 0) {
median += sortedValues.get((sortedValues.size() / 2) - 1);
median /= 2;
}
return median;
}
/**
* This method *must not* be used outside of unit tests.
*/
@VisibleForTesting
static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) {
SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers;
}
@VisibleForTesting
static long getMinOutlierDetectionPeers() {
return minOutlierDetectionPeers;
}
}
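A worked numeric sketch of the detection scheme (all values invented):
with nine peers clustered near 10ms and one at 500ms, the median is
10.55ms, the MAD is 1.4826 * 0.25 = 0.37ms, and the upper limit is
max(lowThresholdMs, 3 * median, median + 3 * MAD) = 31.65ms, so only
the 500ms node is flagged.

  Map<String, Double> stats = new HashMap<>();
  for (int i = 1; i <= 9; i++) {
    stats.put("dn" + i, 10.0 + i * 0.1);  // 10.1ms .. 10.9ms
  }
  stats.put("dn10", 500.0);               // the suspected slow node
  // lowThresholdMs = 5; ten entries meet minOutlierDetectionPeers.
  Map<String, Double> outliers = new SlowNodeDetector(5).getOutliers(stats);
  // outliers == {dn10=500.0}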

View File

@ -132,6 +132,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
@ -258,6 +259,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -3617,7 +3619,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers) throws IOException {
readLock();
try {
//get datanode commands
@ -3625,7 +3628,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
slowPeers);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);

View File

@ -1834,6 +1834,12 @@ public class NameNode extends ReconfigurableBase implements
return getNamesystem().getBytesInFuture();
}
@Override
public String getSlowPeersReport() {
return namesystem.getBlockManager().getDatanodeManager()
.getSlowPeersReport();
}
/**
* Shutdown the NN immediately in an ungraceful way. Used when it would be
* unsafe for the NN to continue operating, e.g. during a failed HA state

View File

@ -152,6 +152,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -203,6 +204,8 @@ import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import javax.annotation.Nonnull;
/**
* This class is responsible for handling all of the RPC calls to the NameNode.
* It is created, started, and stopped by {@link NameNode}.
@ -1412,12 +1415,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
slowPeers);
}
@Override // DatanodeProtocol

View File

@ -69,4 +69,10 @@ public interface NameNodeStatusMXBean {
* @return number of bytes that can be deleted if exited from safe mode.
*/
long getBytesWithFutureGenerationStamps();
/**
* Retrieves information about slow DataNodes, if the feature is
* enabled. The report is in JSON format.
*/
String getSlowPeersReport();
}
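Since getSlowPeersReport() is exposed through an MXBean, the report can also be pulled over JMX. A minimal in-process sketch, assuming the NameNode status bean is registered under the conventional Hadoop ObjectName (the exact name may vary by version; verify against your NameNode's JMX output):

  import java.lang.management.ManagementFactory;
  import javax.management.MBeanServer;
  import javax.management.ObjectName;

  public class SlowPeersReportReader {
    public static void main(String[] args) throws Exception {
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      // Assumed ObjectName for the NameNode status bean.
      ObjectName nnStatus =
          new ObjectName("Hadoop:service=NameNode,name=NameNodeStatus");
      // Attribute name follows MXBean naming for getSlowPeersReport().
      String json = (String) mbs.getAttribute(nnStatus, "SlowPeersReport");
      System.out.println(json);
    }
  }

An external monitor would go through a JMXConnector or the NameNode's /jmx servlet rather than the platform MBeanServer.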

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
import javax.annotation.Nonnull;
/**********************************************************************
* Protocol that a DFS datanode uses to communicate with the NameNode.
* It's used to upload current load information and block reports.
@ -104,6 +106,9 @@ public interface DatanodeProtocol {
* @param volumeFailureSummary info about volume failures
* @param requestFullBlockReportLease whether to request a full block
* report lease.
* @param slowPeers Details of peer DataNodes that were detected as being
* slow to respond to packet writes. Empty report if no
* slow peers were detected by the DataNode.
* @throws IOException on error
*/
@Idempotent
@ -115,7 +120,8 @@ public interface DatanodeProtocol {
int xceiverCount,
int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease)
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers)
throws IOException;
/**

View File

@ -185,6 +185,7 @@ message VolumeFailureSummaryProto {
* cacheCapacity - total cache capacity available at the datanode
* cacheUsed - amount of cache used
* volumeFailureSummary - info about volume failures
* slowPeers - info about peer DataNodes that are suspected to be slow.
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@ -196,6 +197,7 @@ message HeartbeatRequestProto {
optional uint64 cacheUsed = 7 [default = 0 ];
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
optional bool requestFullBlockReportLease = 9 [ default = false ];
repeated SlowPeerReportProto slowPeers = 10;
}
/**
@ -372,6 +374,24 @@ message CommitBlockSynchronizationRequestProto {
message CommitBlockSynchronizationResponseProto {
}
/**
* Information about a single slow peer that may be reported by
* the DataNode to the NameNode as part of the heartbeat request.
* The message includes the peer's DataNodeId and its
* aggregate packet latency as observed by the reporting DataNode.
* (DataNodeId must be transmitted as a string for protocol compatibility
* with earlier versions of Hadoop).
*
* The exact choice of the aggregate is opaque to the NameNode but it
* _should_ be chosen consistently by all DataNodes in the cluster.
* Examples of aggregates are 90th percentile (good) and mean (not so
* good).
*/
message SlowPeerReportProto {
optional string dataNodeId = 1;
optional double aggregateLatency = 2;
}
/**
* Protocol used from datanode to the namenode
* See the request and response for details of rpc call.
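To make the new field concrete, here is a sketch of how a single report entry might be built with the generated protobuf classes (the import path assumes the usual generated-code location for DatanodeProtocol.proto; the id and latency values are illustrative):

  import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;

  public class SlowPeerReportProtoSketch {
    public static void main(String[] args) {
      SlowPeerReportProto entry = SlowPeerReportProto.newBuilder()
          .setDataNodeId("dn1.example.com:9866") // peer's DataNodeId, as a string
          .setAggregateLatency(12.5)             // e.g. a 90th-percentile latency
          .build();
      System.out.println(entry);
    }
  }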

View File

@ -1930,19 +1930,15 @@
</property>
<property>
<name>dfs.metrics.rolling.average.window.size</name>
<value>3600</value>
<name>dfs.datanode.slow.peers.report.interval</name>
<value>30m</value>
<description>
The number of seconds of each window for which a subset of samples is
gathered to compute the rolling average, A.K.A. roll over interval.
This setting controls how frequently DataNodes will report their peer
latencies to the NameNode via heartbeats. This setting supports
multiple time unit suffixes as described in dfs.heartbeat.interval.
If no suffix is specified then milliseconds is assumed.
It is ignored if dfs.datanode.peer.stats.enabled is false.
</description>
</property>
<property>
<name>dfs.metrics.rolling.average.window.numbers</name>
<value>48</value>
<description>
The number of windows maintained to compute the rolling average.
</description>
</property>
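A usage sketch of the new key (a hypothetical override; the suffix handling mirrors dfs.heartbeat.interval, so values like "30s" or "5m" are accepted and a bare number is read as milliseconds):

  import org.apache.hadoop.conf.Configuration;

  public class SlowPeerReportIntervalSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Report peer latencies every five minutes instead of the 30m default.
      conf.set("dfs.datanode.slow.peers.report.interval", "5m");
      System.out.println(conf.get("dfs.datanode.slow.peers.report.interval"));
    }
  }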

View File

@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
@ -165,7 +167,7 @@ public class TestPBHelper {
DatanodeID dn2 = PBHelperClient.convert(dnProto);
compare(dn, dn2);
}
void compare(DatanodeID dn, DatanodeID dn2) {
assertEquals(dn.getIpAddr(), dn2.getIpAddr());
assertEquals(dn.getHostName(), dn2.getHostName());
@ -253,7 +255,7 @@ public class TestPBHelper {
ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
compare(expKeys, expKeys1);
}
void compare(ExportedBlockKeys expKeys, ExportedBlockKeys expKeys1) {
BlockKey[] allKeys = expKeys.getAllKeys();
BlockKey[] allKeys1 = expKeys1.getAllKeys();
@ -282,12 +284,12 @@ public class TestPBHelper {
s1.getMostRecentCheckpointTxId());
assertEquals(s.getNamespaceID(), s1.getNamespaceID());
}
private static void compare(RemoteEditLog l1, RemoteEditLog l2) {
assertEquals(l1.getEndTxId(), l2.getEndTxId());
assertEquals(l1.getStartTxId(), l2.getStartTxId());
}
@Test
public void testConvertRemoteEditLog() {
RemoteEditLog l = new RemoteEditLog(1, 100);
@ -295,7 +297,7 @@ public class TestPBHelper {
RemoteEditLog l1 = PBHelper.convert(lProto);
compare(l, l1);
}
@Test
public void testConvertRemoteEditLogManifest() {
List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
@ -304,7 +306,7 @@ public class TestPBHelper {
RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
RemoteEditLogManifestProto mProto = PBHelper.convert(m);
RemoteEditLogManifest m1 = PBHelper.convert(mProto);
List<RemoteEditLog> logs1 = m1.getLogs();
assertEquals(logs.size(), logs1.size());
for (int i = 0; i < logs.size(); i++) {
@ -314,15 +316,15 @@ public class TestPBHelper {
public ExtendedBlock getExtendedBlock() {
return getExtendedBlock(1);
}
public ExtendedBlock getExtendedBlock(long blkid) {
return new ExtendedBlock("bpid", blkid, 100, 2);
}
private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {
assertEquals(dn1.getAdminState(), dn2.getAdminState());
assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed());
assertEquals(dn1.getBlockPoolUsedPercent(),
dn2.getBlockPoolUsedPercent(), DELTA);
assertEquals(dn1.getCapacity(), dn2.getCapacity());
assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport());
@ -336,20 +338,20 @@ public class TestPBHelper {
assertEquals(dn1.getLevel(), dn2.getLevel());
assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation());
}
@Test
public void testConvertExtendedBlock() {
ExtendedBlock b = getExtendedBlock();
ExtendedBlockProto bProto = PBHelperClient.convert(b);
ExtendedBlock b1 = PBHelperClient.convert(bProto);
assertEquals(b, b1);
b.setBlockId(-1);
bProto = PBHelperClient.convert(b);
b1 = PBHelperClient.convert(bProto);
assertEquals(b, b1);
}
@Test
public void testConvertRecoveringBlock() {
DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo();
@ -365,7 +367,7 @@ public class TestPBHelper {
compare(dnInfo[0], dnInfo1[0]);
}
}
@Test
public void testConvertBlockRecoveryCommand() {
DatanodeInfo di1 = DFSTestUtil.getLocalDatanodeInfo();
@ -376,14 +378,14 @@ public class TestPBHelper {
new RecoveringBlock(getExtendedBlock(1), dnInfo, 3),
new RecoveringBlock(getExtendedBlock(2), dnInfo, 3)
);
BlockRecoveryCommand cmd = new BlockRecoveryCommand(blks);
BlockRecoveryCommandProto proto = PBHelper.convert(cmd);
assertEquals(1, proto.getBlocks(0).getBlock().getB().getBlockId());
assertEquals(2, proto.getBlocks(1).getBlock().getB().getBlockId());
BlockRecoveryCommand cmd2 = PBHelper.convert(proto);
List<RecoveringBlock> cmd2Blks = Lists.newArrayList(
cmd2.getRecoveringBlocks());
assertEquals(blks.get(0).getBlock(), cmd2Blks.get(0).getBlock());
@ -391,8 +393,8 @@ public class TestPBHelper {
assertEquals(Joiner.on(",").join(blks), Joiner.on(",").join(cmd2Blks));
assertEquals(cmd.toString(), cmd2.toString());
}
@Test
public void testConvertText() {
Text t = new Text("abc".getBytes());
@ -400,7 +402,7 @@ public class TestPBHelper {
Text t1 = new Text(s);
assertEquals(t, t1);
}
@Test
public void testConvertBlockToken() {
Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
@ -410,7 +412,7 @@ public class TestPBHelper {
Token<BlockTokenIdentifier> token2 = PBHelperClient.convert(tokenProto);
compare(token, token2);
}
@Test
public void testConvertNamespaceInfo() {
NamespaceInfo info = new NamespaceInfo(37, "clusterID", "bpID", 2300);
@ -455,7 +457,7 @@ public class TestPBHelper {
AdminStates.DECOMMISSION_INPROGRESS),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
AdminStates.DECOMMISSIONED),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
AdminStates.NORMAL),
@ -523,7 +525,7 @@ public class TestPBHelper {
compare(lbl.get(i), lbl2.get(2));
}
}
@Test
public void testConvertLocatedBlockArray() {
LocatedBlock [] lbl = new LocatedBlock[3];
@ -563,7 +565,7 @@ public class TestPBHelper {
DatanodeStorage dns2 = PBHelperClient.convert(proto);
compare(dns1, dns2);
}
@Test
public void testConvertBlockCommand() {
Block[] blocks = new Block[] { new Block(21), new Block(22) };
@ -596,7 +598,7 @@ public class TestPBHelper {
}
}
}
@Test
public void testChecksumTypeProto() {
assertEquals(DataChecksum.Type.NULL,
@ -678,4 +680,24 @@ public class TestPBHelper {
DatanodeInfo dnInfos3 = PBHelperClient.convert(b.build());
assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed());
}
@Test
public void testSlowPeerInfoPBHelper() {
// Test with a map that has a few slow peer entries.
final SlowPeerReports slowPeers = SlowPeerReports.create(
ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0));
SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo(
PBHelper.convertSlowPeerInfo(slowPeers));
assertTrue(
"Expected map:" + slowPeers + ", got map:" +
slowPeersConverted1.getSlowPeers(),
slowPeersConverted1.equals(slowPeers));
// Test with an empty map.
SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo(
PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT));
assertTrue(
"Expected empty map:" + ", got map:" + slowPeersConverted2,
slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
}
}

View File

@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
/**
* Test if FSNamesystem handles heartbeat right
*/
public class TestHeartbeatHandling {
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
/**
* Test if
* {@link FSNamesystem#handleHeartbeat}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@ -111,7 +113,7 @@ public class TestNameNodePrunesMissingStorages {
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null, true);
0, null, true, SlowPeerReports.EMPTY_REPORT);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Set;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Tests for {@link SlowPeerTracker}.
*/
public class TestSlowPeerTracker {
public static final Logger LOG = LoggerFactory.getLogger(
TestSlowPeerTracker.class);
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
private Configuration conf;
private SlowPeerTracker tracker;
private FakeTimer timer;
private long reportValidityMs;
@Before
public void setup() {
conf = new HdfsConfiguration();
timer = new FakeTimer();
tracker = new SlowPeerTracker(conf, timer);
reportValidityMs = tracker.getReportValidityMs();
}
/**
* Edge case, there are no reports to retrieve.
*/
@Test
public void testEmptyReports() {
assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
}
@Test
public void testReportsAreRetrieved() {
tracker.addReport("node2", "node1");
tracker.addReport("node3", "node1");
tracker.addReport("node3", "node2");
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(2));
assertThat(tracker.getReportsForNode("node1").size(), is(0));
}
/**
* Test that when all reports are expired, we get back nothing.
*/
@Test
public void testAllReportsAreExpired() {
tracker.addReport("node2", "node1");
tracker.addReport("node3", "node2");
tracker.addReport("node1", "node3");
// No reports should expire after 1ms.
timer.advance(1);
assertThat(tracker.getReportsForAllDataNodes().size(), is(3));
// All reports should expire after REPORT_VALIDITY_MS.
timer.advance(reportValidityMs);
assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
assertTrue(tracker.getReportsForNode("node1").isEmpty());
assertTrue(tracker.getReportsForNode("node2").isEmpty());
assertTrue(tracker.getReportsForNode("node3").isEmpty());
}
/**
* Test the case when a subset of reports has expired.
* Ensure that we only get back non-expired reports.
*/
@Test
public void testSomeReportsAreExpired() {
tracker.addReport("node3", "node1");
tracker.addReport("node3", "node2");
timer.advance(reportValidityMs);
tracker.addReport("node3", "node4");
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(1));
assertTrue(tracker.getReportsForNode("node3").contains("node4"));
}
/**
* Test the case when an expired report is replaced by a valid one.
*/
@Test
public void testReplacement() {
tracker.addReport("node2", "node1");
timer.advance(reportValidityMs); // Expire the report.
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
// This should replace the expired report with a newer valid one.
tracker.addReport("node2", "node1");
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
}
@Test
public void testGetJson() throws IOException {
tracker.addReport("node1", "node2");
tracker.addReport("node2", "node3");
tracker.addReport("node2", "node1");
tracker.addReport("node4", "node1");
final Set<ReportForJson> reports = getAndDeserializeJson();
// And ensure its contents are what we expect.
assertThat(reports.size(), is(3));
assertTrue(isNodeInReports(reports, "node1"));
assertTrue(isNodeInReports(reports, "node2"));
assertTrue(isNodeInReports(reports, "node4"));
assertFalse(isNodeInReports(reports, "node3"));
}
@Test
public void testGetJsonSizeIsLimited() throws IOException {
tracker.addReport("node1", "node2");
tracker.addReport("node1", "node3");
tracker.addReport("node2", "node3");
tracker.addReport("node2", "node4");
tracker.addReport("node3", "node4");
tracker.addReport("node3", "node5");
tracker.addReport("node4", "node6");
tracker.addReport("node5", "node6");
tracker.addReport("node5", "node7");
tracker.addReport("node6", "node7");
tracker.addReport("node6", "node8");
final Set<ReportForJson> reports = getAndDeserializeJson();
// Ensure that node4 is not in the list since it was
// tagged by just one peer and we already have 5 other nodes.
assertFalse(isNodeInReports(reports, "node4"));
// Remaining nodes should be in the list.
assertTrue(isNodeInReports(reports, "node1"));
assertTrue(isNodeInReports(reports, "node2"));
assertTrue(isNodeInReports(reports, "node3"));
assertTrue(isNodeInReports(reports, "node5"));
assertTrue(isNodeInReports(reports, "node6"));
}
@Test
public void testLowRankedElementsIgnored() throws IOException {
// Insert 5 nodes with 2 peer reports each.
for (int i = 0; i < 5; ++i) {
tracker.addReport("node" + i, "reporter1");
tracker.addReport("node" + i, "reporter2");
}
// Insert 10 nodes with 1 peer report each.
for (int i = 10; i < 20; ++i) {
tracker.addReport("node" + i, "reporter1");
}
final Set<ReportForJson> reports = getAndDeserializeJson();
// Ensure that only the first 5 nodes with two reports each were
// included in the JSON.
for (int i = 0; i < 5; ++i) {
assertTrue(isNodeInReports(reports, "node" + i));
}
}
private boolean isNodeInReports(
Set<ReportForJson> reports, String node) {
for (ReportForJson report : reports) {
if (report.getSlowNode().equalsIgnoreCase(node)) {
return true;
}
}
return false;
}
private Set<ReportForJson> getAndDeserializeJson()
throws IOException {
final String json = tracker.getJson();
LOG.info("Got JSON: {}", json);
return (new ObjectMapper()).readValue(
json, new TypeReference<Set<ReportForJson>>() {});
}
}
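Pulling the tracker semantics out of the test harness, a minimal standalone sketch (assuming the same package-level access the test enjoys; FakeTimer is a test-only clock):

  import org.apache.hadoop.hdfs.HdfsConfiguration;
  import org.apache.hadoop.util.FakeTimer;

  public class SlowPeerTrackerSketch {
    public static void main(String[] args) throws Exception {
      FakeTimer timer = new FakeTimer();
      SlowPeerTracker tracker =
          new SlowPeerTracker(new HdfsConfiguration(), timer);

      // node2 reports node1 as a slow peer.
      tracker.addReport("node1", "node2");
      System.out.println(tracker.getJson());  // node1 appears in the report

      // Advance the clock past the validity window; the report ages out.
      timer.advance(tracker.getReportValidityMs() + 1);
      System.out.println(tracker.getJson());  // report is now empty
    }
  }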

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.junit.Assert;
@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils {
Mockito.any(StorageReport[].class), Mockito.anyLong(),
Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean())).thenReturn(
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -119,7 +120,7 @@ public class TestBPOfferService {
Mockito.doReturn(conf).when(mockDn).getConf();
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
.when(mockDn).getMetrics();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@ -152,7 +153,8 @@ public class TestBPOfferService {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean());
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
@ -83,6 +84,7 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
@ -186,7 +188,8 @@ public class TestBlockRecovery {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean()))
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
@ -252,15 +255,15 @@ public class TestBlockRecovery {
}
/** Sync two replicas */
private void testSyncReplicas(ReplicaRecoveryInfo replica1,
ReplicaRecoveryInfo replica2,
InterDatanodeProtocol dn1,
InterDatanodeProtocol dn2,
long expectLen) throws IOException {
DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
RecoveringBlock rBlock = new RecoveringBlock(block,
locs, RECOVERY_ID);
ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
BlockRecord record1 = new BlockRecord(
@ -269,7 +272,7 @@ public class TestBlockRecovery {
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
syncList.add(record1);
syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong(), anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
@ -279,7 +282,7 @@ public class TestBlockRecovery {
recoveryWorker.new RecoveryTaskContiguous(rBlock);
RecoveryTaskContiguous.syncBlock(syncList);
}
/**
* BlockRecovery_02.8.
* Two replicas are in Finalized state
@ -290,9 +293,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@ -305,9 +308,9 @@ public class TestBlockRecovery {
REPLICA_LEN1);
// two finalized replicas have different length
replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
try {
@ -318,10 +321,10 @@ public class TestBlockRecovery {
"Inconsistent size of finalized replicas. "));
}
}
/**
* BlockRecovery_02.9.
* One replica is Finalized and another is RBW.
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@ -329,11 +332,11 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
// rbw and finalized replicas have the same length
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@ -344,11 +347,11 @@ public class TestBlockRecovery {
REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
// rbw replica has a different length from the finalized one
replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
dn1 = mock(InterDatanodeProtocol.class);
@ -359,10 +362,10 @@ public class TestBlockRecovery {
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
/**
* BlockRecovery_02.10.
* One replica is Finalized and another is RWR.
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@ -370,11 +373,11 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
// rbw and finalized replicas have the same length
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@ -385,11 +388,11 @@ public class TestBlockRecovery {
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
// rbw replica has a different length from the finalized one
replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
dn1 = mock(InterDatanodeProtocol.class);
@ -401,7 +404,7 @@ public class TestBlockRecovery {
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
/**
* BlockRecovery_02.11.
* Two replicas are RBW.
@ -412,9 +415,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@ -425,10 +428,10 @@ public class TestBlockRecovery {
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
}
/**
* BlockRecovery_02.12.
* One replica is RBW and another is RWR.
* @throws IOException in case of an error
*/
@Test(timeout=60000)
@ -436,9 +439,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@ -450,9 +453,9 @@ public class TestBlockRecovery {
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
/**
* BlockRecovery_02.13.
* Two replicas are RWR.
* @throws IOException in case of an error
*/
@ -461,9 +464,9 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
@ -471,10 +474,10 @@ public class TestBlockRecovery {
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
}
private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
@ -661,10 +664,10 @@ public class TestBlockRecovery {
streams.close();
}
}
/**
* Test to verify the race between finalizeBlock and Lease recovery
*
* @throws Exception
*/
@Test(timeout = 20000)
@ -682,11 +685,11 @@ public class TestBlockRecovery {
FSDataOutputStream out = fs.create(path);
out.writeBytes("data");
out.hsync();
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
final LocatedBlock block = blocks.get(0);
final DataNode dataNode = cluster.getDataNodes().get(0);
final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
Thread recoveryThread = new Thread() {
@Override
@ -716,7 +719,7 @@ public class TestBlockRecovery {
}
Assert.assertTrue("Recovery should be initiated successfully",
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());

View File

@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
/**
@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler {
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000; // 10 seconds
private final Random random = new Random(System.nanoTime());
@Test
@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler {
}
}
@Test
public void testSlowPeerReportScheduling() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.isSlowPeersReportDue(now));
scheduler.scheduleNextSlowPeerReport();
assertFalse(scheduler.isSlowPeersReportDue(now));
assertFalse(scheduler.isSlowPeersReportDue(now + 1));
assertTrue(scheduler.isSlowPeersReportDue(
now + SLOW_PEER_REPORT_INTERVAL_MS));
}
}
private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
Scheduler mockScheduler = spy(new Scheduler(
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.nextHeartbeatTime = now;
mockScheduler.nextSlowPeersReportTime = now;
return mockScheduler;
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
@ -167,7 +168,8 @@ public class TestDataNodeLifeline {
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean());
anyBoolean(),
any(SlowPeerReports.class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@ -230,7 +232,8 @@ public class TestDataNodeLifeline {
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean());
anyBoolean(),
any(SlowPeerReports.class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics {
final int numOpsPerIteration = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
windowSize);
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
conf.setTimeDuration(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
windowSize, TimeUnit.SECONDS);
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
numWindows);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean());
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class));
dn = new DataNode(conf, locations, null, null) {
@Override

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
@ -172,7 +173,7 @@ public class TestFsDatasetCache {
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean());
anyBoolean(), any(SlowPeerReports.class));
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.junit.After;
@ -106,7 +107,8 @@ public class TestStorageReport {
any(DatanodeRegistration.class),
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class));
StorageReport[] reports = captor.getValue();

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.metrics;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Random;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Test that the {@link DataNodePeerMetrics} class is able to detect
* outliers, i.e. slow nodes, via the metrics it maintains.
*/
public class TestDataNodeOutlierDetectionViaMetrics {
public static final Logger LOG =
LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class);
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
// A few constants to keep the test run time short.
private static final int WINDOW_INTERVAL_SECONDS = 3;
private static final int ROLLING_AVERAGE_WINDOWS = 10;
private static final int SLOW_NODE_LATENCY_MS = 20_000;
private static final int FAST_NODE_MAX_LATENCY_MS = 5;
private Random random = new Random(System.currentTimeMillis());
@Before
public void setup() {
GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
}
/**
* Test that a very slow peer is detected as an outlier.
*/
@Test
public void testOutlierIsDetected() throws Exception {
final String slowNodeName = "SlowNode";
DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
"PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
ROLLING_AVERAGE_WINDOWS);
injectFastNodesSamples(peerMetrics);
injectSlowNodeSamples(peerMetrics, slowNodeName);
// Trigger a snapshot.
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
final Map<String, Double> outliers = peerMetrics.getOutliers();
LOG.info("Got back outlier nodes: {}", outliers);
assertThat(outliers.size(), is(1));
assertTrue(outliers.containsKey(slowNodeName));
}
/**
* Test that when there are no outliers, we get back nothing.
*/
@Test
public void testWithNoOutliers() throws Exception {
DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
"PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
ROLLING_AVERAGE_WINDOWS);
injectFastNodesSamples(peerMetrics);
// Trigger a snapshot.
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
// Ensure that no outliers are reported.
assertTrue(peerMetrics.getOutliers().isEmpty());
}
/**
* Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes.
*
* @param peerMetrics the metrics object to inject the fake samples into.
*/
public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
for (int nodeIndex = 0;
nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
++nodeIndex) {
final String nodeName = "FastNode-" + nodeIndex;
LOG.info("Generating stats for node {}", nodeName);
for (int i = 0;
i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
++i) {
peerMetrics.addSendPacketDownstream(
nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS));
}
}
}
/**
* Inject fake stats for one extremely slow node.
*/
public void injectSlowNodeSamples(
DataNodePeerMetrics peerMetrics, String slowNodeName)
throws InterruptedException {
// And the one slow node.
for (int i = 0;
i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
++i) {
peerMetrics.addSendPacketDownstream(
slowNodeName, SLOW_NODE_LATENCY_MS);
}
}
}

View File

@ -0,0 +1,356 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.metrics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link SlowNodeDetector}.
*/
public class TestSlowNodeDetector {
public static final Logger LOG =
LoggerFactory.getLogger(TestSlowNodeDetector.class);
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
private final static double LOW_THRESHOLD = 1000;
private final static long MIN_OUTLIER_DETECTION_PEERS = 3;
// Randomly generated test cases for median and MAD. The first entry
// in each pair is the expected median and the second entry is the
// expected Median Absolute Deviation. The small sets of size 1 and 2
// exist to test the edge cases; in practice, however, the MAD of a very
// small set is not useful.
private Map<List<Double>, Pair<Double, Double>> medianTestMatrix =
new ImmutableMap.Builder<List<Double>, Pair<Double, Double>>()
// Single element.
.put(new ImmutableList.Builder<Double>()
.add(9.6502431302).build(),
Pair.of(9.6502431302, 0.0))
// Two elements.
.put(new ImmutableList.Builder<Double>()
.add(1.72168104625)
.add(11.7872544459).build(),
Pair.of(6.75446774606, 7.4616095611))
// The remaining lists were randomly generated with sizes 3-10.
.put(new ImmutableList.Builder<Double>()
.add(76.2635686249)
.add(27.0652018553)
.add(1.3868476443)
.add(49.7194624164)
.add(47.385680883)
.add(57.8721199173).build(),
Pair.of(48.5525716497, 22.837202532))
.put(new ImmutableList.Builder<Double>()
.add(86.0573389581)
.add(93.2399572424)
.add(64.9545429122)
.add(35.8509730085)
.add(1.6534313654).build(),
Pair.of(64.9545429122, 41.9360180373))
.put(new ImmutableList.Builder<Double>()
.add(5.00127007366)
.add(37.9790589127)
.add(67.5784746266).build(),
Pair.of(37.9790589127, 43.8841594039))
.put(new ImmutableList.Builder<Double>()
.add(1.43442932944)
.add(70.6769829947)
.add(37.47579656)
.add(51.1126141394)
.add(72.2465914419)
.add(32.2930549225)
.add(39.677459781).build(),
Pair.of(39.677459781, 16.9537852208))
.put(new ImmutableList.Builder<Double>()
.add(26.7913745214)
.add(68.9833706658)
.add(29.3882180746)
.add(68.3455244453)
.add(74.9277265022)
.add(12.1469972942)
.add(72.5395402683)
.add(7.87917492506)
.add(33.3253447774)
.add(72.2753759125).build(),
Pair.of(50.8354346113, 31.9881230079))
.put(new ImmutableList.Builder<Double>()
.add(38.6482290705)
.add(88.0690746319)
.add(50.6673611649)
.add(64.5329814115)
.add(25.2580979294)
.add(59.6709630711)
.add(71.5406993741)
.add(81.3073035091)
.add(20.5549547284).build(),
Pair.of(59.6709630711, 31.1683520683))
.put(new ImmutableList.Builder<Double>()
.add(87.352734249)
.add(65.4760359094)
.add(28.9206803169)
.add(36.5908574008)
.add(87.7407653175)
.add(99.3704511335)
.add(41.3227434076)
.add(46.2713494909)
.add(3.49940920921).build(),
Pair.of(46.2713494909, 28.4729106898))
.put(new ImmutableList.Builder<Double>()
.add(95.3251533286)
.add(27.2777870437)
.add(43.73477168).build(),
Pair.of(43.73477168, 24.3991619317))
.build();
// A test matrix that maps inputs to the expected output list of
// slow nodes, i.e. outliers.
private Map<Map<String, Double>, Set<String>> outlierTestMatrix =
new ImmutableMap.Builder<Map<String, Double>, Set<String>>()
// The number of samples is too low and all samples are below
// the low threshold. Nothing should be returned.
.put(ImmutableMap.of(
"n1", 0.0,
"n2", LOW_THRESHOLD + 1),
ImmutableSet.<String>of())
// A statistical outlier below the low threshold must not be
// returned.
.put(ImmutableMap.of(
"n1", 1.0,
"n2", 1.0,
"n3", LOW_THRESHOLD - 1),
ImmutableSet.<String>of())
// A statistical outlier above the low threshold must be returned.
.put(ImmutableMap.of(
"n1", 1.0,
"n2", 1.0,
"n3", LOW_THRESHOLD + 1),
ImmutableSet.of("n3"))
// A statistical outlier must not be returned if it is within a
// MEDIAN_MULTIPLIER multiple of the median.
.put(ImmutableMap.of(
"n1", LOW_THRESHOLD + 0.1,
"n2", LOW_THRESHOLD + 0.1,
"n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1),
ImmutableSet.<String>of())
// A statistical outlier must be returned if it is outside a
// MEDIAN_MULTIPLIER multiple of the median.
.put(ImmutableMap.of(
"n1", LOW_THRESHOLD + 0.1,
"n2", LOW_THRESHOLD + 0.1,
"n3", (LOW_THRESHOLD + 0.1) *
SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1),
ImmutableSet.of("n3"))
// Only the statistical outliers n3 and n11 should be returned.
.put(new ImmutableMap.Builder<String, Double>()
.put("n1", 1029.4322)
.put("n2", 2647.876)
.put("n3", 9194.312)
.put("n4", 2.2)
.put("n5", 2012.92)
.put("n6", 1843.81)
.put("n7", 1201.43)
.put("n8", 6712.01)
.put("n9", 3278.554)
.put("n10", 2091.765)
.put("n11", 9194.77).build(),
ImmutableSet.of("n3", "n11"))
// The following input set has multiple outliers.
// - The low outliers (n4, n6) should not be returned.
// - High outlier n2 is within 3 multiples of the median
// and so it should not be returned.
// - Only the high outlier n8 should be returned.
.put(new ImmutableMap.Builder<String, Double>()
.put("n1", 5002.0)
.put("n2", 9001.0)
.put("n3", 5004.0)
.put("n4", 1001.0)
.put("n5", 5003.0)
.put("n6", 2001.0)
.put("n7", 5000.0)
.put("n8", 101002.0)
.put("n9", 5001.0)
.put("n10", 5002.0)
.put("n11", 5105.0)
.put("n12", 5006.0).build(),
ImmutableSet.of("n8"))
.build();
private SlowNodeDetector slowNodeDetector;
@Before
public void setup() {
slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD);
SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS);
GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
}
@Test
public void testOutliersFromTestMatrix() {
for (Map.Entry<Map<String, Double>, Set<String>> entry :
outlierTestMatrix.entrySet()) {
LOG.info("Verifying set {}", entry.getKey());
final Set<String> outliers =
slowNodeDetector.getOutliers(entry.getKey()).keySet();
assertTrue(
"Running outlier detection on " + entry.getKey() +
" was expected to yield set " + entry.getValue() + ", but " +
" we got set " + outliers,
outliers.equals(entry.getValue()));
}
}
/**
* Unit test for {@link SlowNodeDetector#computeMedian(List)}.
*/
@Test
public void testMediansFromTestMatrix() {
for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
medianTestMatrix.entrySet()) {
final List<Double> inputList = new ArrayList<>(entry.getKey());
Collections.sort(inputList);
final Double median = SlowNodeDetector.computeMedian(inputList);
final Double expectedMedian = entry.getValue().getLeft();
// Ensure that the median is within 0.001% of expected.
// We need some fudge factor for floating point comparison.
final Double errorPercent =
Math.abs(median - expectedMedian) * 100.0 / expectedMedian;
assertTrue(
"Set " + inputList + "; Expected median: " +
expectedMedian + ", got: " + median,
errorPercent < 0.001);
}
}
/**
* Unit test for {@link SlowNodeDetector#computeMad(List)}.
*/
@Test
public void testMadsFromTestMatrix() {
for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
medianTestMatrix.entrySet()) {
final List<Double> inputList = new ArrayList<>(entry.getKey());
Collections.sort(inputList);
final Double mad = SlowNodeDetector.computeMad(inputList);
final Double expectedMad = entry.getValue().getRight();
// Ensure that the MAD is within 0.001% of expected.
// We need some fudge factor for floating point comparison.
if (entry.getKey().size() > 1) {
final Double errorPercent =
Math.abs(mad - expectedMad) * 100.0 / expectedMad;
assertTrue(
"Set " + entry.getKey() + "; Expected M.A.D.: " +
expectedMad + ", got: " + mad,
errorPercent < 0.001);
} else {
// For an input list of size 1, the MAD should be 0.0.
final Double epsilon = 0.000001; // Allow for some FP math error.
assertTrue(
"Set " + entry.getKey() + "; Expected M.A.D.: " +
expectedMad + ", got: " + mad,
mad < epsilon);
}
}
}
/**
* Verify that {@link SlowNodeDetector#computeMedian(List)} throws when
* passed an empty list.
*/
@Test(expected=IllegalArgumentException.class)
public void testMedianOfEmptyList() {
SlowNodeDetector.computeMedian(Collections.<Double>emptyList());
}
/**
* Verify that {@link SlowNodeDetector#computeMad(List)} throws when
* passed an empty list.
*/
@Test(expected=IllegalArgumentException.class)
public void testMadOfEmptyList() {
SlowNodeDetector.computeMad(Collections.<Double>emptyList());
}
private static class Pair<L, R> {
private final L l;
private final R r;
Pair(L l, R r) {
this.l = l;
this.r = r;
}
L getLeft() {
return l;
}
R getRight() {
return r;
}
static <L, R> Pair<L, R> of(L l, R r) {
return new Pair<>(l, r);
}
}
}
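To connect the matrix entries above to the arithmetic, here is a plausible sketch of the rule they exercise. This is an assumption about SlowNodeDetector's internals (in particular the MAD multiplier of 3), not a verbatim copy: a latency is flagged only if it clears the low threshold, a MEDIAN_MULTIPLIER multiple of the median, and a MAD-based cutoff.

  import java.util.List;

  public class OutlierRuleSketch {
    // Assumed deviation multiplier; MEDIAN_MULTIPLIER itself is a real
    // SlowNodeDetector constant referenced by the tests above.
    private static final double MAD_MULTIPLIER = 3.0;

    static boolean isOutlier(List<Double> sortedLatencies, double latency,
                             double lowThreshold) {
      double median = SlowNodeDetector.computeMedian(sortedLatencies);
      double mad = SlowNodeDetector.computeMad(sortedLatencies);
      double cutoff = Math.max(median * SlowNodeDetector.MEDIAN_MULTIPLIER,
                               median + MAD_MULTIPLIER * mad);
      return latency > Math.max(lowThreshold, cutoff);
    }
  }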

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -951,7 +952,8 @@ public class NNThroughputBenchmark implements Tool {
StorageReport[] rep = { new StorageReport(storage, false,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true).getCommands();
0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -1000,7 +1002,8 @@ public class NNThroughputBenchmark implements Tool {
StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
@ -117,7 +118,8 @@ public class NameNodeAdapter {
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT);
}
public static boolean setReplication(final FSNamesystem ns,

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -131,7 +132,8 @@ public class TestDeadDatanode {
new DatanodeStorage(reg.getDatanodeUuid()),
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT).getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());

View File

@ -68,6 +68,10 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
// Purposely hidden, based on comments in DFSConfigKeys
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
// Fully deprecated properties?
configurationPropsToSkipCompare