HDFS-13214. RBF: Complete document of Router configuration. Contributed by Yiqun Lin.
This commit is contained in:
parent
edf9445708
commit
58ea2d7a65
|
@ -101,7 +101,7 @@ public class Router extends CompositeService {
|
||||||
/** Interface to identify the active NN for a nameservice or blockpool ID. */
|
/** Interface to identify the active NN for a nameservice or blockpool ID. */
|
||||||
private ActiveNamenodeResolver namenodeResolver;
|
private ActiveNamenodeResolver namenodeResolver;
|
||||||
/** Updates the namenode status in the namenode resolver. */
|
/** Updates the namenode status in the namenode resolver. */
|
||||||
private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
|
private Collection<NamenodeHeartbeatService> namenodeHeartbeatServices;
|
||||||
|
|
||||||
/** Router metrics. */
|
/** Router metrics. */
|
||||||
private RouterMetricsService metrics;
|
private RouterMetricsService metrics;
|
||||||
|
@ -196,13 +196,13 @@ public class Router extends CompositeService {
|
||||||
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
|
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
|
||||||
|
|
||||||
// Create status updater for each monitored Namenode
|
// Create status updater for each monitored Namenode
|
||||||
this.namenodeHearbeatServices = createNamenodeHearbeatServices();
|
this.namenodeHeartbeatServices = createNamenodeHeartbeatServices();
|
||||||
for (NamenodeHeartbeatService hearbeatService :
|
for (NamenodeHeartbeatService hearbeatService :
|
||||||
this.namenodeHearbeatServices) {
|
this.namenodeHeartbeatServices) {
|
||||||
addService(hearbeatService);
|
addService(hearbeatService);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.namenodeHearbeatServices.isEmpty()) {
|
if (this.namenodeHeartbeatServices.isEmpty()) {
|
||||||
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -411,7 +411,7 @@ public class Router extends CompositeService {
|
||||||
* @return List of heartbeat services.
|
* @return List of heartbeat services.
|
||||||
*/
|
*/
|
||||||
protected Collection<NamenodeHeartbeatService>
|
protected Collection<NamenodeHeartbeatService>
|
||||||
createNamenodeHearbeatServices() {
|
createNamenodeHeartbeatServices() {
|
||||||
|
|
||||||
Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
|
Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
|
||||||
|
|
||||||
|
@ -645,4 +645,12 @@ public class Router extends CompositeService {
|
||||||
RouterQuotaUpdateService getQuotaCacheUpdateService() {
|
RouterQuotaUpdateService getQuotaCacheUpdateService() {
|
||||||
return this.quotaUpdateService;
|
return this.quotaUpdateService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of namenode heartbeat service.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() {
|
||||||
|
return this.namenodeHeartbeatServices;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -350,6 +350,11 @@ Monitor the namenodes in the subclusters for forwarding the client requests.
|
||||||
| dfs.federation.router.monitor.namenode | | The identifier of the namenodes to monitor and heartbeat. |
|
| dfs.federation.router.monitor.namenode | | The identifier of the namenodes to monitor and heartbeat. |
|
||||||
| dfs.federation.router.monitor.localnamenode.enable | `true` | If `true`, the Router should monitor the namenode in the local machine. |
|
| dfs.federation.router.monitor.localnamenode.enable | `true` | If `true`, the Router should monitor the namenode in the local machine. |
|
||||||
|
|
||||||
|
Note: The config *dfs.nameservice.id* is recommended to configure if *dfs.federation.router.monitor.localnamenode.enable* is enabled.
|
||||||
|
This will allow the Router finding the local node directly. Otherwise, it will find the nameservice Id by matching namenode RPC address with the
|
||||||
|
local node address. If multiple addresses are matched, the Router will fail to start. In addition, if the local node is in a HA mode, it is recommend
|
||||||
|
to configure *dfs.ha.namenode.id*.
|
||||||
|
|
||||||
### Quota
|
### Quota
|
||||||
|
|
||||||
Global quota supported in federation.
|
Global quota supported in federation.
|
||||||
|
|
|
@ -0,0 +1,143 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test namenodes monitor behavior in the Router.
|
||||||
|
*/
|
||||||
|
public class TestRouterNamenodeMonitoring {
|
||||||
|
|
||||||
|
private static StateStoreDFSCluster cluster;
|
||||||
|
private static RouterContext routerContext;
|
||||||
|
private static MembershipNamenodeResolver resolver;
|
||||||
|
|
||||||
|
private String ns0;
|
||||||
|
private String ns1;
|
||||||
|
private long initializedTime;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
// Build and start a federated cluster with HA enabled
|
||||||
|
cluster = new StateStoreDFSCluster(true, 2);
|
||||||
|
// Enable heartbeat service and local heartbeat
|
||||||
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
|
.stateStore()
|
||||||
|
.admin()
|
||||||
|
.rpc()
|
||||||
|
.enableLocalHeartbeat(true)
|
||||||
|
.heartbeat()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Specify local node (ns0.nn1) to monitor
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
ns0 = cluster.getNameservices().get(0);
|
||||||
|
NamenodeContext context = cluster.getNamenodes(ns0).get(1);
|
||||||
|
routerConf.set(DFS_NAMESERVICE_ID, ns0);
|
||||||
|
routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
|
||||||
|
|
||||||
|
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
|
||||||
|
sb = new StringBuilder();
|
||||||
|
ns1 = cluster.getNameservices().get(1);
|
||||||
|
for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
|
||||||
|
String suffix = ctx.getConfSuffix();
|
||||||
|
if (sb.length() != 0) {
|
||||||
|
sb.append(",");
|
||||||
|
}
|
||||||
|
sb.append(suffix);
|
||||||
|
}
|
||||||
|
// override with the namenodes: ns1.nn0,ns1.nn1
|
||||||
|
routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
|
||||||
|
|
||||||
|
cluster.addRouterOverrides(routerConf);
|
||||||
|
cluster.startCluster();
|
||||||
|
cluster.startRouters();
|
||||||
|
cluster.waitClusterUp();
|
||||||
|
|
||||||
|
routerContext = cluster.getRandomRouter();
|
||||||
|
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
|
||||||
|
.getNamenodeResolver();
|
||||||
|
initializedTime = Time.now();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.stopRouter(routerContext);
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNamenodeMonitoring() throws Exception {
|
||||||
|
// Set nn0 to active for all nameservices
|
||||||
|
for (String ns : cluster.getNameservices()) {
|
||||||
|
cluster.switchToActive(ns, "nn0");
|
||||||
|
cluster.switchToStandby(ns, "nn1");
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
|
||||||
|
.getRouter().getNamenodeHearbeatServices();
|
||||||
|
// manually trigger the heartbeat
|
||||||
|
for (NamenodeHeartbeatService service : heartbeatServices) {
|
||||||
|
service.periodicInvoke();
|
||||||
|
}
|
||||||
|
|
||||||
|
resolver.loadCache(true);
|
||||||
|
List<? extends FederationNamenodeContext> namespaceInfo0 =
|
||||||
|
resolver.getNamenodesForNameserviceId(ns0);
|
||||||
|
List<? extends FederationNamenodeContext> namespaceInfo1 =
|
||||||
|
resolver.getNamenodesForNameserviceId(ns1);
|
||||||
|
|
||||||
|
// The modified date won't be updated in ns0.nn0 since it isn't
|
||||||
|
// monitored by the Router.
|
||||||
|
assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
|
||||||
|
assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
|
||||||
|
|
||||||
|
// other namnodes should be updated as expected
|
||||||
|
assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
|
||||||
|
assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
|
||||||
|
|
||||||
|
assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
|
||||||
|
assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
|
||||||
|
|
||||||
|
assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
|
||||||
|
assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue