HDFS-16521. DFS API to retrieve slow datanodes (#4107)

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Viraj Jasani 2022-05-02 14:05:40 -07:00 committed by GitHub
parent d4a91bd0c0
commit 2dfa928a20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 361 additions and 45 deletions

View File

@ -3491,4 +3491,12 @@ public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
private boolean isLocatedBlocksRefresherEnabled() {
return clientContext.isLocatedBlocksRefresherEnabled();
}
public DatanodeInfo[] slowDatanodeReport() throws IOException {
checkOpen();
try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
return namenode.getSlowDatanodeReport();
}
}
}

View File

@ -3887,4 +3887,15 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
throws IOException {
return new FileSystemMultipartUploaderBuilder(this, basePath);
}
/**
* Retrieve stats for slow running datanodes.
*
* @return An array of slow datanode info.
* @throws IOException If an I/O error occurs.
*/
public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
return dfs.slowDatanodeReport();
}
}

View File

@ -2318,4 +2318,14 @@ public long getUsed() throws IOException {
}
return this.vfs.getUsed();
}
@Override
public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
if (this.vfs == null) {
return super.getSlowDatanodeStats();
}
checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
return defaultDFS.getSlowDatanodeStats();
}
}

View File

@ -1868,4 +1868,16 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
*/
@AtMostOnce
void satisfyStoragePolicy(String path) throws IOException;
/**
* Get report on all of the slow Datanodes. Slow running datanodes are identified based on
* the Outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster.
*
* @return Datanode report for slow running datanodes.
* @throws IOException If an I/O error occurs.
*/
@Idempotent
@ReadOnly
DatanodeInfo[] getSlowDatanodeReport() throws IOException;
}

View File

