From 9c46012fbbf4a6a6740bfc3f20b82ddadd99b475 Mon Sep 17 00:00:00 2001
From: Giovanni Matteo Fumarola
Date: Fri, 15 Feb 2019 10:47:17 -0800
Subject: [PATCH] HDFS-14268. RBF: Fix the location of the DNs in
 getDatanodeReport(). Contributed by Inigo Goiri.

---
 .../hdfs/protocol/ECBlockGroupStats.java      | 71 +++++++++++++++++++
 .../federation/router/ErasureCoding.java      | 29 +-------
 .../federation/router/RouterRpcClient.java    | 19 ++---
 .../federation/router/TestRouterRpc.java      | 48 ++++++++++---
 4 files changed, 114 insertions(+), 53 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
index 3dde6043468..1ead5c1fd34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Collection;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -103,4 +107,71 @@ public final class ECBlockGroupStats {
     statsBuilder.append("]");
     return statsBuilder.toString();
   }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(lowRedundancyBlockGroups)
+        .append(corruptBlockGroups)
+        .append(missingBlockGroups)
+        .append(bytesInFutureBlockGroups)
+        .append(pendingDeletionBlocks)
+        .append(highestPriorityLowRedundancyBlocks)
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ECBlockGroupStats other = (ECBlockGroupStats)o;
+    return new EqualsBuilder()
+        .append(lowRedundancyBlockGroups, other.lowRedundancyBlockGroups)
+        .append(corruptBlockGroups, other.corruptBlockGroups)
+        .append(missingBlockGroups, other.missingBlockGroups)
+        .append(bytesInFutureBlockGroups, other.bytesInFutureBlockGroups)
+        .append(pendingDeletionBlocks, other.pendingDeletionBlocks)
+        .append(highestPriorityLowRedundancyBlocks,
+            other.highestPriorityLowRedundancyBlocks)
+        .isEquals();
+  }
+
+  /**
+   * Merge the multiple ECBlockGroupStats.
+   * @param stats Collection of stats to merge.
+   * @return A new ECBlockGroupStats merging all the input ones
+   */
+  public static ECBlockGroupStats merge(Collection<ECBlockGroupStats> stats) {
+    long lowRedundancyBlockGroups = 0;
+    long corruptBlockGroups = 0;
+    long missingBlockGroups = 0;
+    long bytesInFutureBlockGroups = 0;
+    long pendingDeletionBlocks = 0;
+    long highestPriorityLowRedundancyBlocks = 0;
+    boolean hasHighestPriorityLowRedundancyBlocks = false;
+
+    for (ECBlockGroupStats stat : stats) {
+      lowRedundancyBlockGroups += stat.getLowRedundancyBlockGroups();
+      corruptBlockGroups += stat.getCorruptBlockGroups();
+      missingBlockGroups += stat.getMissingBlockGroups();
+      bytesInFutureBlockGroups += stat.getBytesInFutureBlockGroups();
+      pendingDeletionBlocks += stat.getPendingDeletionBlocks();
+      if (stat.hasHighestPriorityLowRedundancyBlocks()) {
+        hasHighestPriorityLowRedundancyBlocks = true;
+        highestPriorityLowRedundancyBlocks +=
+            stat.getHighestPriorityLowRedundancyBlocks();
+      }
+    }
+    if (hasHighestPriorityLowRedundancyBlocks) {
+      return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
+          missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
+          highestPriorityLowRedundancyBlocks);
+    }
+    return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
+        missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
index f4584b1afaf..97c5f6a601d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
@@ -187,33 +187,6 @@ public class ErasureCoding {
         rpcClient.invokeConcurrent(
             nss, method, true, false, ECBlockGroupStats.class);
 
-    // Merge the stats from all the namespaces
-    long lowRedundancyBlockGroups = 0;
-    long corruptBlockGroups = 0;
-    long missingBlockGroups = 0;
-    long bytesInFutureBlockGroups = 0;
-    long pendingDeletionBlocks = 0;
-    long highestPriorityLowRedundancyBlocks = 0;
-    boolean hasHighestPriorityLowRedundancyBlocks = false;
-
-    for (ECBlockGroupStats stats : allStats.values()) {
-      lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups();
-      corruptBlockGroups += stats.getCorruptBlockGroups();
-      missingBlockGroups += stats.getMissingBlockGroups();
-      bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups();
-      pendingDeletionBlocks += stats.getPendingDeletionBlocks();
-      if (stats.hasHighestPriorityLowRedundancyBlocks()) {
-        hasHighestPriorityLowRedundancyBlocks = true;
-        highestPriorityLowRedundancyBlocks +=
-            stats.getHighestPriorityLowRedundancyBlocks();
-      }
-    }
-    if (hasHighestPriorityLowRedundancyBlocks) {
-      return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
-          missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
-          highestPriorityLowRedundancyBlocks);
-    }
-    return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
-        missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
+    return ECBlockGroupStats.merge(allStats.values());
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index d21bde3d679..3d80c4167d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -24,16 +24,15 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -1061,8 +1060,8 @@ public class RouterRpcClient {
       }
     }
 
