HDFS-13750. RBF: Router ID in RouterRpcClient is always null. Contributed by Takanobu Asanuma.

(cherry picked from commit 01ff817814)
This commit is contained in:
Yiqun Lin 2018-08-20 23:01:59 +08:00
parent a3d4a25bbf
commit d7442c244f
4 changed files with 33 additions and 9 deletions

View File

@ -92,8 +92,8 @@ public class RouterRpcClient {
LoggerFactory.getLogger(RouterRpcClient.class); LoggerFactory.getLogger(RouterRpcClient.class);
/** Router identifier. */ /** Router using this RPC client. */
private final String routerId; private final Router router;
/** Interface to identify the active NN for a nameservice or blockpool ID. */ /** Interface to identify the active NN for a nameservice or blockpool ID. */
private final ActiveNamenodeResolver namenodeResolver; private final ActiveNamenodeResolver namenodeResolver;
@ -116,12 +116,13 @@ public class RouterRpcClient {
* Create a router RPC client to manage remote procedure calls to NNs. * Create a router RPC client to manage remote procedure calls to NNs.
* *
* @param conf Hdfs Configuation. * @param conf Hdfs Configuation.
* @param router A router using this RPC client.
* @param resolver A NN resolver to determine the currently active NN in HA. * @param resolver A NN resolver to determine the currently active NN in HA.
* @param monitor Optional performance monitor. * @param monitor Optional performance monitor.
*/ */
public RouterRpcClient(Configuration conf, String identifier, public RouterRpcClient(Configuration conf, Router router,
ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
this.routerId = identifier; this.router = router;
this.namenodeResolver = resolver; this.namenodeResolver = resolver;
@ -343,7 +344,8 @@ public class RouterRpcClient {
if (namenodes == null || namenodes.isEmpty()) { if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() + throw new IOException("No namenodes to invoke " + method.getName() +
" with params " + Arrays.toString(params) + " from " + this.routerId); " with params " + Arrays.toString(params) + " from "
+ router.getRouterId());
} }
Object ret = null; Object ret = null;
@ -1126,7 +1128,7 @@ public class RouterRpcClient {
String msg = "Not enough client threads " + active + "/" + total; String msg = "Not enough client threads " + active + "/" + total;
LOG.error(msg); LOG.error(msg);
throw new StandbyException( throw new StandbyException(
"Router " + routerId + " is overloaded: " + msg); "Router " + router.getRouterId() + " is overloaded: " + msg);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
throw new IOException( throw new IOException(
@ -1150,7 +1152,7 @@ public class RouterRpcClient {
if (namenodes == null || namenodes.isEmpty()) { if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("Cannot locate a registered namenode for " + nsId + throw new IOException("Cannot locate a registered namenode for " + nsId +
" from " + this.routerId); " from " + router.getRouterId());
} }
return namenodes; return namenodes;
} }
@ -1171,7 +1173,7 @@ public class RouterRpcClient {
if (namenodes == null || namenodes.isEmpty()) { if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("Cannot locate a registered namenode for " + bpId + throw new IOException("Cannot locate a registered namenode for " + bpId +
" from " + this.routerId); " from " + router.getRouterId());
} }
return namenodes; return namenodes;
} }

View File

@ -209,6 +209,7 @@ public class RouterRpcServer extends AbstractService
* Construct a router RPC server. * Construct a router RPC server.
* *
* @param configuration HDFS Configuration. * @param configuration HDFS Configuration.
* @param router A router using this RPC server.
* @param nnResolver The NN resolver instance to determine active NNs in HA. * @param nnResolver The NN resolver instance to determine active NNs in HA.
* @param fileResolver File resolver to resolve file paths to subclusters. * @param fileResolver File resolver to resolve file paths to subclusters.
* @throws IOException If the RPC server could not be created. * @throws IOException If the RPC server could not be created.
@ -310,7 +311,7 @@ public class RouterRpcServer extends AbstractService
this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf);
// Create the client // Create the client
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), this.rpcClient = new RouterRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor); this.namenodeResolver, this.rpcMonitor);
// Initialize modules // Initialize modules

View File

@ -129,6 +129,9 @@ public class MockResolver
// Return a copy of the list because it is updated periodically // Return a copy of the list because it is updated periodically
List<? extends FederationNamenodeContext> namenodes = List<? extends FederationNamenodeContext> namenodes =
this.resolver.get(nameserviceId); this.resolver.get(nameserviceId);
if (namenodes == null) {
namenodes = new ArrayList<>();
}
return Collections.unmodifiableList(new ArrayList<>(namenodes)); return Collections.unmodifiableList(new ArrayList<>(namenodes));
} }

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -185,4 +187,20 @@ public class TestRouter {
router.stop(); router.stop();
router.close(); router.close();
} }
@Test
public void testRouterIDInRouterRpcClient() throws Exception {
Router router = new Router();
router.init(new RouterConfigBuilder(conf).rpc().build());
router.setRouterId("Router-0");
RemoteMethod remoteMethod = mock(RemoteMethod.class);
intercept(IOException.class, "Router-0",
() -> router.getRpcServer().getRPCClient()
.invokeSingle("ns0", remoteMethod));
router.stop();
router.close();
}
} }