HDFS-13214. RBF: Complete document of Router configuration. Contributed by Yiqun Lin.
(cherry picked from commit 58ea2d7a65
)
This commit is contained in:
parent
07a966ede3
commit
8262b63c8c
|
@ -101,7 +101,7 @@ public class Router extends CompositeService {
|
|||
/** Interface to identify the active NN for a nameservice or blockpool ID. */
|
||||
private ActiveNamenodeResolver namenodeResolver;
|
||||
/** Updates the namenode status in the namenode resolver. */
|
||||
private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
|
||||
private Collection<NamenodeHeartbeatService> namenodeHeartbeatServices;
|
||||
|
||||
/** Router metrics. */
|
||||
private RouterMetricsService metrics;
|
||||
|
@ -196,13 +196,13 @@ public class Router extends CompositeService {
|
|||
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
|
||||
|
||||
// Create status updater for each monitored Namenode
|
||||
this.namenodeHearbeatServices = createNamenodeHearbeatServices();
|
||||
this.namenodeHeartbeatServices = createNamenodeHeartbeatServices();
|
||||
for (NamenodeHeartbeatService hearbeatService :
|
||||
this.namenodeHearbeatServices) {
|
||||
this.namenodeHeartbeatServices) {
|
||||
addService(hearbeatService);
|
||||
}
|
||||
|
||||
if (this.namenodeHearbeatServices.isEmpty()) {
|
||||
if (this.namenodeHeartbeatServices.isEmpty()) {
|
||||
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
||||
}
|
||||
|
||||
|
@ -411,7 +411,7 @@ public class Router extends CompositeService {
|
|||
* @return List of heartbeat services.
|
||||
*/
|
||||
protected Collection<NamenodeHeartbeatService>
|
||||
createNamenodeHearbeatServices() {
|
||||
createNamenodeHeartbeatServices() {
|
||||
|
||||
Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
|
||||
|
||||
|
@ -645,4 +645,12 @@ public class Router extends CompositeService {
|
|||
RouterQuotaUpdateService getQuotaCacheUpdateService() {
|
||||
return this.quotaUpdateService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of namenode heartbeat service.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() {
|
||||
return this.namenodeHeartbeatServices;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -350,6 +350,11 @@ Monitor the namenodes in the subclusters for forwarding the client requests.
|
|||
| dfs.federation.router.monitor.namenode | | The identifier of the namenodes to monitor and heartbeat. |
|
||||
| dfs.federation.router.monitor.localnamenode.enable | `true` | If `true`, the Router should monitor the namenode in the local machine. |
|
||||
|
||||
Note: The config *dfs.nameservice.id* is recommended to configure if *dfs.federation.router.monitor.localnamenode.enable* is enabled.
|
||||
This will allow the Router finding the local node directly. Otherwise, it will find the nameservice Id by matching namenode RPC address with the
|
||||
local node address. If multiple addresses are matched, the Router will fail to start. In addition, if the local node is in a HA mode, it is recommend
|
||||
to configure *dfs.ha.namenode.id*.
|
||||
|
||||
### Quota
|
||||
|
||||
Global quota supported in federation.
|
||||
|
|
|
@ -0,0 +1,143 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test namenodes monitor behavior in the Router.
|
||||
*/
|
||||
public class TestRouterNamenodeMonitoring {
|
||||
|
||||
private static StateStoreDFSCluster cluster;
|
||||
private static RouterContext routerContext;
|
||||
private static MembershipNamenodeResolver resolver;
|
||||
|
||||
private String ns0;
|
||||
private String ns1;
|
||||
private long initializedTime;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// Build and start a federated cluster with HA enabled
|
||||
cluster = new StateStoreDFSCluster(true, 2);
|
||||
// Enable heartbeat service and local heartbeat
|
||||
Configuration routerConf = new RouterConfigBuilder()
|
||||
.stateStore()
|
||||
.admin()
|
||||
.rpc()
|
||||
.enableLocalHeartbeat(true)
|
||||
.heartbeat()
|
||||
.build();
|
||||
|
||||
// Specify local node (ns0.nn1) to monitor
|
||||
StringBuilder sb = new StringBuilder();
|
||||
ns0 = cluster.getNameservices().get(0);
|
||||
NamenodeContext context = cluster.getNamenodes(ns0).get(1);
|
||||
routerConf.set(DFS_NAMESERVICE_ID, ns0);
|
||||
routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
|
||||
|
||||
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
|
||||
sb = new StringBuilder();
|
||||
ns1 = cluster.getNameservices().get(1);
|
||||
for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
|
||||
String suffix = ctx.getConfSuffix();
|
||||
if (sb.length() != 0) {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(suffix);
|
||||
}
|
||||
// override with the namenodes: ns1.nn0,ns1.nn1
|
||||
routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
|
||||
|
||||
cluster.addRouterOverrides(routerConf);
|
||||
cluster.startCluster();
|
||||
cluster.startRouters();
|
||||
cluster.waitClusterUp();
|
||||
|
||||
routerContext = cluster.getRandomRouter();
|
||||
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
|
||||
.getNamenodeResolver();
|
||||
initializedTime = Time.now();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (cluster != null) {
|
||||
cluster.stopRouter(routerContext);
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamenodeMonitoring() throws Exception {
|
||||
// Set nn0 to active for all nameservices
|
||||
for (String ns : cluster.getNameservices()) {
|
||||
cluster.switchToActive(ns, "nn0");
|
||||
cluster.switchToStandby(ns, "nn1");
|
||||
}
|
||||
|
||||
Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
|
||||
.getRouter().getNamenodeHearbeatServices();
|
||||
// manually trigger the heartbeat
|
||||
for (NamenodeHeartbeatService service : heartbeatServices) {
|
||||
service.periodicInvoke();
|
||||
}
|
||||
|
||||
resolver.loadCache(true);
|
||||
List<? extends FederationNamenodeContext> namespaceInfo0 =
|
||||
resolver.getNamenodesForNameserviceId(ns0);
|
||||
List<? extends FederationNamenodeContext> namespaceInfo1 =
|
||||
resolver.getNamenodesForNameserviceId(ns1);
|
||||
|
||||
// The modified date won't be updated in ns0.nn0 since it isn't
|
||||
// monitored by the Router.
|
||||
assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
|
||||
assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
|
||||
|
||||
// other namnodes should be updated as expected
|
||||
assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
|
||||
assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
|
||||
|
||||
assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
|
||||
assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
|
||||
|
||||
assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
|
||||
assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue