From d7442c244f5490e66e89d3a4e787c170807ce338 Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Mon, 20 Aug 2018 23:01:59 +0800 Subject: [PATCH] HDFS-13750. RBF: Router ID in RouterRpcClient is always null. Contributed by Takanobu Asanuma. (cherry picked from commit 01ff8178148790f7b0112058cf08d23d031b6868) --- .../federation/router/RouterRpcClient.java | 18 ++++++++++-------- .../federation/router/RouterRpcServer.java | 3 ++- .../hdfs/server/federation/MockResolver.java | 3 +++ .../server/federation/router/TestRouter.java | 18 ++++++++++++++++++ 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 3eb72414288..56ca55ff6ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -92,8 +92,8 @@ public class RouterRpcClient { LoggerFactory.getLogger(RouterRpcClient.class); - /** Router identifier. */ - private final String routerId; + /** Router using this RPC client. */ + private final Router router; /** Interface to identify the active NN for a nameservice or blockpool ID. */ private final ActiveNamenodeResolver namenodeResolver; @@ -116,12 +116,13 @@ public class RouterRpcClient { * Create a router RPC client to manage remote procedure calls to NNs. * * @param conf Hdfs Configuation. + * @param router A router using this RPC client. * @param resolver A NN resolver to determine the currently active NN in HA. * @param monitor Optional performance monitor. */ - public RouterRpcClient(Configuration conf, String identifier, + public RouterRpcClient(Configuration conf, Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { - this.routerId = identifier; + this.router = router; this.namenodeResolver = resolver; @@ -343,7 +344,8 @@ public class RouterRpcClient { if (namenodes == null || namenodes.isEmpty()) { throw new IOException("No namenodes to invoke " + method.getName() + - " with params " + Arrays.toString(params) + " from " + this.routerId); + " with params " + Arrays.toString(params) + " from " + + router.getRouterId()); } Object ret = null; @@ -1126,7 +1128,7 @@ public class RouterRpcClient { String msg = "Not enough client threads " + active + "/" + total; LOG.error(msg); throw new StandbyException( - "Router " + routerId + " is overloaded: " + msg); + "Router " + router.getRouterId() + " is overloaded: " + msg); } catch (InterruptedException ex) { LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); throw new IOException( @@ -1150,7 +1152,7 @@ public class RouterRpcClient { if (namenodes == null || namenodes.isEmpty()) { throw new IOException("Cannot locate a registered namenode for " + nsId + - " from " + this.routerId); + " from " + router.getRouterId()); } return namenodes; } @@ -1171,7 +1173,7 @@ public class RouterRpcClient { if (namenodes == null || namenodes.isEmpty()) { throw new IOException("Cannot locate a registered namenode for " + bpId + - " from " + this.routerId); + " from " + router.getRouterId()); } return namenodes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 027db8a0104..b0eb2aa5ea9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -209,6 +209,7 @@ public class RouterRpcServer extends AbstractService * Construct a router RPC server. * * @param configuration HDFS Configuration. + * @param router A router using this RPC server. * @param nnResolver The NN resolver instance to determine active NNs in HA. * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. @@ -310,7 +311,7 @@ public class RouterRpcServer extends AbstractService this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); // Create the client - this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), + this.rpcClient = new RouterRpcClient(this.conf, this.router, this.namenodeResolver, this.rpcMonitor); // Initialize modules diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 36cce391aea..f5636ceccd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -129,6 +129,9 @@ public class MockResolver // Return a copy of the list because it is updated periodically List namenodes = this.resolver.get(nameserviceId); + if (namenodes == null) { + namenodes = new ArrayList<>(); + } return Collections.unmodifiableList(new ArrayList<>(namenodes)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java index f8cf009479a..db4be292fd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.IOException; import java.net.InetSocketAddress; @@ -185,4 +187,20 @@ public class TestRouter { router.stop(); router.close(); } + + @Test + public void testRouterIDInRouterRpcClient() throws Exception { + + Router router = new Router(); + router.init(new RouterConfigBuilder(conf).rpc().build()); + router.setRouterId("Router-0"); + RemoteMethod remoteMethod = mock(RemoteMethod.class); + + intercept(IOException.class, "Router-0", + () -> router.getRpcServer().getRPCClient() + .invokeSingle("ns0", remoteMethod)); + + router.stop(); + router.close(); + } }