HDFS-16369. RBF: Fix the retry logic of RouterRpcServer#invokeAtAvailableNs. (#3745). Contributed by Ayush Saxena.
Reviewed-by: litao <tomleescut@gmail.com> Reviewed-by: Inigo Goiri <inigoiri@apache.org>
This commit is contained in:
parent
c0f405a46b
commit
cab7086fbc
|
@ -43,7 +43,6 @@ import java.net.URISyntaxException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
|
@ -671,8 +670,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
|
|||
|
||||
/**
|
||||
* Invokes the method at default namespace, if default namespace is not
|
||||
* available then at the first available namespace.
|
||||
* If the namespace is unavailable, retry once with other namespace.
|
||||
* available then at the other available namespaces.
|
||||
* If the namespace is unavailable, retry with other namespaces.
|
||||
* @param <T> expected return type.
|
||||
* @param method the remote method.
|
||||
* @return the response received after invoking method.
|
||||
|
@ -681,28 +680,29 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
|
|||
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
|
||||
throws IOException {
|
||||
String nsId = subclusterResolver.getDefaultNamespace();
|
||||
// If default Ns is not present return result from first namespace.
|
||||
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
||||
try {
|
||||
if (!nsId.isEmpty()) {
|
||||
// If no namespace is available, then throw this IOException.
|
||||
IOException io = new IOException("No namespace available.");
|
||||
// If default Ns is present return result from that namespace.
|
||||
if (!nsId.isEmpty()) {
|
||||
try {
|
||||
return rpcClient.invokeSingle(nsId, method, clazz);
|
||||
} catch (IOException ioe) {
|
||||
if (!clientProto.isUnavailableSubclusterException(ioe)) {
|
||||
LOG.debug("{} exception cannot be retried",
|
||||
ioe.getClass().getSimpleName());
|
||||
throw ioe;
|
||||
}
|
||||
// Remove the already tried namespace.
|
||||
nss.removeIf(n -> n.getNameserviceId().equals(nsId));
|
||||
return invokeOnNs(method, clazz, io, nss);
|
||||
}
|
||||
// If no namespace is available, throw IOException.
|
||||
IOException io = new IOException("No namespace available.");
|
||||
return invokeOnNs(method, clazz, io, nss);
|
||||
} catch (IOException ioe) {
|
||||
if (!clientProto.isUnavailableSubclusterException(ioe)) {
|
||||
LOG.debug("{} exception cannot be retried",
|
||||
ioe.getClass().getSimpleName());
|
||||
throw ioe;
|
||||
}
|
||||
Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nss, nsId);
|
||||
return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
|
||||
}
|
||||
return invokeOnNs(method, clazz, io, nss);
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the method on first available namespace,
|
||||
* Invoke the method sequentially on available namespaces,
|
||||
* throw no namespace available exception, if no namespaces are available.
|
||||
* @param method the remote method.
|
||||
* @param clazz Class for the return type.
|
||||
|
@ -716,26 +716,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
|
|||
if (nss.isEmpty()) {
|
||||
throw ioe;
|
||||
}
|
||||
String nsId = nss.iterator().next().getNameserviceId();
|
||||
return rpcClient.invokeSingle(nsId, method, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get set of namespace info's removing the already invoked namespaceinfo.
|
||||
* @param nss List of namespaces in the federation.
|
||||
* @param nsId Already invoked namespace id.
|
||||
* @return List of name spaces in the federation on
|
||||
* removing the already invoked namespaceinfo.
|
||||
*/
|
||||
private static Set<FederationNamespaceInfo> getNameSpaceInfo(
|
||||
final Set<FederationNamespaceInfo> nss, final String nsId) {
|
||||
Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
|
||||
for (FederationNamespaceInfo ns : nss) {
|
||||
if (!nsId.equals(ns.getNameserviceId())) {
|
||||
namespaceInfos.add(ns);
|
||||
for (FederationNamespaceInfo fnInfo : nss) {
|
||||
String nsId = fnInfo.getNameserviceId();
|
||||
LOG.debug("Invoking {} on namespace {}", method, nsId);
|
||||
try {
|
||||
return rpcClient.invokeSingle(nsId, method, clazz);
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Failed to invoke {} on namespace {}", method, nsId, e);
|
||||
// Ignore the exception and try on other namespace, if the tried
|
||||
// namespace is unavailable, else throw the received exception.
|
||||
if (!clientProto.isUnavailableSubclusterException(e)) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
return namespaceInfos;
|
||||
// Couldn't get a response from any of the namespace, throw ioe.
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
|
|
@ -76,13 +76,14 @@ import org.junit.Test;
|
|||
* Tests router rpc with multiple destination mount table resolver.
|
||||
*/
|
||||
public class TestRouterRPCMultipleDestinationMountTableResolver {
|
||||
private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1");
|
||||
private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2");
|
||||
|
||||
private static StateStoreDFSCluster cluster;
|
||||
private static RouterContext routerContext;
|
||||
private static MountTableResolver resolver;
|
||||
private static DistributedFileSystem nnFs0;
|
||||
private static DistributedFileSystem nnFs1;
|
||||
private static DistributedFileSystem nnFs2;
|
||||
private static DistributedFileSystem routerFs;
|
||||
private static RouterRpcServer rpcServer;
|
||||
|
||||
|
@ -90,7 +91,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
|
|||
public static void setUp() throws Exception {
|
||||
|
||||
// Build and start a federated cluster
|
||||
cluster = new StateStoreDFSCluster(false, 2,
|
||||
cluster = new StateStoreDFSCluster(false, 3,
|
||||
MultipleDestinationMountTableResolver.class);
|
||||
Configuration routerConf =
|
||||
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
|
||||
|
@ -111,6 +112,8 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
|
|||
.getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
|
||||
nnFs1 = (DistributedFileSystem) cluster
|
||||
.getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
|
||||
nnFs2 = (DistributedFileSystem) cluster
|
||||
.getNamenode(cluster.getNameservices().get(2), null).getFileSystem();
|
||||
routerFs = (DistributedFileSystem) routerContext.getFileSystem();
|
||||
rpcServer =routerContext.getRouter().getRpcServer();
|
||||
}
|
||||
|
@ -668,6 +671,7 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
|
|||
// Make one subcluster unavailable.
|
||||
MiniDFSCluster dfsCluster = cluster.getCluster();
|
||||
dfsCluster.shutdownNameNode(0);
|
||||
dfsCluster.shutdownNameNode(1);
|
||||
try {
|
||||
// Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
|
||||
RemoteMethod method = new RemoteMethod("getServerDefaults");
|
||||
|
@ -675,7 +679,8 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
|
|||
rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
|
||||
assertNotNull(serverDefaults);
|
||||
} finally {
|
||||
dfsCluster.restartNameNode(0);
|
||||
dfsCluster.restartNameNode(0, false);
|
||||
dfsCluster.restartNameNode(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -893,6 +898,9 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
|
|||
if (nsId.equals("ns1")) {
|
||||
return nnFs1;
|
||||
}
|
||||
if (nsId.equals("ns2")) {
|
||||
return nnFs2;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue