HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.

This commit is contained in:
Giovanni Matteo Fumarola 2019-02-15 10:47:17 -08:00 committed by Brahma Reddy Battula
parent a2c8633275
commit 9c46012fbb
4 changed files with 114 additions and 53 deletions

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.protocol;
import java.util.Collection;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -103,4 +107,71 @@ public final class ECBlockGroupStats {
statsBuilder.append("]");
return statsBuilder.toString();
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(lowRedundancyBlockGroups)
.append(corruptBlockGroups)
.append(missingBlockGroups)
.append(bytesInFutureBlockGroups)
.append(pendingDeletionBlocks)
.append(highestPriorityLowRedundancyBlocks)
.toHashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ECBlockGroupStats other = (ECBlockGroupStats)o;
return new EqualsBuilder()
.append(lowRedundancyBlockGroups, other.lowRedundancyBlockGroups)
.append(corruptBlockGroups, other.corruptBlockGroups)
.append(missingBlockGroups, other.missingBlockGroups)
.append(bytesInFutureBlockGroups, other.bytesInFutureBlockGroups)
.append(pendingDeletionBlocks, other.pendingDeletionBlocks)
.append(highestPriorityLowRedundancyBlocks,
other.highestPriorityLowRedundancyBlocks)
.isEquals();
}
/**
* Merge the multiple ECBlockGroupStats.
* @param stats Collection of stats to merge.
* @return A new ECBlockGroupStats merging all the input ones
*/
public static ECBlockGroupStats merge(Collection<ECBlockGroupStats> stats) {
long lowRedundancyBlockGroups = 0;
long corruptBlockGroups = 0;
long missingBlockGroups = 0;
long bytesInFutureBlockGroups = 0;
long pendingDeletionBlocks = 0;
long highestPriorityLowRedundancyBlocks = 0;
boolean hasHighestPriorityLowRedundancyBlocks = false;
for (ECBlockGroupStats stat : stats) {
lowRedundancyBlockGroups += stat.getLowRedundancyBlockGroups();
corruptBlockGroups += stat.getCorruptBlockGroups();
missingBlockGroups += stat.getMissingBlockGroups();
bytesInFutureBlockGroups += stat.getBytesInFutureBlockGroups();
pendingDeletionBlocks += stat.getPendingDeletionBlocks();
if (stat.hasHighestPriorityLowRedundancyBlocks()) {
hasHighestPriorityLowRedundancyBlocks = true;
highestPriorityLowRedundancyBlocks +=
stat.getHighestPriorityLowRedundancyBlocks();
}
}
if (hasHighestPriorityLowRedundancyBlocks) {
return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
highestPriorityLowRedundancyBlocks);
}
return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
}
}

View File

@ -187,33 +187,6 @@ public class ErasureCoding {
rpcClient.invokeConcurrent(
nss, method, true, false, ECBlockGroupStats.class);
// Merge the stats from all the namespaces
long lowRedundancyBlockGroups = 0;
long corruptBlockGroups = 0;
long missingBlockGroups = 0;
long bytesInFutureBlockGroups = 0;
long pendingDeletionBlocks = 0;
long highestPriorityLowRedundancyBlocks = 0;
boolean hasHighestPriorityLowRedundancyBlocks = false;
for (ECBlockGroupStats stats : allStats.values()) {
lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups();
corruptBlockGroups += stats.getCorruptBlockGroups();
missingBlockGroups += stats.getMissingBlockGroups();
bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups();
pendingDeletionBlocks += stats.getPendingDeletionBlocks();
if (stats.hasHighestPriorityLowRedundancyBlocks()) {
hasHighestPriorityLowRedundancyBlocks = true;
highestPriorityLowRedundancyBlocks +=
stats.getHighestPriorityLowRedundancyBlocks();
}
}
if (hasHighestPriorityLowRedundancyBlocks) {
return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
highestPriorityLowRedundancyBlocks);
}
return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
return ECBlockGroupStats.merge(allStats.values());
}
}

View File

@ -24,16 +24,15 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -1061,8 +1060,8 @@ public class RouterRpcClient {
}
}
List<T> orderedLocations = new LinkedList<>();
Set<Callable<Object>> callables = new HashSet<>();
List<T> orderedLocations = new ArrayList<>();
List<Callable<Object>> callables = new ArrayList<>();
for (final T location : locations) {
String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
@ -1080,20 +1079,12 @@ public class RouterRpcClient {
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
}
orderedLocations.add(nnLocation);
callables.add(new Callable<Object>() {
public Object call() throws Exception {
return invokeMethod(ugi, nnList, proto, m, paramList);
}
});
callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList));
}
} else {
// Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location);
callables.add(new Callable<Object>() {
public Object call() throws Exception {
return invokeMethod(ugi, namenodes, proto, m, paramList);
}
});
callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList));
}
}

View File

@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
@ -47,6 +48,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@ -120,6 +122,11 @@ public class TestRouterRpc {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterRpc.class);
private static final int NUM_SUBCLUSTERS = 2;
// We need at least 6 DNs to test Erasure Coding with RS-6-3-64k
private static final int NUM_DNS = 6;
private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
new Comparator<ErasureCodingPolicyInfo>() {
public int compare(
@ -165,9 +172,9 @@ public class TestRouterRpc {
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new MiniRouterDFSCluster(false, 2);
// We need 6 DNs to test Erasure Coding with RS-6-3-64k
cluster.setNumDatanodesPerNameservice(6);
cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
cluster.setNumDatanodesPerNameservice(NUM_DNS);
cluster.setIndependentDNs();
// Start NNs and DNs and wait until ready
cluster.startCluster();
@ -586,8 +593,13 @@ public class TestRouterRpc {
DatanodeInfo[] combinedData =
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
final Map<Integer, String> routerDNMap = new TreeMap<>();
for (DatanodeInfo dn : combinedData) {
String subcluster = dn.getNetworkLocation().split("/")[1];
routerDNMap.put(dn.getXferPort(), subcluster);
}
Set<Integer> individualData = new HashSet<Integer>();
final Map<Integer, String> nnDNMap = new TreeMap<>();
for (String nameservice : cluster.getNameservices()) {
NamenodeContext n = cluster.getNamenode(nameservice, null);
DFSClient client = n.getClient();
@ -597,10 +609,10 @@ public class TestRouterRpc {
for (int i = 0; i < data.length; i++) {
// Collect unique DNs based on their xfer port
DatanodeInfo info = data[i];
individualData.add(info.getXferPort());
nnDNMap.put(info.getXferPort(), nameservice);
}
}
assertEquals(combinedData.length, individualData.size());
assertEquals(nnDNMap, routerDNMap);
}
@Test
@ -1234,7 +1246,7 @@ public class TestRouterRpc {
}
@Test
public void testErasureCoding() throws IOException {
public void testErasureCoding() throws Exception {
LOG.info("List the available erasurce coding policies");
ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies();
@ -1340,8 +1352,22 @@ public class TestRouterRpc {
LOG.info("Check the stats");
ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats();
ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats();
assertEquals(statsNamenode.toString(), statsRouter.toString());
ECBlockGroupStats statsNamenode = getNamenodeECBlockGroupStats();
assertEquals(statsNamenode, statsRouter);
}
/**
* Get the EC stats from all namenodes and aggregate them.
* @return Aggregated EC stats from all namenodes.
* @throws Exception If we cannot get the stats.
*/
private ECBlockGroupStats getNamenodeECBlockGroupStats() throws Exception {
List<ECBlockGroupStats> nnStats = new ArrayList<>();
for (NamenodeContext nnContext : cluster.getNamenodes()) {
ClientProtocol cp = nnContext.getClient().getNamenode();
nnStats.add(cp.getECBlockGroupStats());
}
return ECBlockGroupStats.merge(nnStats);
}
@Test
@ -1375,9 +1401,9 @@ public class TestRouterRpc {
router.getRouter().getNamenodeMetrics();
final String jsonString0 = metrics.getLiveNodes();
// We should have 12 nodes in total
// We should have the nodes in all the subclusters
JSONObject jsonObject = new JSONObject(jsonString0);
assertEquals(12, jsonObject.names().length());
assertEquals(NUM_SUBCLUSTERS * NUM_DNS, jsonObject.names().length());
// We should be caching this information
String jsonString1 = metrics.getLiveNodes();