diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java index f5e5272ac1c..dbb6ffa0fdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -94,8 +94,8 @@ public class Quota { final List quotaLocs = getValidQuotaLocations(path); RemoteMethod method = new RemoteMethod("getQuotaUsage", new Class[] {String.class}, new RemoteParam()); - Map results = rpcClient.invokeConcurrent(quotaLocs, - method, true, false); + Map results = rpcClient.invokeConcurrent( + quotaLocs, method, true, false, QuotaUsage.class); return aggregateQuota(results); } @@ -151,14 +151,14 @@ public class Quota { * @param results Quota query result. * @return Aggregated Quota. */ - private QuotaUsage aggregateQuota(Map results) { + private QuotaUsage aggregateQuota(Map results) { long nsCount = 0; long ssCount = 0; boolean hasQuotaUnSet = false; - for (Map.Entry entry : results.entrySet()) { + for (Map.Entry entry : results.entrySet()) { RemoteLocation loc = entry.getKey(); - QuotaUsage usage = (QuotaUsage) entry.getValue(); + QuotaUsage usage = entry.getValue(); if (usage != null) { // If quota is not set in real FileSystem, the usage // value will return -1. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 06add71cbec..ebb0ec7a588 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -666,9 +666,9 @@ public class RouterRpcClient { * @throws IOException if the success condition is not met, return the first * remote exception generated. */ - public Object invokeSequential( + public T invokeSequential( final List locations, - final RemoteMethod remoteMethod, Class expectedResultClass, + final RemoteMethod remoteMethod, Class expectedResultClass, Object expectedResultValue) throws IOException { final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); @@ -688,7 +688,9 @@ public class RouterRpcClient { if (isExpectedClass(expectedResultClass, result) && isExpectedValue(expectedResultValue, result)) { // Valid result, stop here - return result; + @SuppressWarnings("unchecked") + T ret = (T)result; + return ret; } if (firstResult == null) { firstResult = result; @@ -718,7 +720,9 @@ public class RouterRpcClient { throw firstThrownException; } // Return the last result, whether it is the value we are looking for or a - return firstResult; + @SuppressWarnings("unchecked") + T ret = (T)firstResult; + return ret; } /** @@ -758,7 +762,7 @@ public class RouterRpcClient { } /** - * Invokes multiple concurrent proxy calls to different clients. Returns an + * Invoke multiple concurrent proxy calls to different clients. Returns an * array of results. * * Re-throws exceptions generated by the remote RPC call as either @@ -771,14 +775,12 @@ public class RouterRpcClient { * not complete. If false exceptions are ignored and all data results * successfully received are returned. * @param standby If the requests should go to the standby namenodes too. - * @return Result of invoking the method per subcluster: nsId -> result. - * @throws IOException If requiredResponse=true and any of the calls throw an - * exception. + * @throws IOException If all the calls throw an exception. */ - public Map invokeConcurrent( + public void invokeConcurrent( final Collection locations, final RemoteMethod method, boolean requireResponse, boolean standby) throws IOException { - return invokeConcurrent(locations, method, requireResponse, standby, -1); + invokeConcurrent(locations, method, requireResponse, standby, void.class); } /** @@ -788,6 +790,36 @@ public class RouterRpcClient { * Re-throws exceptions generated by the remote RPC call as either * RemoteException or IOException. * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @param requireResponse If true an exception will be thrown if all calls do + * not complete. If false exceptions are ignored and all data results + * successfully received are returned. + * @param standby If the requests should go to the standby namenodes too. + * @param clazz Type of the remote return type. + * @return Result of invoking the method per subcluster: nsId -> result. + * @throws IOException If requiredResponse=true and any of the calls throw an + * exception. + */ + public Map invokeConcurrent( + final Collection locations, final RemoteMethod method, + boolean requireResponse, boolean standby, Class clazz) + throws IOException { + return invokeConcurrent( + locations, method, requireResponse, standby, -1, clazz); + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param The type of the remote method return. * @param locations List of remote locations to call concurrently. * @param method The remote method and parameters to invoke. * @param requireResponse If true an exception will be thrown if all calls do @@ -795,14 +827,15 @@ public class RouterRpcClient { * successfully received are returned. * @param standby If the requests should go to the standby namenodes too. * @param timeOutMs Timeout for each individual call. + * @param clazz Type of the remote return type. * @return Result of invoking the method per subcluster: nsId -> result. * @throws IOException If requiredResponse=true and any of the calls throw an * exception. */ @SuppressWarnings("unchecked") - public Map invokeConcurrent( + public Map invokeConcurrent( final Collection locations, final RemoteMethod method, - boolean requireResponse, boolean standby, long timeOutMs) + boolean requireResponse, boolean standby, long timeOutMs, Class clazz) throws IOException { final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); @@ -816,7 +849,7 @@ public class RouterRpcClient { getNamenodesForNameservice(ns); Object[] paramList = method.getParams(location); Object result = invokeMethod(ugi, namenodes, m, paramList); - return Collections.singletonMap(location, result); + return Collections.singletonMap(location, clazz.cast(result)); } List orderedLocations = new LinkedList<>(); @@ -866,14 +899,14 @@ public class RouterRpcClient { } else { futures = executorService.invokeAll(callables); } - Map results = new TreeMap<>(); + Map results = new TreeMap<>(); Map exceptions = new TreeMap<>(); for (int i=0; i future = futures.get(i); Object result = future.get(); - results.put(location, result); + results.put(location, clazz.cast(result)); } catch (CancellationException ce) { T loc = orderedLocations.get(i); String msg = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 4f8d19b179c..7e333cd8393 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -28,12 +28,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_ import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Array; import java.net.InetSocketAddress; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -966,8 +968,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { RemoteMethod method = new RemoteMethod("getListing", new Class[] {String.class, startAfter.getClass(), boolean.class}, new RemoteParam(), startAfter, needLocation); - Map listings = - rpcClient.invokeConcurrent(locations, method, false, false); + Map listings = + rpcClient.invokeConcurrent( + locations, method, false, false, DirectoryListing.class); Map nnListing = new TreeMap<>(); int totalRemainingEntries = 0; @@ -976,9 +979,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { if (listings != null) { // Check the subcluster listing with the smallest name String lastName = null; - for (Entry entry : listings.entrySet()) { + for (Entry entry : + listings.entrySet()) { RemoteLocation location = entry.getKey(); - DirectoryListing listing = (DirectoryListing) entry.getValue(); + DirectoryListing listing = entry.getValue(); if (listing == null) { LOG.debug("Cannot get listing from {}", location); } else { @@ -1102,11 +1106,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { RemoteMethod method = new RemoteMethod("getStats"); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, false); + Map results = + rpcClient.invokeConcurrent(nss, method, true, false, long[].class); long[] combinedData = new long[STATS_ARRAY_LENGTH]; - for (Object o : results.values()) { - long[] data = (long[]) o; + for (long[] data : results.values()) { for (int i = 0; i < combinedData.length && i < data.length; i++) { if (data[i] >= 0) { combinedData[i] += data[i]; @@ -1139,11 +1142,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, false, timeOutMs); - for (Entry entry : results.entrySet()) { + Map results = + rpcClient.invokeConcurrent( + nss, method, true, false, timeOutMs, DatanodeInfo[].class); + for (Entry entry : + results.entrySet()) { FederationNamespaceInfo ns = entry.getKey(); - DatanodeInfo[] result = (DatanodeInfo[]) entry.getValue(); + DatanodeInfo[] result = entry.getValue(); for (DatanodeInfo node : result) { String nodeId = node.getXferAddr(); if (!datanodesMap.containsKey(nodeId)) { @@ -1229,17 +1234,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { new Class[] {SafeModeAction.class, boolean.class}, action, isChecked); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, true); + Map results = + rpcClient.invokeConcurrent(nss, method, true, true, boolean.class); // We only report true if all the name space are in safe mode int numSafemode = 0; - for (Object result : results.values()) { - if (result instanceof Boolean) { - boolean safemode = (boolean) result; - if (safemode) { - numSafemode++; - } + for (boolean safemode : results.values()) { + if (safemode) { + numSafemode++; } } return numSafemode == results.size(); @@ -1252,18 +1254,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { RemoteMethod method = new RemoteMethod("restoreFailedStorage", new Class[] {String.class}, arg); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); boolean success = true; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection sucesses = results.values(); - for (boolean s : sucesses) { + for (boolean s : ret.values()) { if (!s) { success = false; + break; } } return success; @@ -1284,17 +1282,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { RemoteMethod method = new RemoteMethod("rollEdits", new Class[] {}); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, long.class); // Return the maximum txid long txid = 0; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection txids = results.values(); - for (long t : txids) { + for (long t : ret.values()) { if (t > txid) { txid = t; } @@ -1329,17 +1322,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { RemoteMethod method = new RemoteMethod("rollingUpgrade", new Class[] {RollingUpgradeAction.class}, action); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent( + nss, method, true, false, RollingUpgradeInfo.class); // Return the first rolling upgrade info RollingUpgradeInfo info = null; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection infos = results.values(); - for (RollingUpgradeInfo infoNs : infos) { + for (RollingUpgradeInfo infoNs : ret.values()) { if (info == null && infoNs != null) { info = infoNs; } @@ -1391,10 +1380,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { final List locations = getLocationsForPath(path, false); RemoteMethod method = new RemoteMethod("getContentSummary", new Class[] {String.class}, new RemoteParam()); - @SuppressWarnings("unchecked") - Map results = - (Map) ((Object)rpcClient.invokeConcurrent( - locations, method, false, false)); + Map results = + rpcClient.invokeConcurrent( + locations, method, false, false, ContentSummary.class); summaries.addAll(results.values()); } catch (FileNotFoundException e) { notFoundException = e; @@ -1767,16 +1755,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { RemoteMethod method = new RemoteMethod( "getCurrentEditLogTxid", new Class[] {}); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, long.class); // Return the maximum txid long txid = 0; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection txids = results.values(); + Collection txids = ret.values(); for (long t : txids) { if (t > txid) { txid = t; @@ -2039,6 +2023,39 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); } + /** + * Merge the outputs from multiple namespaces. + * @param map Namespace -> Output array. + * @param clazz Class of the values. + * @return Array with the outputs. + */ + protected static T[] merge( + Map map, Class clazz) { + + // Put all results into a set to avoid repeats + Set ret = new LinkedHashSet<>(); + for (T[] values : map.values()) { + for (T val : values) { + ret.add(val); + } + } + + return toArray(ret, clazz); + } + + /** + * Convert a set of values into an array. + * @param set Input set. + * @param clazz Class of the values. + * @return Array with the values in set. + */ + private static T[] toArray(Set set, Class clazz) { + @SuppressWarnings("unchecked") + T[] combinedData = (T[]) Array.newInstance(clazz, set.size()); + combinedData = set.toArray(combinedData); + return combinedData; + } + /** * Get quota module implement. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java index 74244997912..8f8bd3ebcba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -109,6 +109,8 @@ public class RouterDFSCluster { private List routers; /** If the Namenodes are in high availability.*/ private boolean highAvailability; + /** Number of datanodes per nameservice. */ + private int numDatanodesPerNameservice = 2; /** Mini cluster. */ private MiniDFSCluster cluster; @@ -356,8 +358,8 @@ public class RouterDFSCluster { DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); } - public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) { - this(ha, numNameservices, numNamnodes, + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { + this(ha, numNameservices, numNamenodes, DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); } @@ -531,6 +533,10 @@ public class RouterDFSCluster { } } + public void setNumDatanodesPerNameservice(int num) { + this.numDatanodesPerNameservice = num; + } + public String getNameservicesKey() { StringBuilder sb = new StringBuilder(); for (String nsId : this.nameservices) { @@ -658,7 +664,7 @@ public class RouterDFSCluster { nnConf.addResource(overrideConf); } cluster = new MiniDFSCluster.Builder(nnConf) - .numDataNodes(nameservices.size()*2) + .numDataNodes(nameservices.size() * numDatanodesPerNameservice) .nnTopology(topology) .build(); cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 993751378bd..9bcbcad2ba8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -153,9 +153,9 @@ public class TestRouterRpc { // Wait to ensure NN has fully created its test directories Thread.sleep(100); - // Pick a NS, namenode and router for this test + // Default namenode and random router for this test this.router = cluster.getRandomRouter(); - this.ns = cluster.getRandomNameservice(); + this.ns = cluster.getNameservices().get(0); this.namenode = cluster.getNamenode(ns, null); // Handles to the ClientProtocol interface