@ -143,6 +143,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@ -2065,6 +2066,18 @@ public void satisfyStoragePolicy(String src) throws IOException {
}
}
@Override
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
GetSlowDatanodeReportRequestProto req =
GetSlowDatanodeReportRequestProto.newBuilder().build();
try {
return PBHelperClient.convert(
rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException {

View File

@ -424,6 +424,13 @@ message GetPreferredBlockSizeResponseProto {
required uint64 bsize = 1;
}
message GetSlowDatanodeReportRequestProto {
}
message GetSlowDatanodeReportResponseProto {
repeated DatanodeInfoProto datanodeInfoProto = 1;
}
enum SafeModeActionProto {
SAFEMODE_LEAVE = 1;
SAFEMODE_ENTER = 2;
@ -1070,4 +1077,6 @@ service ClientNamenodeProtocol {
returns(SatisfyStoragePolicyResponseProto);
rpc getHAServiceState(HAServiceStateRequestProto)
returns(HAServiceStateResponseProto);
rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
returns(GetSlowDatanodeReportResponseProto);
}

View File

@ -76,7 +76,8 @@ public class TestReadOnly {
"getQuotaUsage",
"msync",
"getHAServiceState",
"getECTopologyResultForPolicies"
"getECTopologyResultForPolicies",
"getSlowDatanodeReport"
)
);

View File

@ -1815,6 +1815,12 @@ public void satisfyStoragePolicy(String path) throws IOException {
storagePolicy.satisfyStoragePolicy(path);
}
@Override
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
return rpcServer.getSlowDatanodeReport(true, 0);
}
@Override
public HAServiceProtocol.HAServiceState getHAServiceState() {
if (rpcServer.isSafeMode()) {

View File

@ -1095,24 +1095,7 @@ public DatanodeInfo[] getDatanodeReport(
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
DatanodeInfo[] result = entry.getValue();
for (DatanodeInfo node : result) {
String nodeId = node.getXferAddr();
DatanodeInfo dn = datanodesMap.get(nodeId);
if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
// Add the subcluster as a suffix to the network location
node.setNetworkLocation(
NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
node.getNetworkLocation());
datanodesMap.put(nodeId, node);
} else {
LOG.debug("{} is in multiple subclusters", nodeId);
}
}
}
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
@ -1578,6 +1561,11 @@ public void satisfyStoragePolicy(String path) throws IOException {
clientProto.satisfyStoragePolicy(path);
}
@Override // ClientProtocol
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
return clientProto.getSlowDatanodeReport();
}
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
long minBlockSize, long hotBlockTimeInterval) throws IOException {
@ -1994,6 +1982,53 @@ public String refreshFairnessPolicyController() {
return rpcClient.refreshFairnessPolicyController(new Configuration());
}
/**
* Get the slow running datanodes report with a timeout.
*
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);
Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
}
private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
Map<String, DatanodeInfo> datanodesMap) {
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
DatanodeInfo[] result = entry.getValue();
for (DatanodeInfo node : result) {
String nodeId = node.getXferAddr();
DatanodeInfo dn = datanodesMap.get(nodeId);
if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
// Add the subcluster as a suffix to the network location
node.setNetworkLocation(
NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
node.getNetworkLocation());
datanodesMap.put(nodeId, node);
} else {
LOG.debug("{} is in multiple subclusters", nodeId);
}
}
}
}
/**
* Deals with loading datanode report into the cache and refresh.
*/

View File

@ -704,6 +704,7 @@ public void testProxyGetDatanodeReport() throws Exception {
DatanodeInfo[] combinedData =
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
final Map<Integer, String> routerDNMap = new TreeMap<>();
for (DatanodeInfo dn : combinedData) {
String subcluster = dn.getNetworkLocation().split("/")[1];

View File

@ -156,6 +156,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@ -2058,4 +2060,18 @@ public HAServiceStateResponseProto getHAServiceState(
throw new ServiceException(e);
}
}
@Override
public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(RpcController controller,
GetSlowDatanodeReportRequestProto request) throws ServiceException {
try {
List<? extends DatanodeInfoProto> result =
PBHelperClient.convert(server.getSlowDatanodeReport());
return GetSlowDatanodeReportResponseProto.newBuilder()
.addAllDatanodeInfoProto(result)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -23,6 +23,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hadoop.fs.StorageType;
@ -1665,7 +1667,17 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
}
return nodes;
}
public List<DatanodeDescriptor> getAllSlowDataNodes() {
if (slowPeerTracker == null) {
LOG.debug("{} is disabled. Try enabling it first to capture slow peer outliers.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return ImmutableList.of();
}
List<String> slowNodes = slowPeerTracker.getSlowNodes(getNumOfDataNodes());
return getDnDescriptorsFromIpAddr(slowNodes);
}
/**
* Checks if name resolution was successful for the given address. If IP
* address and host name are the same, then it means name resolution has
@ -2148,19 +2160,26 @@ public Set<String> getSlowPeersUuidSet() {
List<String> slowNodes;
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
for (String slowNode : slowNodes) {
if (StringUtils.isBlank(slowNode)
|| !slowNode.contains(IP_PORT_SEPARATOR)) {
List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
datanodeDescriptors.forEach(
datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
return slowPeersUuidSet;
}
private List<DatanodeDescriptor> getDnDescriptorsFromIpAddr(List<String> nodes) {
List<DatanodeDescriptor> datanodeDescriptors = new ArrayList<>();
for (String node : nodes) {
if (StringUtils.isBlank(node) || !node.contains(IP_PORT_SEPARATOR)) {
continue;
}
String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
String ipAddr = node.split(IP_PORT_SEPARATOR)[0];
DatanodeDescriptor datanodeByHost =
host2DatanodeMap.getDatanodeByHost(ipAddr);
if (datanodeByHost != null) {
slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
datanodeDescriptors.add(datanodeByHost);
}
}
return slowPeersUuidSet;
return datanodeDescriptors;
}
/**

View File

@ -54,6 +54,8 @@ public class DataNodePeerMetrics {
private final String name;
// Strictly to be used by test code only. Source code is not supposed to use this.
private Map<String, Double> testOutlier = null;
private final OutlierDetector slowNodeDetector;
@ -142,14 +144,28 @@ public void collectThreadLocalStates() {
* than their peers.
*/
public Map<String, Double> getOutliers() {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAverages.getStats(
minOutlierDetectionSamples);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
// outlier must be null for source code.
if (testOutlier == null) {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
return slowNodeDetector.getOutliers(stats);
} else {
// this happens only for test code.
return testOutlier;
}
}
return slowNodeDetector.getOutliers(stats);
/**
* Strictly to be used by test code only. Source code is not supposed to use this. This method
* directly sets outlier mapping so that aggregate latency metrics are not calculated for tests.
*
* @param outlier outlier directly set by tests.
*/
public void setTestOutliers(Map<String, Double> outlier) {
this.testOutlier = outlier;
}
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {

View File

@ -4913,6 +4913,32 @@ int getNumberOfDatanodes(DatanodeReportType type) {
}
}
DatanodeInfo[] slowDataNodesReport() throws IOException {
String operationName = "slowDataNodesReport";
DatanodeInfo[] datanodeInfos;
checkOperation(OperationCategory.UNCHECKED);
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
final DatanodeManager dm = getBlockManager().getDatanodeManager();
final List<DatanodeDescriptor> results = dm.getAllSlowDataNodes();
datanodeInfos = getDatanodeInfoFromDescriptors(results);
} finally {
readUnlock(operationName, getLockReportInfoSupplier(null));
}
logAuditEvent(true, operationName, null);
return datanodeInfos;
}
private DatanodeInfo[] getDatanodeInfoFromDescriptors(List<DatanodeDescriptor> results) {
DatanodeInfo[] datanodeInfos = new DatanodeInfo[results.size()];
for (int i = 0; i < datanodeInfos.length; i++) {
datanodeInfos[i] = new DatanodeInfoBuilder().setFrom(results.get(i)).build();
datanodeInfos[i].setNumBlocks(results.get(i).numBlocks());
}
return datanodeInfos;
}
DatanodeInfo[] datanodeReport(final DatanodeReportType type)
throws IOException {
String operationName = "datanodeReport";
@ -4924,12 +4950,7 @@ DatanodeInfo[] datanodeReport(final DatanodeReportType type)
checkOperation(OperationCategory.UNCHECKED);
final DatanodeManager dm = getBlockManager().getDatanodeManager();
final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
arr = new DatanodeInfo[results.size()];
for (int i=0; i<arr.length; i++) {
arr[i] = new DatanodeInfoBuilder().setFrom(results.get(i))
.build();
arr[i].setNumBlocks(results.get(i).numBlocks());
}
arr = getDatanodeInfoFromDescriptors(results);
} finally {
readUnlock(operationName, getLockReportInfoSupplier(null));
}

View File

@ -1509,6 +1509,12 @@ public void satisfyStoragePolicy(String src) throws IOException {
}
}
@Override
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
checkNNStartup();
return namesystem.slowDataNodesReport();
}
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
StorageType type)

