HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.
parent 6c686253e9
commit 5cb7a4de34
Router.java
@@ -497,10 +497,9 @@ public class Router extends CompositeService {
     }

     // Create heartbeat services for a list specified by the admin
-    String namenodes = this.conf.get(
+    Collection<String> namenodes = this.conf.getTrimmedStringCollection(
         RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
-    if (namenodes != null) {
-      for (String namenode : namenodes.split(",")) {
+    for (String namenode : namenodes) {
       String[] namenodeSplit = namenode.split("\\.");
       String nsId = null;
       String nnId = null;
@@ -520,7 +519,6 @@ public class Router extends CompositeService {
           }
         }
       }
-    }

     return ret.values();
   }
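The hunk above swaps conf.get() plus a manual split(",") for Configuration#getTrimmedStringCollection, so entries of the monitor list that carry stray spaces or a trailing comma still resolve to a nameservice and namenode. Below is a minimal sketch of the difference, not part of the patch; the literal key string and the wrapper class are illustrative stand-ins for RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE.

import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;

public class MonitorNamenodeParsingSketch {
  public static void main(String[] args) {
    // Illustrative key only; the Router reads RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE.
    String key = "dfs.federation.router.monitor.namenode";

    Configuration conf = new Configuration(false);
    conf.set(key, " ns1.nn0, ns1.nn1,");

    // Old style: raw value plus manual split keeps the padding, so tokens
    // like " ns1.nn1" do not resolve to a known nameservice.namenode pair.
    String raw = conf.get(key);
    System.out.println(Arrays.asList(raw.split(",")));

    // New style: each entry is trimmed and empty entries are dropped.
    Collection<String> namenodes = conf.getTrimmedStringCollection(key);
    System.out.println(namenodes); // [ns1.nn0, ns1.nn1]
  }
}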
MockNamenode.java (new file)
@@ -0,0 +1,225 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.protobuf.BlockingService;


/**
 * Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is
 * used by the Routers in a mock cluster.
 */
public class MockNamenode {

  /** Mock implementation of the Namenode. */
  private final NamenodeProtocols mockNn;

  /** HA state of the Namenode. */
  private HAServiceState haState = HAServiceState.STANDBY;

  /** RPC server of the Namenode that redirects calls to the mock. */
  private Server rpcServer;
  /** HTTP server of the Namenode that redirects calls to the mock. */
  private HttpServer2 httpServer;


  public MockNamenode() throws Exception {
    Configuration conf = new Configuration();

    this.mockNn = mock(NamenodeProtocols.class);
    setupMock();
    setupRPCServer(conf);
    setupHTTPServer(conf);
  }

  /**
   * Setup the mock of the Namenode. It offers the basic functionality for
   * Routers to get the status.
   * @throws IOException If the mock cannot be setup.
   */
  protected void setupMock() throws IOException {
    NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
    when(mockNn.versionRequest()).thenReturn(nsInfo);

    when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
      @Override
      public HAServiceStatus answer(InvocationOnMock invocation)
          throws Throwable {
        HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
        haStatus.setNotReadyToBecomeActive("");
        return haStatus;
      }
    });
  }

  /**
   * Setup the RPC server of the Namenode that redirects calls to the mock.
   * @param conf Configuration of the server.
   * @throws IOException If the RPC server cannot be setup.
   */
  private void setupRPCServer(final Configuration conf) throws IOException {
    RPC.setProtocolEngine(
        conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
    ClientNamenodeProtocolServerSideTranslatorPB
        clientNNProtoXlator =
            new ClientNamenodeProtocolServerSideTranslatorPB(mockNn);
    BlockingService clientNNPbService =
        ClientNamenodeProtocol.newReflectiveBlockingService(
            clientNNProtoXlator);

    rpcServer = new RPC.Builder(conf)
        .setProtocol(ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress("0.0.0.0")
        .setPort(0)
        .build();

    NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
        new NamenodeProtocolServerSideTranslatorPB(mockNn);
    BlockingService nnProtoPbService =
        NamenodeProtocolService.newReflectiveBlockingService(
            nnProtoXlator);
    DFSUtil.addPBProtocol(
        conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);

    DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
        new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000);
    BlockingService dnProtoPbService =
        DatanodeProtocolService.newReflectiveBlockingService(
            dnProtoPbXlator);
    DFSUtil.addPBProtocol(
        conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);

    HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
        new HAServiceProtocolServerSideTranslatorPB(mockNn);
    BlockingService haProtoPbService =
        HAServiceProtocolService.newReflectiveBlockingService(
            haServiceProtoXlator);
    DFSUtil.addPBProtocol(
        conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);

    rpcServer.start();
  }

  /**
   * Setup the HTTP server of the Namenode that redirects calls to the mock.
   * @param conf Configuration of the server.
   * @throws IOException If the HTTP server cannot be setup.
   */
  private void setupHTTPServer(Configuration conf) throws IOException {
    HttpServer2.Builder builder = new HttpServer2.Builder()
        .setName("hdfs")
        .setConf(conf)
        .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
        .addEndpoint(URI.create("http://0.0.0.0:0"));
    httpServer = builder.build();
    httpServer.start();
  }

  /**
   * Get the RPC port for the Mock Namenode.
   * @return RPC port.
   */
  public int getRPCPort() {
    return rpcServer.getListenerAddress().getPort();
  }

  /**
   * Get the HTTP port for the Mock Namenode.
   * @return HTTP port.
   */
  public int getHTTPPort() {
    return httpServer.getConnectorAddress(0).getPort();
  }

  /**
   * Get the Mock core. This is used to extend the mock.
   * @return Mock Namenode protocol to be extended.
   */
  public NamenodeProtocols getMock() {
    return mockNn;
  }

  /**
   * Get the HA state of the Mock Namenode.
   * @return HA state (ACTIVE or STANDBY).
   */
  public HAServiceState getHAServiceState() {
    return haState;
  }

  /**
   * Show the Mock Namenode as Active.
   */
  public void transitionToActive() {
    this.haState = HAServiceState.ACTIVE;
  }

  /**
   * Show the Mock Namenode as Standby.
   */
  public void transitionToStandby() {
    this.haState = HAServiceState.STANDBY;
  }

  /**
   * Stop the Mock Namenode. It stops all the servers.
   * @throws Exception If it cannot stop the Namenode.
   */
  public void stop() throws Exception {
    if (rpcServer != null) {
      rpcServer.stop();
    }
    if (httpServer != null) {
      httpServer.stop();
    }
  }
}
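MockNamenode above mocks just enough of a Namenode's RPC and HTTP surface for a Router heartbeat to poll it. A short usage sketch follows, not part of the patch; the wrapper class is hypothetical and simply mirrors what TestRouterNamenodeMonitoring#setup() below does for each nameservice.

import org.apache.hadoop.hdfs.server.federation.MockNamenode;

public class MockNamenodeUsageSketch {
  public static void main(String[] args) throws Exception {
    MockNamenode nn0 = new MockNamenode();
    MockNamenode nn1 = new MockNamenode();

    // The mocked HAServiceProtocol now reports nn0 as ACTIVE and nn1 as
    // STANDBY, which is what the Router heartbeat service asks for.
    nn0.transitionToActive();
    nn1.transitionToStandby();

    // The test publishes these ports through dfs.namenode.rpc-address.* and
    // dfs.namenode.http-address.* so the Router can reach the mocks.
    System.out.println("nn0 RPC port:  " + nn0.getRPCPort());
    System.out.println("nn0 HTTP port: " + nn0.getHTTPPort());

    nn0.stop();
    nn1.stop();
  }
}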
TestRouterNamenodeMonitoring.java
@@ -17,127 +17,251 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
-import static org.junit.Assert.assertEquals;
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
-import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test namenodes monitor behavior in the Router.
  */
 public class TestRouterNamenodeMonitoring {
 
-  private static StateStoreDFSCluster cluster;
-  private static RouterContext routerContext;
-  private static MembershipNamenodeResolver resolver;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class);
 
-  private String ns0;
-  private String ns1;
+  /** Router for the test. */
+  private Router router;
+  /** Namenodes in the cluster. */
+  private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
+  /** Nameservices in the federated cluster. */
+  private List<String> nsIds = asList("ns0", "ns1");
+
+  /** Time the test starts. */
   private long initializedTime;
 
 
   @Before
-  public void setUp() throws Exception {
-    // Build and start a federated cluster with HA enabled
-    cluster = new StateStoreDFSCluster(true, 2);
-    // Enable heartbeat service and local heartbeat
-    Configuration routerConf = new RouterConfigBuilder()
-        .stateStore()
-        .admin()
-        .rpc()
-        .enableLocalHeartbeat(true)
-        .heartbeat()
-        .build();
-
-    // Specify local node (ns0.nn1) to monitor
-    StringBuilder sb = new StringBuilder();
-    ns0 = cluster.getNameservices().get(0);
-    NamenodeContext context = cluster.getNamenodes(ns0).get(1);
-    routerConf.set(DFS_NAMESERVICE_ID, ns0);
-    routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
-
-    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
-    sb = new StringBuilder();
-    ns1 = cluster.getNameservices().get(1);
-    for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
-      String suffix = ctx.getConfSuffix();
-      if (sb.length() != 0) {
-        sb.append(",");
-      }
-      sb.append(suffix);
-    }
-    // override with the namenodes: ns1.nn0,ns1.nn1
-    routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
-
-    cluster.addRouterOverrides(routerConf);
-    cluster.startCluster();
-    cluster.startRouters();
-    cluster.waitClusterUp();
-
-    routerContext = cluster.getRandomRouter();
-    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
-        .getNamenodeResolver();
+  public void setup() throws Exception {
+    LOG.info("Initialize the Mock Namenodes to monitor");
+    for (String nsId : nsIds) {
+      nns.put(nsId, new HashMap<>());
+      for (String nnId : asList("nn0", "nn1")) {
+        nns.get(nsId).put(nnId, new MockNamenode());
+      }
+    }
+
+    LOG.info("Set nn0 to active for all nameservices");
+    for (Map<String, MockNamenode> nnNS : nns.values()) {
+      nnNS.get("nn0").transitionToActive();
+      nnNS.get("nn1").transitionToStandby();
+    }
+
     initializedTime = Time.now();
   }
 
   @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.stopRouter(routerContext);
-      cluster.shutdown();
-      cluster = null;
+  public void cleanup() throws Exception {
+    for (Map<String, MockNamenode> nnNS : nns.values()) {
+      for (MockNamenode nn : nnNS.values()) {
+        nn.stop();
+      }
     }
+    nns.clear();
+
+    if (router != null) {
+      router.stop();
+    }
   }
 
+  /**
+   * Get the configuration of the cluster which contains all the Namenodes and
+   * their addresses.
+   * @return Configuration containing all the Namenodes.
+   */
+  private Configuration getNamenodesConfig() {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES,
+        StringUtils.join(",", nns.keySet()));
+    for (String nsId : nns.keySet()) {
+      Set<String> nnIds = nns.get(nsId).keySet();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
+      sb.append(".").append(nsId);
+      conf.set(sb.toString(), StringUtils.join(",", nnIds));
+
+      for (String nnId : nnIds) {
+        final MockNamenode nn = nns.get(nsId).get(nnId);
+
+        sb = new StringBuilder();
+        sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+        sb.append(".").append(nsId);
+        sb.append(".").append(nnId);
+        conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
+
+        sb = new StringBuilder();
+        sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+        sb.append(".").append(nsId);
+        sb.append(".").append(nnId);
+        conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
+      }
+    }
+    return conf;
+  }
+
   @Test
   public void testNamenodeMonitoring() throws Exception {
-    // Set nn0 to active for all nameservices
-    for (String ns : cluster.getNameservices()) {
-      cluster.switchToActive(ns, "nn0");
-      cluster.switchToStandby(ns, "nn1");
-    }
+    Configuration nsConf = getNamenodesConfig();
 
-    Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
-        .getRouter().getNamenodeHeartbeatServices();
-    // manually trigger the heartbeat
+    // Setup the State Store for the Router to use
+    Configuration stateStoreConfig = getStateStoreConfiguration();
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MountTableResolver.class, FileSubclusterResolver.class);
+
+    Configuration routerConf = new RouterConfigBuilder(nsConf)
+        .enableLocalHeartbeat(true)
+        .heartbeat()
+        .stateStore()
+        .rpc()
+        .build();
+
+    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
+        "ns1.nn0,ns1.nn1");
+    routerConf.addResource(stateStoreConfig);
+
+    // Specify local node (ns0.nn1) to monitor
+    routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
+    routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+
+    // Start the Router with the namenodes to monitor
+    router = new Router();
+    router.init(routerConf);
+    router.start();
+
+    // Manually trigger the heartbeat and update the values
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
     for (NamenodeHeartbeatService service : heartbeatServices) {
       service.periodicInvoke();
     }
+    MembershipNamenodeResolver resolver =
+        (MembershipNamenodeResolver) router.getNamenodeResolver();
     resolver.loadCache(true);
-    List<? extends FederationNamenodeContext> namespaceInfo0 =
-        resolver.getNamenodesForNameserviceId(ns0);
-    List<? extends FederationNamenodeContext> namespaceInfo1 =
-        resolver.getNamenodesForNameserviceId(ns1);
 
-    // The modified date won't be updated in ns0.nn0 since it isn't
-    // monitored by the Router.
-    assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
-    assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
-
-    // other namnodes should be updated as expected
-    assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
-    assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
-
-    assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
-    assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
-
-    assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
-    assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
+    // Check that the monitored values are expected
+    final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
+    for (String nsId : nns.keySet()) {
+      List<? extends FederationNamenodeContext> nnReports =
+          resolver.getNamenodesForNameserviceId(nsId);
+      namespaceInfo.addAll(nnReports);
+    }
+    for (FederationNamenodeContext nnInfo : namespaceInfo) {
+      long modTime = nnInfo.getDateModified();
+      long diff = modTime - initializedTime;
+      if ("ns0".equals(nnInfo.getNameserviceId()) &&
+          "nn0".equals(nnInfo.getNamenodeId())) {
+        // The modified date won't be updated in ns0.nn0
+        // since it isn't monitored by the Router.
+        assertTrue(nnInfo + " shouldn't be updated: " + diff,
+            modTime < initializedTime);
+      } else {
+        // other namnodes should be updated as expected
+        assertTrue(nnInfo + " should be updated: " + diff,
+            modTime > initializedTime);
+      }
+    }
+  }
+
+  @Test
+  public void testNamenodeMonitoringConfig() throws Exception {
+    testConfig(asList(), "");
+    testConfig(asList("ns1.nn0"), "ns1.nn0");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,");
+  }
+
+  /**
+   * Test if configuring a Router to monitor particular Namenodes actually
+   * takes effect.
+   * @param expectedNNs Namenodes that should be monitored.
+   * @param confNsIds Router configuration setting for Namenodes to monitor.
+   */
+  private void testConfig(
+      Collection<String> expectedNNs, String confNsIds) {
+
+    // Setup and start the Router
+    Configuration conf = getNamenodesConfig();
+    Configuration routerConf = new RouterConfigBuilder(conf)
+        .heartbeat(true)
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds);
+    router = new Router();
+    router.init(routerConf);
+
+    // Test the heartbeat services of the Router
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
+    assertNamenodeHeartbeatService(expectedNNs, heartbeatServices);
+  }
+
+  /**
+   * Assert that the namenodes monitored by the Router are the expected.
+   * @param expected Expected namenodes.
+   * @param actual Actual heartbeat services for the Router
+   */
+  private static void assertNamenodeHeartbeatService(
+      Collection<String> expected,
+      Collection<NamenodeHeartbeatService> actual) {
+
+    final Set<String> actualSet = new TreeSet<>();
+    for (NamenodeHeartbeatService heartbeatService : actual) {
+      NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport();
+      StringBuilder sb = new StringBuilder();
+      sb.append(report.getNameserviceId());
+      sb.append(".");
+      sb.append(report.getNamenodeId());
+      actualSet.add(sb.toString());
+    }
+    assertTrue(expected + " does not contain all " + actualSet,
+        expected.containsAll(actualSet));
+    assertTrue(actualSet + " does not contain all " + expected,
+        actualSet.containsAll(expected));
   }
 }
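For reference, getNamenodesConfig() in the test above assembles standard HDFS HA keys from DFSConfigKeys constants and the MockNamenode ports. The sketch below, not part of the patch, writes those keys out literally for a single nameservice so the mapping from a monitor entry such as ns1.nn0 to an address is easier to follow; the wrapper class, the port numbers, and the literal monitor key (assumed to match RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE) are placeholders.

import org.apache.hadoop.conf.Configuration;

public class MonitoredNamenodeConfigSketch {
  public static Configuration build() {
    Configuration conf = new Configuration(false);
    // Nameservices in the federation and the namenodes inside ns1.
    conf.set("dfs.nameservices", "ns0,ns1");
    conf.set("dfs.ha.namenodes.ns1", "nn0,nn1");
    // Addresses the Router heartbeat service resolves for ns1.nn0 and ns1.nn1.
    conf.set("dfs.namenode.rpc-address.ns1.nn0", "localhost:8020");
    conf.set("dfs.namenode.http-address.ns1.nn0", "localhost:9870");
    conf.set("dfs.namenode.rpc-address.ns1.nn1", "localhost:8021");
    conf.set("dfs.namenode.http-address.ns1.nn1", "localhost:9871");
    // Entries may carry spaces or a trailing comma; after this patch the
    // Router tolerates both via getTrimmedStringCollection.
    conf.set("dfs.federation.router.monitor.namenode", "ns1.nn0, ns1.nn1");
    return conf;
  }
}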