From 2dfa928a201560bc739ff5e4fe29f8bb19188927 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Mon, 2 May 2022 14:05:40 -0700 Subject: [PATCH] HDFS-16521. DFS API to retrieve slow datanodes (#4107) Signed-off-by: stack Signed-off-by: Wei-Chiu Chuang --- .../org/apache/hadoop/hdfs/DFSClient.java | 8 ++ .../hadoop/hdfs/DistributedFileSystem.java | 11 ++ .../hdfs/ViewDistributedFileSystem.java | 10 ++ .../hadoop/hdfs/protocol/ClientProtocol.java | 12 ++ .../ClientNamenodeProtocolTranslatorPB.java | 13 ++ .../main/proto/ClientNamenodeProtocol.proto | 9 ++ .../hadoop/hdfs/protocol/TestReadOnly.java | 3 +- .../router/RouterClientProtocol.java | 6 + .../federation/router/RouterRpcServer.java | 71 ++++++++--- .../federation/router/TestRouterRpc.java | 1 + ...amenodeProtocolServerSideTranslatorPB.java | 16 +++ .../blockmanagement/DatanodeManager.java | 33 ++++-- .../datanode/metrics/DataNodePeerMetrics.java | 30 +++-- .../hdfs/server/namenode/FSNamesystem.java | 33 +++++- .../server/namenode/NameNodeRpcServer.java | 6 + .../apache/hadoop/hdfs/tools/DFSAdmin.java | 28 ++++- .../src/site/markdown/HDFSCommands.md | 4 +- .../hadoop/hdfs/TestSlowDatanodeReport.java | 112 ++++++++++++++++++ 18 files changed, 361 insertions(+), 45 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 97045aff909..2682549cba1 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -3491,4 +3491,12 @@ public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) { private boolean isLocatedBlocksRefresherEnabled() { return clientContext.isLocatedBlocksRefresherEnabled(); } + + public DatanodeInfo[] slowDatanodeReport() throws IOException { + checkOpen(); + try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) { + return namenode.getSlowDatanodeReport(); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 350abab352b..93db332d738 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -3887,4 +3887,15 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath) throws IOException { return new FileSystemMultipartUploaderBuilder(this, basePath); } + + /** + * Retrieve stats for slow running datanodes. + * + * @return An array of slow datanode info. + * @throws IOException If an I/O error occurs. 
+ */ + public DatanodeInfo[] getSlowDatanodeStats() throws IOException { + return dfs.slowDatanodeReport(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java index 2a4469a5ae5..d34df37ae61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java @@ -2318,4 +2318,14 @@ public long getUsed() throws IOException { } return this.vfs.getUsed(); } + + @Override + public DatanodeInfo[] getSlowDatanodeStats() throws IOException { + if (this.vfs == null) { + return super.getSlowDatanodeStats(); + } + checkDefaultDFS(defaultDFS, "getSlowDatanodeStats"); + return defaultDFS.getSlowDatanodeStats(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index ea90645ca08..e9ae803a541 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1868,4 +1868,16 @@ BatchedEntries listOpenFiles(long prevId, */ @AtMostOnce void satisfyStoragePolicy(String path) throws IOException; + + /** + * Get report on all of the slow Datanodes. Slow running datanodes are identified based on + * the Outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster. + * + * @return Datanode report for slow running datanodes. + * @throws IOException If an I/O error occurs. 
+ */ + @Idempotent + @ReadOnly + DatanodeInfo[] getSlowDatanodeReport() throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 2668ec1cefe..7b8ca42c50f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -143,6 +143,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto; @@ -2065,6 +2066,18 @@ public void satisfyStoragePolicy(String src) throws IOException { } } + @Override + public DatanodeInfo[] getSlowDatanodeReport() throws IOException { + GetSlowDatanodeReportRequestProto req = + GetSlowDatanodeReportRequestProto.newBuilder().build(); + try { + return PBHelperClient.convert( + rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public HAServiceProtocol.HAServiceState getHAServiceState() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 20967cc13ab..ebc56e9e675 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -424,6 +424,13 @@ message GetPreferredBlockSizeResponseProto { required uint64 bsize = 1; } +message GetSlowDatanodeReportRequestProto { +} + +message GetSlowDatanodeReportResponseProto { + repeated DatanodeInfoProto datanodeInfoProto = 1; +} + enum SafeModeActionProto { SAFEMODE_LEAVE = 1; SAFEMODE_ENTER = 2; @@ -1070,4 +1077,6 @@ service ClientNamenodeProtocol { returns(SatisfyStoragePolicyResponseProto); rpc getHAServiceState(HAServiceStateRequestProto) returns(HAServiceStateResponseProto); + rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto) + returns(GetSlowDatanodeReportResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java index 4e6f4e3f4ba..f7ea7bcd76d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java @@ -76,7 +76,8 @@ public class TestReadOnly { 
"getQuotaUsage", "msync", "getHAServiceState", - "getECTopologyResultForPolicies" + "getECTopologyResultForPolicies", + "getSlowDatanodeReport" ) ); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index d5bbdf046c2..1bd7d65836d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -1815,6 +1815,12 @@ public void satisfyStoragePolicy(String path) throws IOException { storagePolicy.satisfyStoragePolicy(path); } + @Override + public DatanodeInfo[] getSlowDatanodeReport() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + return rpcServer.getSlowDatanodeReport(true, 0); + } + @Override public HAServiceProtocol.HAServiceState getHAServiceState() { if (rpcServer.isSafeMode()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 69f300bfb7d..2bec2c726a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1095,24 +1095,7 @@ public DatanodeInfo[] getDatanodeReport( Map results = rpcClient.invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); - for (Entry entry : - results.entrySet()) { - FederationNamespaceInfo ns = entry.getKey(); - DatanodeInfo[] result = entry.getValue(); - for (DatanodeInfo node : result) { - String nodeId = node.getXferAddr(); - DatanodeInfo dn = datanodesMap.get(nodeId); - if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) { - // Add the subcluster as a suffix to the network location - node.setNetworkLocation( - NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() + - node.getNetworkLocation()); - datanodesMap.put(nodeId, node); - } else { - LOG.debug("{} is in multiple subclusters", nodeId); - } - } - } + updateDnMap(results, datanodesMap); // Map -> Array Collection datanodes = datanodesMap.values(); return toArray(datanodes, DatanodeInfo.class); @@ -1578,6 +1561,11 @@ public void satisfyStoragePolicy(String path) throws IOException { clientProto.satisfyStoragePolicy(path); } + @Override // ClientProtocol + public DatanodeInfo[] getSlowDatanodeReport() throws IOException { + return clientProto.getSlowDatanodeReport(); + } + @Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long minBlockSize, long hotBlockTimeInterval) throws IOException { @@ -1994,6 +1982,53 @@ public String refreshFairnessPolicyController() { return rpcClient.refreshFairnessPolicyController(new Configuration()); } + /** + * Get the slow running datanodes report with a timeout. + * + * @param requireResponse If we require all the namespaces to report. + * @param timeOutMs Time out for the reply in milliseconds. + * @return List of datanodes. + * @throws IOException If it cannot get the report. 
+ */ + public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs) + throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + Map datanodesMap = new LinkedHashMap<>(); + RemoteMethod method = new RemoteMethod("getSlowDatanodeReport"); + + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent(nss, method, requireResponse, false, + timeOutMs, DatanodeInfo[].class); + updateDnMap(results, datanodesMap); + // Map -> Array + Collection datanodes = datanodesMap.values(); + return toArray(datanodes, DatanodeInfo.class); + } + + private void updateDnMap(Map results, + Map datanodesMap) { + for (Entry entry : + results.entrySet()) { + FederationNamespaceInfo ns = entry.getKey(); + DatanodeInfo[] result = entry.getValue(); + for (DatanodeInfo node : result) { + String nodeId = node.getXferAddr(); + DatanodeInfo dn = datanodesMap.get(nodeId); + if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) { + // Add the subcluster as a suffix to the network location + node.setNetworkLocation( + NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() + + node.getNetworkLocation()); + datanodesMap.put(nodeId, node); + } else { + LOG.debug("{} is in multiple subclusters", nodeId); + } + } + } + } + /** * Deals with loading datanode report into the cache and refresh. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index ae58bf8b612..96e77d58007 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -704,6 +704,7 @@ public void testProxyGetDatanodeReport() throws Exception { DatanodeInfo[] combinedData = routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + assertEquals(0, routerProtocol.getSlowDatanodeReport().length); final Map routerDNMap = new TreeMap<>(); for (DatanodeInfo dn : combinedData) { String subcluster = dn.getNetworkLocation().split("/")[1]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 5132afaa4b1..0164f25460d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -156,6 +156,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto; import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto; @@ -2058,4 +2060,18 @@ public HAServiceStateResponseProto getHAServiceState( throw new ServiceException(e); } } + + @Override + public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(RpcController controller, + GetSlowDatanodeReportRequestProto request) throws ServiceException { + try { + List result = + PBHelperClient.convert(server.getSlowDatanodeReport()); + return GetSlowDatanodeReportResponseProto.newBuilder() + .addAllDatanodeInfoProto(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 0dc33c818c3..b1dbbdde3b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -23,6 +23,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.util.Preconditions; + +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses; import org.apache.hadoop.fs.StorageType; @@ -1665,7 +1667,17 @@ public List getDatanodeListForReport( } return nodes; } - + + public List getAllSlowDataNodes() { + if (slowPeerTracker == null) { + LOG.debug("{} is disabled. Try enabling it first to capture slow peer outliers.", + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY); + return ImmutableList.of(); + } + List slowNodes = slowPeerTracker.getSlowNodes(getNumOfDataNodes()); + return getDnDescriptorsFromIpAddr(slowNodes); + } + /** * Checks if name resolution was successful for the given address. 
If IP * address and host name are the same, then it means name resolution has @@ -2148,19 +2160,26 @@ public Set getSlowPeersUuidSet() { List slowNodes; Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned"); slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes); - for (String slowNode : slowNodes) { - if (StringUtils.isBlank(slowNode) - || !slowNode.contains(IP_PORT_SEPARATOR)) { + List datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes); + datanodeDescriptors.forEach( + datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid())); + return slowPeersUuidSet; + } + + private List getDnDescriptorsFromIpAddr(List nodes) { + List datanodeDescriptors = new ArrayList<>(); + for (String node : nodes) { + if (StringUtils.isBlank(node) || !node.contains(IP_PORT_SEPARATOR)) { continue; } - String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0]; + String ipAddr = node.split(IP_PORT_SEPARATOR)[0]; DatanodeDescriptor datanodeByHost = host2DatanodeMap.getDatanodeByHost(ipAddr); if (datanodeByHost != null) { - slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid()); + datanodeDescriptors.add(datanodeByHost); } } - return slowPeersUuidSet; + return datanodeDescriptors; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java index f62a7b504a1..2e456b67ca1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java @@ -54,6 +54,8 @@ public class DataNodePeerMetrics { private final String name; + // Strictly to be used by test code only. Source code is not supposed to use this. + private Map testOutlier = null; private final OutlierDetector slowNodeDetector; @@ -142,14 +144,28 @@ public void collectThreadLocalStates() { * than their peers. */ public Map getOutliers() { - // This maps the metric name to the aggregate latency. - // The metric name is the datanode ID. - final Map stats = - sendPacketDownstreamRollingAverages.getStats( - minOutlierDetectionSamples); - LOG.trace("DataNodePeerMetrics: Got stats: {}", stats); + // outlier must be null for source code. + if (testOutlier == null) { + // This maps the metric name to the aggregate latency. + // The metric name is the datanode ID. + final Map stats = + sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples); + LOG.trace("DataNodePeerMetrics: Got stats: {}", stats); + return slowNodeDetector.getOutliers(stats); + } else { + // this happens only for test code. + return testOutlier; + } + } - return slowNodeDetector.getOutliers(stats); + /** + * Strictly to be used by test code only. Source code is not supposed to use this. This method + * directly sets outlier mapping so that aggregate latency metrics are not calculated for tests. + * + * @param outlier outlier directly set by tests. 
+ */ + public void setTestOutliers(Map outlier) { + this.testOutlier = outlier; } public MutableRollingAverages getSendPacketDownstreamRollingAverages() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 259ca9d5b8f..ab3c49fc264 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4913,6 +4913,32 @@ int getNumberOfDatanodes(DatanodeReportType type) { } } + DatanodeInfo[] slowDataNodesReport() throws IOException { + String operationName = "slowDataNodesReport"; + DatanodeInfo[] datanodeInfos; + checkOperation(OperationCategory.UNCHECKED); + readLock(); + try { + checkOperation(OperationCategory.UNCHECKED); + final DatanodeManager dm = getBlockManager().getDatanodeManager(); + final List results = dm.getAllSlowDataNodes(); + datanodeInfos = getDatanodeInfoFromDescriptors(results); + } finally { + readUnlock(operationName, getLockReportInfoSupplier(null)); + } + logAuditEvent(true, operationName, null); + return datanodeInfos; + } + + private DatanodeInfo[] getDatanodeInfoFromDescriptors(List results) { + DatanodeInfo[] datanodeInfos = new DatanodeInfo[results.size()]; + for (int i = 0; i < datanodeInfos.length; i++) { + datanodeInfos[i] = new DatanodeInfoBuilder().setFrom(results.get(i)).build(); + datanodeInfos[i].setNumBlocks(results.get(i).numBlocks()); + } + return datanodeInfos; + } + DatanodeInfo[] datanodeReport(final DatanodeReportType type) throws IOException { String operationName = "datanodeReport"; @@ -4924,12 +4950,7 @@ DatanodeInfo[] datanodeReport(final DatanodeReportType type) checkOperation(OperationCategory.UNCHECKED); final DatanodeManager dm = getBlockManager().getDatanodeManager(); final List results = dm.getDatanodeListForReport(type); - arr = new DatanodeInfo[results.size()]; - for (int i=0; i]\n" + "\t[-saveNamespace [-beforeShutdown]]\n" + "\t[-rollEdits]\n" + @@ -587,11 +587,13 @@ public void report(String[] argv, int i) throws IOException { StringUtils.popOption("-enteringmaintenance", args); final boolean listInMaintenance = StringUtils.popOption("-inmaintenance", args); + final boolean listSlowNodes = + StringUtils.popOption("-slownodes", args); // If no filter flags are found, then list all DN types boolean listAll = (!listLive && !listDead && !listDecommissioning - && !listEnteringMaintenance && !listInMaintenance); + && !listEnteringMaintenance && !listInMaintenance && !listSlowNodes); if (listAll || listLive) { printDataNodeReports(dfs, DatanodeReportType.LIVE, listLive, "Live"); @@ -615,6 +617,10 @@ public void report(String[] argv, int i) throws IOException { printDataNodeReports(dfs, DatanodeReportType.IN_MAINTENANCE, listInMaintenance, "In maintenance"); } + + if (listAll || listSlowNodes) { + printSlowDataNodeReports(dfs, listSlowNodes, "Slow"); + } } private static void printDataNodeReports(DistributedFileSystem dfs, @@ -632,6 +638,20 @@ private static void printDataNodeReports(DistributedFileSystem dfs, } } + private static void printSlowDataNodeReports(DistributedFileSystem dfs, boolean listNodes, + String nodeState) throws IOException { + DatanodeInfo[] nodes = dfs.getSlowDatanodeStats(); + if (nodes.length > 0 || listNodes) { + System.out.println(nodeState + " datanodes (" + nodes.length + "):\n"); + } 
+ if (nodes.length > 0) { + for (DatanodeInfo dn : nodes) { + System.out.println(dn.getDatanodeReport()); + System.out.println(); + } + } + } + /** * Safe mode maintenance command. * Usage: hdfs dfsadmin -safemode [enter | leave | get | wait | forceExit] @@ -1148,7 +1168,7 @@ private void printHelp(String cmd) { commonUsageSummary; String report ="-report [-live] [-dead] [-decommissioning] " - + "[-enteringmaintenance] [-inmaintenance]:\n" + + + "[-enteringmaintenance] [-inmaintenance] [-slownodes]:\n" + "\tReports basic filesystem information and statistics. \n" + "\tThe dfs usage can be different from \"du\" usage, because it\n" + "\tmeasures raw space used by replication, checksums, snapshots\n" + @@ -2126,7 +2146,7 @@ private static void printUsage(String cmd) { if ("-report".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-report] [-live] [-dead] [-decommissioning]" - + " [-enteringmaintenance] [-inmaintenance]"); + + " [-enteringmaintenance] [-inmaintenance] [-slownodes]"); } else if ("-safemode".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-safemode enter | leave | get | wait | forceExit]"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index a11f2098705..4fe788dc62c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -356,7 +356,7 @@ Runs a HDFS datanode. Usage: - hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance]] + hdfs dfsadmin [-report [-live] [-dead] [-decommissioning] [-enteringmaintenance] [-inmaintenance] [-slownodes]] hdfs dfsadmin [-safemode enter | leave | get | wait | forceExit] hdfs dfsadmin [-saveNamespace [-beforeShutdown]] hdfs dfsadmin [-rollEdits] @@ -394,7 +394,7 @@ Usage: | COMMAND\_OPTION | Description | |:---- |:---- | -| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. | +| `-report` `[-live]` `[-dead]` `[-decommissioning]` `[-enteringmaintenance]` `[-inmaintenance]` `[-slownodes]` | Reports basic filesystem information and statistics, The dfs usage can be different from "du" usage, because it measures raw space used by replication, checksums, snapshots and etc. on all the DNs. Optional flags may be used to filter the list of displayed DataNodes. Filters are either based on the DN state (e.g. live, dead, decommissioning) or the nature of the DN (e.g. slow nodes - nodes with higher latency than their peers). | | `-safemode` enter\|leave\|get\|wait\|forceExit | Safe mode maintenance command. Safe mode is a Namenode state in which it
1. does not accept changes to the name space (read-only)
2. does not replicate or delete blocks.
Safe mode is entered automatically at Namenode startup, and leaves safe mode automatically when the configured minimum percentage of blocks satisfies the minimum replication condition. If Namenode detects any anomaly then it will linger in safe mode till that issue is resolved. If that anomaly is the consequence of a deliberate action, then administrator can use -safemode forceExit to exit safe mode. The cases where forceExit may be required are
1. Namenode metadata is not consistent. If Namenode detects that metadata has been modified out of band and can cause data loss, then Namenode will enter forceExit state. At that point user can either restart Namenode with correct metadata files or forceExit (if data loss is acceptable).
2. Rollback causes metadata to be replaced and rarely it can trigger safe mode forceExit state in Namenode. In that case you may proceed by issuing -safemode forceExit.
Safe mode can also be entered manually, but then it can only be turned off manually as well. | | `-saveNamespace` `[-beforeShutdown]` | Save current namespace into storage directories and reset edits log. Requires safe mode. If the "beforeShutdown" option is given, the NameNode does a checkpoint if and only if no checkpoint has been done during a time window (a configurable number of checkpoint periods). This is usually used before shutting down the NameNode to prevent potential fsimage/editlog corruption. | | `-rollEdits` | Rolls the edit log on the active NameNode. | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java new file mode 100644 index 00000000000..583c3159d5c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.test.GenericTestUtils; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys + .DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY; + +/** + * Tests to report slow running datanodes. 
+ */ +public class TestSlowDatanodeReport { + + private static final Logger LOG = LoggerFactory.getLogger(TestSlowDatanodeReport.class); + + private MiniDFSCluster cluster; + + @Before + public void testSetup() throws Exception { + Configuration conf = new Configuration(); + + conf.set(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1000"); + conf.set(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true"); + conf.set(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY, "1"); + conf.set(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, "1"); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + } + + @After + public void tearDown() throws Exception { + cluster.shutdown(); + } + + @Test + public void testSingleNodeReport() throws Exception { + List dataNodes = cluster.getDataNodes(); + DataNode slowNode = dataNodes.get(1); + dataNodes.get(0).getPeerMetrics().setTestOutliers( + ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(), 15.5)); + DistributedFileSystem distributedFileSystem = cluster.getFileSystem(); + Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length); + GenericTestUtils.waitFor(() -> { + try { + DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats(); + LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo)); + return slowNodeInfo.length == 1; + } catch (IOException e) { + LOG.error("Failed to retrieve slownode report", e); + return false; + } + }, 2000, 180000, "Slow nodes could not be detected"); + } + + @Test + public void testMultiNodesReport() throws Exception { + List dataNodes = cluster.getDataNodes(); + dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of( + dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 15.5)); + dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of( + dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7)); + DistributedFileSystem distributedFileSystem = cluster.getFileSystem(); + Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length); + GenericTestUtils.waitFor(() -> { + try { + DatanodeInfo[] slowNodeInfo = distributedFileSystem.getSlowDatanodeStats(); + LOG.info("Slow Datanode report: {}", Arrays.asList(slowNodeInfo)); + return slowNodeInfo.length == 2; + } catch (IOException e) { + LOG.error("Failed to retrieve slownode report", e); + return false; + } + }, 2000, 200000, "Slow nodes could not be detected"); + } + +}
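
For reviewers who want to exercise the new API end to end outside of MiniDFSCluster, here is a minimal client-side sketch. Only DistributedFileSystem#getSlowDatanodeStats() comes from this patch; the harness class name (SlowNodeLister) and the assumption that fs.defaultFS points at an HDFS cluster whose datanodes run with dfs.datanode.peer.stats.enabled=true are illustrative, not part of the change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

/**
 * Illustrative caller of the new slow-datanode report API. If slow peer
 * tracking is disabled on the cluster, DatanodeManager#getAllSlowDataNodes()
 * returns an empty list and this simply prints a count of zero.
 */
public class SlowNodeLister {
  public static void main(String[] args) throws Exception {
    // Picks up core-site.xml / hdfs-site.xml from the classpath.
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      // New API from this patch: the NameNode's view of slow running datanodes.
      DatanodeInfo[] slowNodes = dfs.getSlowDatanodeStats();
      System.out.println("Slow datanodes (" + slowNodes.length + "):");
      for (DatanodeInfo dn : slowNodes) {
        System.out.println(dn.getDatanodeReport());
      }
    }
  }
}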
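
The same report is reachable from the command line through the new -slownodes flag (hdfs dfsadmin -report -slownodes) wired up in DFSAdmin, and through an RBF router, where getSlowDatanodeReport() fans out to every nameservice and merges the per-subcluster results with the nameservice id prefixed to each node's network location (see updateDnMap in RouterRpcServer). In all cases a datanode only appears in the report when slow peer tracking (dfs.datanode.peer.stats.enabled) is enabled, as exercised by TestSlowDatanodeReport above.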