-    List<T> orderedLocations = new LinkedList<>();
-    Set<Callable<Object>> callables = new HashSet<>();
+    List<T> orderedLocations = new ArrayList<>();
+    List<Callable<Object>> callables = new ArrayList<>();
     for (final T location : locations) {
       String nsId = location.getNameserviceId();
       final List<? extends FederationNamenodeContext> namenodes =
@@ -1080,20 +1079,12 @@
             nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
           }
           orderedLocations.add(nnLocation);
-          callables.add(new Callable<Object>() {
-            public Object call() throws Exception {
-              return invokeMethod(ugi, nnList, proto, m, paramList);
-            }
-          });
+          callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList));
        }
      } else {
        // Call the objectGetter in order of nameservices in the NS list
        orderedLocations.add(location);
-        callables.add(new Callable<Object>() {
-          public Object call() throws Exception {
-            return invokeMethod(ugi, namenodes, proto, m, paramList);
-          }
-        });
+        callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList));
      }
    }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 2d26e1142e7..d9430767753 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -47,6 +48,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
@@ -120,6 +122,11 @@ public class TestRouterRpc {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestRouterRpc.class);
 
+  private static final int NUM_SUBCLUSTERS = 2;
+  // We need at least 6 DNs to test Erasure Coding with RS-6-3-64k
+  private static final int NUM_DNS = 6;
+
+
   private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
       new Comparator<ErasureCodingPolicyInfo>() {
     public int compare(
@@ -165,9 +172,9 @@
 
   @BeforeClass
   public static void globalSetUp() throws Exception {
-    cluster = new MiniRouterDFSCluster(false, 2);
-    // We need 6 DNs to test Erasure Coding with RS-6-3-64k
-    cluster.setNumDatanodesPerNameservice(6);
+    cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
+    cluster.setNumDatanodesPerNameservice(NUM_DNS);
+    cluster.setIndependentDNs();
 
     // Start NNs and DNs and wait until ready
     cluster.startCluster();
@@ -586,8 +593,13 @@
 
     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    final Map<Integer, String> routerDNMap = new TreeMap<>();
+    for (DatanodeInfo dn : combinedData) {
+      String subcluster = dn.getNetworkLocation().split("/")[1];
+      routerDNMap.put(dn.getXferPort(), subcluster);
+    }
 
-    Set<Integer> individualData = new HashSet<Integer>();
+    final Map<Integer, String> nnDNMap = new TreeMap<>();
     for (String nameservice : cluster.getNameservices()) {
       NamenodeContext n = cluster.getNamenode(nameservice, null);
       DFSClient client = n.getClient();
@@ -597,10 +609,10 @@
       for (int i = 0; i < data.length; i++) {
         // Collect unique DNs based on their xfer port
         DatanodeInfo info = data[i];
-        individualData.add(info.getXferPort());
+        nnDNMap.put(info.getXferPort(), nameservice);
       }
     }
-    assertEquals(combinedData.length, individualData.size());
+    assertEquals(nnDNMap, routerDNMap);
   }
 
   @Test
@@ -1234,7 +1246,7 @@
   }
 
   @Test
-  public void testErasureCoding() throws IOException {
+  public void testErasureCoding() throws Exception {
 
     LOG.info("List the available erasurce coding policies");
     ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies();
@@ -1340,8 +1352,22 @@
 
     LOG.info("Check the stats");
     ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats();
-    ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats();
-    assertEquals(statsNamenode.toString(), statsRouter.toString());
+    ECBlockGroupStats statsNamenode = getNamenodeECBlockGroupStats();
+    assertEquals(statsNamenode, statsRouter);
+  }
+
+  /**
+   * Get the EC stats from all namenodes and aggregate them.
+   * @return Aggregated EC stats from all namenodes.
+   * @throws Exception If we cannot get the stats.
+   */
+  private ECBlockGroupStats getNamenodeECBlockGroupStats() throws Exception {
+    List<ECBlockGroupStats> nnStats = new ArrayList<>();
+    for (NamenodeContext nnContext : cluster.getNamenodes()) {
+      ClientProtocol cp = nnContext.getClient().getNamenode();
+      nnStats.add(cp.getECBlockGroupStats());
+    }
+    return ECBlockGroupStats.merge(nnStats);
   }
 
   @Test
@@ -1375,9 +1401,9 @@
         router.getRouter().getNamenodeMetrics();
     final String jsonString0 = metrics.getLiveNodes();
 
-    // We should have 12 nodes in total
+    // We should have the nodes in all the subclusters
     JSONObject jsonObject = new JSONObject(jsonString0);
-    assertEquals(12, jsonObject.names().length());
+    assertEquals(NUM_SUBCLUSTERS * NUM_DNS, jsonObject.names().length());
 
     // We should be caching this information
     String jsonString1 = metrics.getLiveNodes();