HDFS-14714. RBF: implement getReplicatedBlockStats interface. Contributed by Chen Zhang.

This commit is contained in:
Inigo Goiri 2019-08-21 09:38:17 -07:00
parent 57f7370174
commit 5eeb6da2d4
3 changed files with 93 additions and 2 deletions

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.Collection;
/**
* Get statistics pertaining to blocks of type {@link BlockType#CONTIGUOUS}
* in the filesystem.
@ -111,4 +113,44 @@ public final class ReplicatedBlockStats {
statsBuilder.append("]");
return statsBuilder.toString();
}
/**
* Merge the multiple ReplicatedBlockStats.
* @param stats Collection of stats to merge.
* @return A new ReplicatedBlockStats merging all the input ones
*/
public static ReplicatedBlockStats merge(
Collection<ReplicatedBlockStats> stats) {
long lowRedundancyBlocks = 0;
long corruptBlocks = 0;
long missingBlocks = 0;
long missingReplicationOneBlocks = 0;
long bytesInFutureBlocks = 0;
long pendingDeletionBlocks = 0;
long highestPriorityLowRedundancyBlocks = 0;
boolean hasHighestPriorityLowRedundancyBlocks = false;
// long's range is large enough that we don't need to consider overflow
for (ReplicatedBlockStats stat : stats) {
lowRedundancyBlocks += stat.getLowRedundancyBlocks();
corruptBlocks += stat.getCorruptBlocks();
missingBlocks += stat.getMissingReplicaBlocks();
missingReplicationOneBlocks += stat.getMissingReplicationOneBlocks();
bytesInFutureBlocks += stat.getBytesInFutureBlocks();
pendingDeletionBlocks += stat.getPendingDeletionBlocks();
if (stat.hasHighestPriorityLowRedundancyBlocks()) {
hasHighestPriorityLowRedundancyBlocks = true;
highestPriorityLowRedundancyBlocks +=
stat.getHighestPriorityLowRedundancyBlocks();
}
}
if (hasHighestPriorityLowRedundancyBlocks) {
return new ReplicatedBlockStats(lowRedundancyBlocks, corruptBlocks,
missingBlocks, missingReplicationOneBlocks, bytesInFutureBlocks,
pendingDeletionBlocks, highestPriorityLowRedundancyBlocks);
}
return new ReplicatedBlockStats(lowRedundancyBlocks, corruptBlocks,
missingBlocks, missingReplicationOneBlocks, bytesInFutureBlocks,
pendingDeletionBlocks);
}
}

View File

@ -1681,8 +1681,13 @@ public class RouterClientProtocol implements ClientProtocol {
@Override
public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
return null;
rpcServer.checkOperation(NameNode.OperationCategory.READ);
RemoteMethod method = new RemoteMethod("getReplicatedBlockStats");
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, ReplicatedBlockStats> ret = rpcClient
.invokeConcurrent(nss, method, true, false, ReplicatedBlockStats.class);
return ReplicatedBlockStats.merge(ret.values());
}
@Deprecated

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@ -86,11 +87,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
@ -1380,6 +1384,46 @@ public class TestRouterRpc {
"Parent directory doesn't exist: /a/a/b", "/a", "/ns1/a"));
}
/**
* Create a file for each NameSpace, then find their 1st block and mark one of
* the replica as corrupt through BlockManager#findAndMarkBlockAsCorrupt.
*
* After all NameNode received the corrupt replica report, the
* replicatedBlockStats.getCorruptBlocks() should equal to the sum of
* corruptBlocks of all NameSpaces.
*/
@Test
public void testGetReplicatedBlockStats() throws Exception {
String testFile = "/test-file";
for (String nsid : cluster.getNameservices()) {
NamenodeContext context = cluster.getNamenode(nsid, null);
NameNode nameNode = context.getNamenode();
FSNamesystem namesystem = nameNode.getNamesystem();
BlockManager bm = namesystem.getBlockManager();
FileSystem fileSystem = context.getFileSystem();
// create a test file
createFile(fileSystem, testFile, 1024);
// mark a replica as corrupt
LocatedBlock block = NameNodeAdapter
.getBlockLocations(nameNode, testFile, 0, 1024).get(0);
namesystem.writeLock();
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"STORAGE_ID", "TEST");
namesystem.writeUnlock();
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.waitCorruptReplicas(fileSystem, namesystem,
new Path(testFile), block.getBlock(), 1);
// save the getReplicatedBlockStats result
ReplicatedBlockStats stats =
context.getClient().getNamenode().getReplicatedBlockStats();
assertEquals(1, stats.getCorruptBlocks());
}
ReplicatedBlockStats routerStat = routerProtocol.getReplicatedBlockStats();
assertEquals("There should be 1 corrupt blocks for each NN",
cluster.getNameservices().size(), routerStat.getCorruptBlocks());
}
@Test
public void testErasureCoding() throws Exception {