HDFS-12919. RBF: Support erasure coding methods in RouterRpcServer. Contributed by Inigo Goiri.

This commit is contained in:
Inigo Goiri 2018-03-15 11:30:52 -07:00
parent ccf07fdafd
commit b1b10007a4
5 changed files with 136 additions and 80 deletions

View File

@ -94,8 +94,8 @@ public class Quota {
final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
RemoteMethod method = new RemoteMethod("getQuotaUsage",
new Class<?>[] {String.class}, new RemoteParam());
Map<RemoteLocation, Object> results = rpcClient.invokeConcurrent(quotaLocs,
method, true, false);
Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent(
quotaLocs, method, true, false, QuotaUsage.class);
return aggregateQuota(results);
}
@ -151,14 +151,14 @@ public class Quota {
* @param results Quota query result.
* @return Aggregated Quota.
*/
private QuotaUsage aggregateQuota(Map<RemoteLocation, Object> results) {
private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
long nsCount = 0;
long ssCount = 0;
boolean hasQuotaUnSet = false;
for (Map.Entry<RemoteLocation, Object> entry : results.entrySet()) {
for (Map.Entry<RemoteLocation, QuotaUsage> entry : results.entrySet()) {
RemoteLocation loc = entry.getKey();
QuotaUsage usage = (QuotaUsage) entry.getValue();
QuotaUsage usage = entry.getValue();
if (usage != null) {
// If quota is not set in real FileSystem, the usage
// value will return -1.

View File

@ -666,9 +666,9 @@ public class RouterRpcClient {
* @throws IOException if the success condition is not met, return the first
* remote exception generated.
*/
public Object invokeSequential(
public <T> T invokeSequential(
final List<? extends RemoteLocationContext> locations,
final RemoteMethod remoteMethod, Class<?> expectedResultClass,
final RemoteMethod remoteMethod, Class<T> expectedResultClass,
Object expectedResultValue) throws IOException {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
@ -688,7 +688,9 @@ public class RouterRpcClient {
if (isExpectedClass(expectedResultClass, result) &&
isExpectedValue(expectedResultValue, result)) {
// Valid result, stop here
return result;
@SuppressWarnings("unchecked")
T ret = (T)result;
return ret;
}
if (firstResult == null) {
firstResult = result;
@ -718,7 +720,9 @@ public class RouterRpcClient {
throw firstThrownException;
}
// Return the last result, whether it is the value we are looking for or a
return firstResult;
@SuppressWarnings("unchecked")
T ret = (T)firstResult;
return ret;
}
/**
@ -758,7 +762,7 @@ public class RouterRpcClient {
}
/**
* Invokes multiple concurrent proxy calls to different clients. Returns an
* Invoke multiple concurrent proxy calls to different clients. Returns an
* array of results.
*
* Re-throws exceptions generated by the remote RPC call as either
@ -771,14 +775,12 @@ public class RouterRpcClient {
* not complete. If false exceptions are ignored and all data results
* successfully received are returned.
* @param standby If the requests should go to the standby namenodes too.
* @return Result of invoking the method per subcluster: nsId -> result.
* @throws IOException If requiredResponse=true and any of the calls throw an
* exception.
* @throws IOException If all the calls throw an exception.
*/
public <T extends RemoteLocationContext> Map<T, Object> invokeConcurrent(
public <T extends RemoteLocationContext, R> void invokeConcurrent(
final Collection<T> locations, final RemoteMethod method,
boolean requireResponse, boolean standby) throws IOException {
return invokeConcurrent(locations, method, requireResponse, standby, -1);
invokeConcurrent(locations, method, requireResponse, standby, void.class);
}
/**
@ -788,6 +790,36 @@ public class RouterRpcClient {
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param <T> The type of the remote location.
* @param <R> The type of the remote method return.
* @param locations List of remote locations to call concurrently.
* @param method The remote method and parameters to invoke.
* @param requireResponse If true an exception will be thrown if all calls do
* not complete. If false exceptions are ignored and all data results
* successfully received are returned.
* @param standby If the requests should go to the standby namenodes too.
* @param clazz Type of the remote return type.
* @return Result of invoking the method per subcluster: nsId -> result.
* @throws IOException If requiredResponse=true and any of the calls throw an
* exception.
*/
public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
final Collection<T> locations, final RemoteMethod method,
boolean requireResponse, boolean standby, Class<R> clazz)
throws IOException {
return invokeConcurrent(
locations, method, requireResponse, standby, -1, clazz);
}
/**
* Invokes multiple concurrent proxy calls to different clients. Returns an
* array of results.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param <T> The type of the remote location.
* @param <R> The type of the remote method return.
* @param locations List of remote locations to call concurrently.
* @param method The remote method and parameters to invoke.
* @param requireResponse If true an exception will be thrown if all calls do
@ -795,14 +827,15 @@ public class RouterRpcClient {
* successfully received are returned.
* @param standby If the requests should go to the standby namenodes too.
* @param timeOutMs Timeout for each individual call.
* @param clazz Type of the remote return type.
* @return Result of invoking the method per subcluster: nsId -> result.
* @throws IOException If requiredResponse=true and any of the calls throw an
* exception.
*/
@SuppressWarnings("unchecked")
public <T extends RemoteLocationContext> Map<T, Object> invokeConcurrent(
public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
final Collection<T> locations, final RemoteMethod method,
boolean requireResponse, boolean standby, long timeOutMs)
boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
throws IOException {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
@ -816,7 +849,7 @@ public class RouterRpcClient {
getNamenodesForNameservice(ns);
Object[] paramList = method.getParams(location);
Object result = invokeMethod(ugi, namenodes, m, paramList);
return Collections.singletonMap(location, result);
return Collections.singletonMap(location, clazz.cast(result));
}
List<T> orderedLocations = new LinkedList<>();
@ -866,14 +899,14 @@ public class RouterRpcClient {
} else {
futures = executorService.invokeAll(callables);
}
Map<T, Object> results = new TreeMap<>();
Map<T, R> results = new TreeMap<>();
Map<T, IOException> exceptions = new TreeMap<>();
for (int i=0; i<futures.size(); i++) {
T location = orderedLocations.get(i);
try {
Future<Object> future = futures.get(i);
Object result = future.get();
results.put(location, result);
results.put(location, clazz.cast(result));
} catch (CancellationException ce) {
T loc = orderedLocations.get(i);
String msg =

View File

@ -28,12 +28,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Array;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -966,8 +968,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
RemoteMethod method = new RemoteMethod("getListing",
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
new RemoteParam(), startAfter, needLocation);
Map<RemoteLocation, Object> listings =
rpcClient.invokeConcurrent(locations, method, false, false);
Map<RemoteLocation, DirectoryListing> listings =
rpcClient.invokeConcurrent(
locations, method, false, false, DirectoryListing.class);
Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
int totalRemainingEntries = 0;
@ -976,9 +979,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
if (listings != null) {
// Check the subcluster listing with the smallest name
String lastName = null;
for (Entry<RemoteLocation, Object> entry : listings.entrySet()) {
for (Entry<RemoteLocation, DirectoryListing> entry :
listings.entrySet()) {
RemoteLocation location = entry.getKey();
DirectoryListing listing = (DirectoryListing) entry.getValue();
DirectoryListing listing = entry.getValue();
if (listing == null) {
LOG.debug("Cannot get listing from {}", location);
} else {
@ -1102,11 +1106,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
RemoteMethod method = new RemoteMethod("getStats");
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> results =
rpcClient.invokeConcurrent(nss, method, true, false);
Map<FederationNamespaceInfo, long[]> results =
rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
long[] combinedData = new long[STATS_ARRAY_LENGTH];
for (Object o : results.values()) {
long[] data = (long[]) o;
for (long[] data : results.values()) {
for (int i = 0; i < combinedData.length && i < data.length; i++) {
if (data[i] >= 0) {
combinedData[i] += data[i];
@ -1139,11 +1142,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
new Class<?>[] {DatanodeReportType.class}, type);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> results =
rpcClient.invokeConcurrent(nss, method, true, false, timeOutMs);
for (Entry<FederationNamespaceInfo, Object> entry : results.entrySet()) {
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
rpcClient.invokeConcurrent(
nss, method, true, false, timeOutMs, DatanodeInfo[].class);
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
DatanodeInfo[] result = (DatanodeInfo[]) entry.getValue();
DatanodeInfo[] result = entry.getValue();
for (DatanodeInfo node : result) {
String nodeId = node.getXferAddr();
if (!datanodesMap.containsKey(nodeId)) {
@ -1229,17 +1234,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
new Class<?>[] {SafeModeAction.class, boolean.class},
action, isChecked);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> results =
rpcClient.invokeConcurrent(nss, method, true, true);
Map<FederationNamespaceInfo, Boolean> results =
rpcClient.invokeConcurrent(nss, method, true, true, boolean.class);
// We only report true if all the name space are in safe mode
int numSafemode = 0;
for (Object result : results.values()) {
if (result instanceof Boolean) {
boolean safemode = (boolean) result;
if (safemode) {
numSafemode++;
}
for (boolean safemode : results.values()) {
if (safemode) {
numSafemode++;
}
}
return numSafemode == results.size();
@ -1252,18 +1254,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
RemoteMethod method = new RemoteMethod("restoreFailedStorage",
new Class<?>[] {String.class}, arg);
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> ret =
rpcClient.invokeConcurrent(nss, method, true, false);
Map<FederationNamespaceInfo, Boolean> ret =
rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
boolean success = true;
Object obj = ret;
@SuppressWarnings("unchecked")
Map<FederationNamespaceInfo, Boolean> results =
(Map<FederationNamespaceInfo, Boolean>)obj;
Collection<Boolean> sucesses = results.values();
for (boolean s : sucesses) {
for (boolean s : ret.values()) {
if (!s) {
success = false;
break;
}
}
return success;
@ -1284,17 +1282,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> ret =
rpcClient.invokeConcurrent(nss, method, true, false);
Map<FederationNamespaceInfo, Long> ret =
rpcClient.invokeConcurrent(nss, method, true, false, long.class);
// Return the maximum txid
long txid = 0;
Object obj = ret;
@SuppressWarnings("unchecked")
Map<FederationNamespaceInfo, Long> results =
(Map<FederationNamespaceInfo, Long>)obj;
Collection<Long> txids = results.values();
for (long t : txids) {
for (long t : ret.values()) {
if (t > txid) {
txid = t;
}
@ -1329,17 +1322,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
RemoteMethod method = new RemoteMethod("rollingUpgrade",
new Class<?>[] {RollingUpgradeAction.class}, action);
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> ret =
rpcClient.invokeConcurrent(nss, method, true, false);
Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
rpcClient.invokeConcurrent(
nss, method, true, false, RollingUpgradeInfo.class);
// Return the first rolling upgrade info
RollingUpgradeInfo info = null;
Object obj = ret;
@SuppressWarnings("unchecked")
Map<FederationNamespaceInfo, RollingUpgradeInfo> results =
(Map<FederationNamespaceInfo, RollingUpgradeInfo>)obj;
Collection<RollingUpgradeInfo> infos = results.values();
for (RollingUpgradeInfo infoNs : infos) {
for (RollingUpgradeInfo infoNs : ret.values()) {
if (info == null && infoNs != null) {
info = infoNs;
}
@ -1391,10 +1380,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
final List<RemoteLocation> locations = getLocationsForPath(path, false);
RemoteMethod method = new RemoteMethod("getContentSummary",
new Class<?>[] {String.class}, new RemoteParam());
@SuppressWarnings("unchecked")
Map<String, ContentSummary> results =
(Map<String, ContentSummary>) ((Object)rpcClient.invokeConcurrent(
locations, method, false, false));
Map<RemoteLocation, ContentSummary> results =
rpcClient.invokeConcurrent(
locations, method, false, false, ContentSummary.class);
summaries.addAll(results.values());
} catch (FileNotFoundException e) {
notFoundException = e;
@ -1767,16 +1755,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
RemoteMethod method = new RemoteMethod(
"getCurrentEditLogTxid", new Class<?>[] {});
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Object> ret =
rpcClient.invokeConcurrent(nss, method, true, false);
Map<FederationNamespaceInfo, Long> ret =
rpcClient.invokeConcurrent(nss, method, true, false, long.class);
// Return the maximum txid
long txid = 0;
Object obj = ret;
@SuppressWarnings("unchecked")
Map<FederationNamespaceInfo, Long> results =
(Map<FederationNamespaceInfo, Long>)obj;
Collection<Long> txids = results.values();
Collection<Long> txids = ret.values();
for (long t : txids) {
if (t > txid) {
txid = t;
@ -2039,6 +2023,39 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
/**
* Merge the outputs from multiple namespaces.
* @param map Namespace -> Output array.
* @param clazz Class of the values.
* @return Array with the outputs.
*/
protected static <T> T[] merge(
Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) {
// Put all results into a set to avoid repeats
Set<T> ret = new LinkedHashSet<>();
for (T[] values : map.values()) {
for (T val : values) {
ret.add(val);
}
}
return toArray(ret, clazz);
}
/**
* Convert a set of values into an array.
* @param set Input set.
* @param clazz Class of the values.
* @return Array with the values in set.
*/
private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
@SuppressWarnings("unchecked")
T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
combinedData = set.toArray(combinedData);
return combinedData;
}
/**
* Get quota module implement.
*/

View File

@ -109,6 +109,8 @@ public class RouterDFSCluster {
private List<RouterContext> routers;
/** If the Namenodes are in high availability.*/
private boolean highAvailability;
/** Number of datanodes per nameservice. */
private int numDatanodesPerNameservice = 2;
/** Mini cluster. */
private MiniDFSCluster cluster;
@ -356,8 +358,8 @@ public class RouterDFSCluster {
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) {
this(ha, numNameservices, numNamnodes,
public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
this(ha, numNameservices, numNamenodes,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
}
@ -531,6 +533,10 @@ public class RouterDFSCluster {
}
}
public void setNumDatanodesPerNameservice(int num) {
this.numDatanodesPerNameservice = num;
}
public String getNameservicesKey() {
StringBuilder sb = new StringBuilder();
for (String nsId : this.nameservices) {
@ -658,7 +664,7 @@ public class RouterDFSCluster {
nnConf.addResource(overrideConf);
}
cluster = new MiniDFSCluster.Builder(nnConf)
.numDataNodes(nameservices.size()*2)
.numDataNodes(nameservices.size() * numDatanodesPerNameservice)
.nnTopology(topology)
.build();
cluster.waitActive();

View File

@ -153,9 +153,9 @@ public class TestRouterRpc {
// Wait to ensure NN has fully created its test directories
Thread.sleep(100);
// Pick a NS, namenode and router for this test
// Default namenode and random router for this test
this.router = cluster.getRandomRouter();
this.ns = cluster.getRandomNameservice();
this.ns = cluster.getNameservices().get(0);
this.namenode = cluster.getNamenode(ns, null);
// Handles to the ClientProtocol interface