diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReplicatedBlockStats.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReplicatedBlockStats.java index c2100034bcd..d878a27168b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReplicatedBlockStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReplicatedBlockStats.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import java.util.Collection; + /** * Get statistics pertaining to blocks of type {@link BlockType#CONTIGUOUS} * in the filesystem. @@ -111,4 +113,44 @@ public final class ReplicatedBlockStats { statsBuilder.append("]"); return statsBuilder.toString(); } + + /** + * Merge the multiple ReplicatedBlockStats. + * @param stats Collection of stats to merge. + * @return A new ReplicatedBlockStats merging all the input ones + */ + public static ReplicatedBlockStats merge( + Collection stats) { + long lowRedundancyBlocks = 0; + long corruptBlocks = 0; + long missingBlocks = 0; + long missingReplicationOneBlocks = 0; + long bytesInFutureBlocks = 0; + long pendingDeletionBlocks = 0; + long highestPriorityLowRedundancyBlocks = 0; + boolean hasHighestPriorityLowRedundancyBlocks = false; + + // long's range is large enough that we don't need to consider overflow + for (ReplicatedBlockStats stat : stats) { + lowRedundancyBlocks += stat.getLowRedundancyBlocks(); + corruptBlocks += stat.getCorruptBlocks(); + missingBlocks += stat.getMissingReplicaBlocks(); + missingReplicationOneBlocks += stat.getMissingReplicationOneBlocks(); + bytesInFutureBlocks += stat.getBytesInFutureBlocks(); + pendingDeletionBlocks += stat.getPendingDeletionBlocks(); + if (stat.hasHighestPriorityLowRedundancyBlocks()) { + hasHighestPriorityLowRedundancyBlocks = true; + highestPriorityLowRedundancyBlocks += + stat.getHighestPriorityLowRedundancyBlocks(); + } + } + if (hasHighestPriorityLowRedundancyBlocks) { + return new ReplicatedBlockStats(lowRedundancyBlocks, corruptBlocks, + missingBlocks, missingReplicationOneBlocks, bytesInFutureBlocks, + pendingDeletionBlocks, highestPriorityLowRedundancyBlocks); + } + return new ReplicatedBlockStats(lowRedundancyBlocks, corruptBlocks, + missingBlocks, missingReplicationOneBlocks, bytesInFutureBlocks, + pendingDeletionBlocks); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 5e4bd2afa2d..e9cb54741dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -1681,8 +1681,13 @@ public class RouterClientProtocol implements ClientProtocol { @Override public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, false); - return null; + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getReplicatedBlockStats"); + Set nss = namenodeResolver.getNamespaces(); + Map ret = rpcClient + .invokeConcurrent(nss, method, true, false, ReplicatedBlockStats.class); + return ReplicatedBlockStats.merge(ret.values()); } @Deprecated diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index a07daefc6e3..8ec87cb05f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -65,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -86,11 +87,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; @@ -1380,6 +1384,46 @@ public class TestRouterRpc { "Parent directory doesn't exist: /a/a/b", "/a", "/ns1/a")); } + /** + * Create a file for each NameSpace, then find their 1st block and mark one of + * the replica as corrupt through BlockManager#findAndMarkBlockAsCorrupt. + * + * After all NameNode received the corrupt replica report, the + * replicatedBlockStats.getCorruptBlocks() should equal to the sum of + * corruptBlocks of all NameSpaces. + */ + @Test + public void testGetReplicatedBlockStats() throws Exception { + String testFile = "/test-file"; + for (String nsid : cluster.getNameservices()) { + NamenodeContext context = cluster.getNamenode(nsid, null); + NameNode nameNode = context.getNamenode(); + FSNamesystem namesystem = nameNode.getNamesystem(); + BlockManager bm = namesystem.getBlockManager(); + FileSystem fileSystem = context.getFileSystem(); + + // create a test file + createFile(fileSystem, testFile, 1024); + // mark a replica as corrupt + LocatedBlock block = NameNodeAdapter + .getBlockLocations(nameNode, testFile, 0, 1024).get(0); + namesystem.writeLock(); + bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0], + "STORAGE_ID", "TEST"); + namesystem.writeUnlock(); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.waitCorruptReplicas(fileSystem, namesystem, + new Path(testFile), block.getBlock(), 1); + // save the getReplicatedBlockStats result + ReplicatedBlockStats stats = + context.getClient().getNamenode().getReplicatedBlockStats(); + assertEquals(1, stats.getCorruptBlocks()); + } + ReplicatedBlockStats routerStat = routerProtocol.getReplicatedBlockStats(); + assertEquals("There should be 1 corrupt blocks for each NN", + cluster.getNameservices().size(), routerStat.getCorruptBlocks()); + } + @Test public void testErasureCoding() throws Exception {