HDFS-16203. Discover datanodes with unbalanced block pool usage by the standard deviation (#3366)

Reviewed-by: Hui Fei <ferhui@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
litao 2021-09-16 09:00:02 +08:00 committed by GitHub
parent c54bf19978
commit 2d479309cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 260 additions and 17 deletions

View File

@ -28,6 +28,7 @@ public class StorageReport {
private final long nonDfsUsed;
private final long remaining;
private final long blockPoolUsed;
private final float blockPoolUsagePercent;
private final String mount;
public static final StorageReport[] EMPTY_ARRAY = {};
@ -48,6 +49,8 @@ public class StorageReport {
this.nonDfsUsed = nonDfsUsed;
this.remaining = remaining;
this.blockPoolUsed = bpUsed;
this.blockPoolUsagePercent = capacity <= 0 ? 0.0f :
(bpUsed * 100.0f) / capacity;
this.mount = mount;
}
@ -82,4 +85,8 @@ public class StorageReport {
public String getMount() {
return mount;
}
public float getBlockPoolUsagePercent() {
return blockPoolUsagePercent;
}
}

View File

@ -41,10 +41,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
import org.apache.hadoop.hdfs.server.federation.router.SubClusterTimeoutException;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@ -53,6 +54,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoR
import org.apache.hadoop.hdfs.server.namenode.NameNodeMXBean;
import org.apache.hadoop.hdfs.server.namenode.NameNodeStatusMXBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
@ -461,10 +464,13 @@ public class NamenodeBeanMetrics
private String getNodesImpl(final DatanodeReportType type) {
final Map<String, Map<String, Object>> info = new HashMap<>();
try {
RouterRpcServer rpcServer = this.router.getRpcServer();
DatanodeInfo[] datanodes =
rpcServer.getDatanodeReport(type, false, dnReportTimeOut);
for (DatanodeInfo node : datanodes) {
RouterClientProtocol clientProtocol =
this.router.getRpcServer().getClientProtocolModule();
DatanodeStorageReport[] datanodeStorageReports =
clientProtocol.getDatanodeStorageReport(type, false, dnReportTimeOut);
for (DatanodeStorageReport datanodeStorageReport : datanodeStorageReports) {
DatanodeInfo node = datanodeStorageReport.getDatanodeInfo();
StorageReport[] storageReports = datanodeStorageReport.getStorageReports();
Map<String, Object> innerinfo = new HashMap<>();
innerinfo.put("infoAddr", node.getInfoAddr());
innerinfo.put("infoSecureAddr", node.getInfoSecureAddr());
@ -484,6 +490,8 @@ public class NamenodeBeanMetrics
innerinfo.put("blockPoolUsed", node.getBlockPoolUsed());
innerinfo.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent());
innerinfo.put("volfails", -1); // node.getVolumeFailures()
innerinfo.put("blockPoolUsedPercentStdDev",
Util.getBlockPoolUsedPercentStdDev(storageReports));
info.put(node.getXferAddrWithHostname(),
Collections.unmodifiableMap(innerinfo));
}

View File

@ -999,7 +999,21 @@ public class RouterClientProtocol implements ClientProtocol {
Map<String, DatanodeStorageReport[]> dnSubcluster =
rpcServer.getDatanodeStorageReportMap(type);
return mergeDtanodeStorageReport(dnSubcluster);
}
public DatanodeStorageReport[] getDatanodeStorageReport(
HdfsConstants.DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
Map<String, DatanodeStorageReport[]> dnSubcluster =
rpcServer.getDatanodeStorageReportMap(type, requireResponse, timeOutMs);
return mergeDtanodeStorageReport(dnSubcluster);
}
private DatanodeStorageReport[] mergeDtanodeStorageReport(
Map<String, DatanodeStorageReport[]> dnSubcluster) {
// Avoid repeating machines in multiple subclusters
Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
@ -1818,7 +1832,7 @@ public class RouterClientProtocol implements ClientProtocol {
* path may be located. On return this list is trimmed to include
* only the paths that have corresponding destinations in the same
* namespace.
* @param dst The destination path
* @param dstLocations The destination path
* @return A map of all eligible source namespaces and their corresponding
* replacement value.
* @throws IOException If the dst paths could not be determined.

View File

@ -1135,6 +1135,23 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
*/
public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
DatanodeReportType type) throws IOException {
return getDatanodeStorageReportMap(type, true, -1);
}
/**
* Get the list of datanodes per subcluster.
*
* @param type Type of the datanodes to get.
* @param requireResponse If true an exception will be thrown if all calls do
* not complete. If false exceptions are ignored and all data results
* successfully received are returned.
* @param timeOutMs Time out for the reply in milliseconds.
* @return nsId to datanode list.
* @throws IOException If the method cannot be invoked remotely.
*/
public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {
Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
@ -1142,7 +1159,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
rpcClient.invokeConcurrent(
nss, method, true, false, DatanodeStorageReport[].class);
nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);
for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();

View File

@ -349,6 +349,7 @@
<th style="width:180px; text-align:center">Capacity</th>
<th>Blocks</th-->
<th>Block pool used</th>
<th>Block pool usage StdDev</th>
<th>Version</th>
</tr>
</thead>
@ -372,6 +373,7 @@
</td>
<td>{numBlocks}</td>
<td ng-value="{blockPoolUsedPercent}">{blockPoolUsed|fmt_bytes} ({blockPoolUsedPercent|fmt_percentage})</td>
<td ng-value="{blockPoolUsedPercentStdDev}">{blockPoolUsedPercentStdDev|fmt_percentage}</td>
<td>{version}</td>
</tr>
{/LiveNodes}

View File

@ -411,6 +411,7 @@
{ 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0},
{ 'type': 'num' , "defaultContent": 0},
{ 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0},
{ 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0},
{ 'type': 'string' , "defaultContent": ""}
],
initComplete: function () {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.DomainNameResolver;
@ -437,4 +438,30 @@ public final class Util {
return isEnabled;
}
/**
* Return the standard deviation of storage block pool usage.
*/
public static float getBlockPoolUsedPercentStdDev(StorageReport[] storageReports) {
ArrayList<Float> usagePercentList = new ArrayList<>();
float totalUsagePercent = 0.0f;
float dev = 0.0f;
if (storageReports.length == 0) {
return dev;
}
for (StorageReport s : storageReports) {
usagePercentList.add(s.getBlockPoolUsagePercent());
totalUsagePercent += s.getBlockPoolUsagePercent();
}
totalUsagePercent /= storageReports.length;
for (Float usagePercent : usagePercentList) {
dev += (usagePercent - totalUsagePercent)
* (usagePercent - totalUsagePercent);
}
dev = (float) Math.sqrt(dev / usagePercentList.size());
return dev;
}
}

View File

@ -5714,7 +5714,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
* Increments, logs and then returns the stamp
* Increments, logs and then returns the stamp.
*/
long nextGenerationStamp(boolean legacyBlock)
throws IOException {
@ -5846,7 +5846,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Get a new generation stamp together with an access token for
* a block under construction
* a block under construction.
*
* This method is called for recovering a failed write or setting up
* a block for appended.
@ -5884,7 +5884,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
* Update a pipeline for a block under construction
* Update a pipeline for a block under construction.
*
* @param clientName the name of the client
* @param oldBlock and old block
@ -6294,7 +6294,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
* Log the updateMasterKey operation to edit logs
* Log the updateMasterKey operation to edit logs.
*
* @param key new delegation key.
*/
@ -6311,7 +6311,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
* Log the cancellation of expired tokens to edit logs
* Log the cancellation of expired tokens to edit logs.
*
* @param id token identifier to cancel
*/
@ -6348,7 +6348,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
* Returns authentication method used to establish the connection
* Returns authentication method used to establish the connection.
* @return AuthenticationMethod used to establish connection
* @throws IOException
*/
@ -6401,7 +6401,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
* Class representing Namenode information for JMX interfaces
* Class representing Namenode information for JMX interfaces.
*/
@Override // NameNodeMXBean
public String getVersion() {
@ -6497,7 +6497,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Returned information is a JSON representation of map with host name as the
* key and value is a map of live node attribute keys to its values
* key and value is a map of live node attribute keys to its values.
*/
@Override // NameNodeMXBean
public String getLiveNodes() {
@ -6541,6 +6541,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (node.getUpgradeDomain() != null) {
innerinfo.put("upgradeDomain", node.getUpgradeDomain());
}
StorageReport[] storageReports = node.getStorageReports();
innerinfo.put("blockPoolUsedPercentStdDev",
Util.getBlockPoolUsedPercentStdDev(storageReports));
info.put(node.getXferAddrWithHostname(), innerinfo.build());
}
return JSON.toString(info);
@ -6548,7 +6551,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Returned information is a JSON representation of map with host name as the
* key and value is a map of dead node attribute keys to its values
* key and value is a map of dead node attribute keys to its values.
*/
@Override // NameNodeMXBean
public String getDeadNodes() {
@ -6572,7 +6575,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Returned information is a JSON representation of map with host name as the
* key and value is a map of decommissioning node attribute keys to its
* values
* values.
*/
@Override // NameNodeMXBean
public String getDecomNodes() {

View File

@ -320,6 +320,7 @@
<th style="width:180px; text-align:center">Capacity</th>
<th>Blocks</th>
<th>Block pool used</th>
<th>Block pool usage StdDev</th>
<th>Version</th>
</tr>
</thead>
@ -343,6 +344,7 @@
</td>
<td title="Blocks Scheduled : {blockScheduled}">{numBlocks}</td>
<td ng-value="{blockPoolUsedPercent}">{blockPoolUsed|fmt_bytes} ({blockPoolUsedPercent|fmt_percentage})</td>
<td ng-value="{blockPoolUsedPercentStdDev}">{blockPoolUsedPercentStdDev|fmt_percentage}</td>
<td>{version}</td>
</tr>
{/LiveNodes}

View File

@ -360,6 +360,7 @@
{ 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0},
{ 'type': 'num' , "defaultContent": 0},
{ 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0},
{ 'orderDataType': 'ng-value', 'type': 'num' , "defaultContent": 0},
{ 'type': 'string' , "defaultContent": ""}
],
initComplete: function () {

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.eclipse.jetty.util.ajax.JSON;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
public class TestStorageBlockPoolUsageStdDev {
private final static int NUM_DATANODES = 5;
private final static int STORAGES_PER_DATANODE = 3;
private final static int DEFAULT_BLOCK_SIZE = 102400;
private final static int BUFFER_LENGTH = 1024;
private static Configuration conf;
private MiniDFSCluster cluster;
private FileSystem fs;
@Before
public void setup() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
// Ensure that each volume capacity is larger than the DEFAULT_BLOCK_SIZE.
long capacity = 8 * DEFAULT_BLOCK_SIZE;
long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE];
String[] hostnames = new String[5];
for (int i = 0; i < NUM_DATANODES; i++) {
hostnames[i] = i + "." + i + "." + i + "." + i;
for(int j = 0; j < STORAGES_PER_DATANODE; j++){
capacities[i][j]=capacity;
}
}
cluster = new MiniDFSCluster.Builder(conf)
.hosts(hostnames)
.numDataNodes(NUM_DATANODES)
.storagesPerDatanode(STORAGES_PER_DATANODE)
.storageCapacities(capacities).build();
cluster.waitActive();
fs = cluster.getFileSystem();
}
/**
* Create files of different sizes for each datanode.
* Ensure that the file size is smaller than the blocksize
* and only one block is generated. In this way, data will
* be written to only one volume.
*
* Using favoredNodes, we can write files of a specified size
* to specified datanodes to create a batch of datanodes with
* different storageBlockPoolUsageStdDev.
*
* Then, we assert the order of storageBlockPoolUsageStdDev.
*/
@SuppressWarnings("unchecked")
@Test
public void testStorageBlockPoolUsageStdDev() throws IOException {
// Create file for each datanode.
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
DataNode dn0 = dataNodes.get(0);
DataNode dn1 = dataNodes.get(1);
DataNode dn2 = dataNodes.get(2);
DataNode dn3 = dataNodes.get(3);
DataNode dn4 = dataNodes.get(4);
DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000,
DEFAULT_BLOCK_SIZE, (short) 1, 0, false,
new InetSocketAddress[]{dn0.getXferAddress()});
DFSTestUtil.createFile(fs, new Path("/file1"), false, BUFFER_LENGTH, 2000,
DEFAULT_BLOCK_SIZE, (short) 1, 0, false,
new InetSocketAddress[]{dn1.getXferAddress()});
DFSTestUtil.createFile(fs, new Path("/file2"), false, BUFFER_LENGTH, 4000,
DEFAULT_BLOCK_SIZE, (short) 1, 0, false,
new InetSocketAddress[]{dn2.getXferAddress()});
DFSTestUtil.createFile(fs, new Path("/file3"), false, BUFFER_LENGTH, 8000,
DEFAULT_BLOCK_SIZE, (short) 1, 0, false,
new InetSocketAddress[]{dn3.getXferAddress()});
DFSTestUtil.createFile(fs, new Path("/file4"), false, BUFFER_LENGTH, 16000,
DEFAULT_BLOCK_SIZE, (short) 1, 0, false,
new InetSocketAddress[]{dn4.getXferAddress()});
// Trigger Heartbeats.
cluster.triggerHeartbeats();
// Assert that the blockPoolUsedPercentStdDev on namenode
// and Datanode are the same.
String liveNodes = cluster.getNameNode().getNamesystem().getLiveNodes();
Map<String, Map<String, Object>> info =
(Map<String, Map<String, Object>>) JSON.parse(liveNodes);
// Create storageReports for datanodes.
FSNamesystem namesystem = cluster.getNamesystem();
String blockPoolId = namesystem.getBlockPoolId();
StorageReport[] storageReportsDn0 =
dn0.getFSDataset().getStorageReports(blockPoolId);
StorageReport[] storageReportsDn1 =
dn1.getFSDataset().getStorageReports(blockPoolId);
StorageReport[] storageReportsDn2 =
dn2.getFSDataset().getStorageReports(blockPoolId);
StorageReport[] storageReportsDn3 =
dn3.getFSDataset().getStorageReports(blockPoolId);
StorageReport[] storageReportsDn4 =
dn4.getFSDataset().getStorageReports(blockPoolId);
// A float or double may lose precision when being evaluated.
// When multiple values are operated on in different order,
// the results may be inconsistent, so we only take two decimal
// points to assert.
Assert.assertEquals(
Util.getBlockPoolUsedPercentStdDev(storageReportsDn0),
(double) info.get(dn0.getDisplayName()).get("blockPoolUsedPercentStdDev"),
0.01d);
Assert.assertEquals(
Util.getBlockPoolUsedPercentStdDev(storageReportsDn1),
(double) info.get(dn1.getDisplayName()).get("blockPoolUsedPercentStdDev"),
0.01d);
Assert.assertEquals(
Util.getBlockPoolUsedPercentStdDev(storageReportsDn2),
(double) info.get(dn2.getDisplayName()).get("blockPoolUsedPercentStdDev"),
0.01d);
Assert.assertEquals(
Util.getBlockPoolUsedPercentStdDev(storageReportsDn3),
(double) info.get(dn3.getDisplayName()).get("blockPoolUsedPercentStdDev"),
0.01d);
Assert.assertEquals(
Util.getBlockPoolUsedPercentStdDev(storageReportsDn4),
(double) info.get(dn4.getDisplayName()).get("blockPoolUsedPercentStdDev"),
0.01d);
}
}