View File

@ -433,7 +433,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep
*/
private static final String commonUsageSummary =
"\t[-report [-live] [-dead] [-decommissioning] " +
"[-enteringmaintenance] [-inmaintenance]]\n" +
"[-enteringmaintenance] [-inmaintenance] [-slownodes]]\n" +
"\t[-safemode <enter | leave | get | wait | forceExit>]\n" +
"\t[-saveNamespace [-beforeShutdown]]\n" +
"\t[-rollEdits]\n" +
@ -587,11 +587,13 @@ public void report(String[] argv, int i) throws IOException {
StringUtils.popOption("-enteringmaintenance", args);
final boolean listInMaintenance =
StringUtils.popOption("-inmaintenance", args);
final boolean listSlowNodes =
StringUtils.popOption("-slownodes", args);
// If no filter flags are found, then list all DN types
boolean listAll = (!listLive && !listDead && !listDecommissioning
&& !listEnteringMaintenance && !listInMaintenance);
&& !listEnteringMaintenance && !listInMaintenance && !listSlowNodes);
if (listAll || listLive) {
printDataNodeReports(dfs, DatanodeReportType.LIVE, listLive, "Live");
@ -615,6 +617,10 @@ public void report(String[] argv, int i) throws IOException {
printDataNodeReports(dfs, DatanodeReportType.IN_MAINTENANCE,
listInMaintenance, "In maintenance");
}
if (listAll || listSlowNodes) {
printSlowDataNodeReports(dfs, listSlowNodes, "Slow");
}
}
private static void printDataNodeReports(DistributedFileSystem dfs,
@ -632,6 +638,20 @@ private static void printDataNodeReports(DistributedFileSystem dfs,
}
}
private static void printSlowDataNodeReports(DistributedFileSystem dfs, boolean listNodes,
String nodeState) throws IOException {
DatanodeInfo[] nodes = dfs.getSlowDatanodeStats();
if (nodes.length > 0 || listNodes) {
System.out.println(nodeState + " datanodes (" + nodes.length + "):\n");
}
if (nodes.length > 0) {
for (DatanodeInfo dn : nodes) {
System.out.println(dn.getDatanodeReport());
System.out.println();
}
}
}
/**
* Safe mode maintenance command.
* Usage: hdfs dfsadmin -safemode [enter | leave | get | wait | forceExit]
@ -1148,7 +1168,7 @@ private void printHelp(String cmd) {
commonUsageSummary;
String report ="-report [-live] [-dead] [-decommissioning] "
+ "[-enteringmaintenance] [-inmaintenance]:\n" +
+ "[-enteringmaintenance] [-inmaintenance] [-slownodes]:\n" +
"\tReports basic filesystem information and statistics. \n" +
"\tThe dfs usage can be different from \"du\" usage, because it\n" +
"\tmeasures raw space used by replication, checksums, snapshots\n" +
@ -2126,7 +2146,7 @@ private static void printUsage(String cmd) {
if ("-report".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-report] [-live] [-dead] [-decommissioning]"
+ " [-enteringmaintenance] [-inmaintenance]");
+ " [-enteringmaintenance] [-inmaintenance] [-slownodes]");
} else if ("-safemode".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-safemode enter | leave | get | wait | forceExit]");

View File

@ -356,7 +356,7 @@ Runs a HDFS datanode.
Usage:
hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance]]
hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance] [-slownodes]]
hdfs dfsadmin [-safemode enter | leave | get | wait | forceExit]
hdfs dfsadmin [-saveNamespace [-beforeShutdown]]
hdfs dfsadmin [-rollEdits]
@ -394,7 +394,7 @@ Usage:
| COMMAND\_OPTION | Description |
|:---- |:---- |
| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. |
| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` `[-slownodes]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. Filters are either based on the DN state (e.g. live, dead, decommissioning) or the nature of the DN (e.g. slow nodes - nodes with higher latency than their peers). |
| `-safemode` enter\|leave\|get\|wait\|forceExit | Safe mode maintenance command. Safe mode is a Namenode state in which it <br/>1. does not accept changes to the name space (read-only) <br/>2. does not replicate or delete blocks. <br/>Safe mode is entered automatically at Namenode startup, and leaves safe mode automatically when the configured minimum percentage of blocks satisfies the minimum replication condition. If Namenode detects any anomaly then it will linger in safe mode till that issue is resolved. If that anomaly is the consequence of a deliberate action, then administrator can use -safemode forceExit to exit safe mode. The cases where forceExit may be required are<br/> 1. Namenode metadata is not consistent. If Namenode detects that metadata has been modified out of band and can cause data loss, then Namenode will enter forceExit state. At that point user can either restart Namenode with correct metadata files or forceExit (if data loss is acceptable).<br/>2. Rollback causes metadata to be replaced and rarely it can trigger safe mode forceExit state in Namenode. In that case you may proceed by issuing -safemode forceExit.<br/> Safe mode can also be entered manually, but then it can only be turned off manually as well. |
| `-saveNamespace` `[-beforeShutdown]` | Save current namespace into storage directories and reset edits log. Requires safe mode. If the "beforeShutdown" option is given, the NameNode does a checkpoint if and only if no checkpoint has been done during a time window (a configurable number of checkpoint periods). This is usually used before shutting down the NameNode to prevent potential fsimage/editlog corruption. |
| `-rollEdits` | Rolls the edit log on the active NameNode. |

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys
.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
/**
* Tests to report slow running datanodes.
*/
public class TestSlowDatanodeReport {
private static final Logger LOG = LoggerFactory.getLogger(TestSlowDatanodeReport.class);
private MiniDFSCluster cluster;
@Before
public void testSetup() throws Exception {
Configuration conf = new Configuration();
conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000");
conf.set(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
conf.set(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY, "1");
conf.set(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, "1");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
}
@After
public void tearDown() throws Exception {
cluster.shutdown();
}
@Test
public void testSingleNodeReport() throws Exception {
List<DataNode> dataNodes = cluster.getDataNodes();
DataNode slowNode = dataNodes.get(1);
dataNodes.get(0).getPeerMetrics().setTestOutliers(
ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(), 15.5));
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
GenericTestUtils.waitFor(() -> {
try {
DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats();
LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo));
return slowNodeInfo.length == 1;
} catch (IOException e) {
LOG.error("Failed to retrieve slownode report", e);
return false;
}
}, 2000, 180000, "Slow nodes could not be detected");
}
@Test
public void testMultiNodesReport() throws Exception {
List<DataNode> dataNodes = cluster.getDataNodes();
dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5));
dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
GenericTestUtils.waitFor(() -> {
try {
DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats();
LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo));
return slowNodeInfo.length == 2;
} catch (IOException e) {
LOG.error("Failed to retrieve slownode report", e);
return false;
}
}, 2000, 200000, "Slow nodes could not be detected");
}
}