HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.

This commit is contained in:
Ayush Saxena 2019-03-20 11:12:49 +05:30 committed by Brahma Reddy Battula
parent f539e2a4ee
commit 9a9fbbe145
3 changed files with 444 additions and 97 deletions

View File

@ -497,27 +497,25 @@ public class Router extends CompositeService {
}
// Create heartbeat services for a list specified by the admin
String namenodes = this.conf.get(
Collection<String> namenodes = this.conf.getTrimmedStringCollection(
RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
if (namenodes != null) {
for (String namenode : namenodes.split(",")) {
String[] namenodeSplit = namenode.split("\\.");
String nsId = null;
String nnId = null;
if (namenodeSplit.length == 2) {
nsId = namenodeSplit[0];
nnId = namenodeSplit[1];
} else if (namenodeSplit.length == 1) {
nsId = namenode;
} else {
LOG.error("Wrong Namenode to monitor: {}", namenode);
}
if (nsId != null) {
NamenodeHeartbeatService heartbeatService =
createNamenodeHeartbeatService(nsId, nnId);
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
}
for (String namenode : namenodes) {
String[] namenodeSplit = namenode.split("\\.");
String nsId = null;
String nnId = null;
if (namenodeSplit.length == 2) {
nsId = namenodeSplit[0];
nnId = namenodeSplit[1];
} else if (namenodeSplit.length == 1) {
nsId = namenode;
} else {
LOG.error("Wrong Namenode to monitor: {}", namenode);
}
if (nsId != null) {
NamenodeHeartbeatService heartbeatService =
createNamenodeHeartbeatService(nsId, nnId);
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
}
}
}

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.protobuf.BlockingService;
/**
* Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is
* used by the Routers in a mock cluster.
*/
public class MockNamenode {
/** Mock implementation of the Namenode. */
private final NamenodeProtocols mockNn;
/** HA state of the Namenode. */
private HAServiceState haState = HAServiceState.STANDBY;
/** RPC server of the Namenode that redirects calls to the mock. */
private Server rpcServer;
/** HTTP server of the Namenode that redirects calls to the mock. */
private HttpServer2 httpServer;
public MockNamenode() throws Exception {
Configuration conf = new Configuration();
this.mockNn = mock(NamenodeProtocols.class);
setupMock();
setupRPCServer(conf);
setupHTTPServer(conf);
}
/**
* Setup the mock of the Namenode. It offers the basic functionality for
* Routers to get the status.
* @throws IOException If the mock cannot be setup.
*/
protected void setupMock() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
when(mockNn.versionRequest()).thenReturn(nsInfo);
when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
@Override
public HAServiceStatus answer(InvocationOnMock invocation)
throws Throwable {
HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
haStatus.setNotReadyToBecomeActive("");
return haStatus;
}
});
}
/**
* Setup the RPC server of the Namenode that redirects calls to the mock.
* @param conf Configuration of the server.
* @throws IOException If the RPC server cannot be setup.
*/
private void setupRPCServer(final Configuration conf) throws IOException {
RPC.setProtocolEngine(
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
ClientNamenodeProtocolServerSideTranslatorPB
clientNNProtoXlator =
new ClientNamenodeProtocolServerSideTranslatorPB(mockNn);
BlockingService clientNNPbService =
ClientNamenodeProtocol.newReflectiveBlockingService(
clientNNProtoXlator);
rpcServer = new RPC.Builder(conf)
.setProtocol(ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress("0.0.0.0")
.setPort(0)
.build();
NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
new NamenodeProtocolServerSideTranslatorPB(mockNn);
BlockingService nnProtoPbService =
NamenodeProtocolService.newReflectiveBlockingService(
nnProtoXlator);
DFSUtil.addPBProtocol(
conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);
DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000);
BlockingService dnProtoPbService =
DatanodeProtocolService.newReflectiveBlockingService(
dnProtoPbXlator);
DFSUtil.addPBProtocol(
conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);
HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
new HAServiceProtocolServerSideTranslatorPB(mockNn);
BlockingService haProtoPbService =
HAServiceProtocolService.newReflectiveBlockingService(
haServiceProtoXlator);
DFSUtil.addPBProtocol(
conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
rpcServer.start();
}
/**
* Setup the HTTP server of the Namenode that redirects calls to the mock.
* @param conf Configuration of the server.
* @throws IOException If the HTTP server cannot be setup.
*/
private void setupHTTPServer(Configuration conf) throws IOException {
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("hdfs")
.setConf(conf)
.setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.addEndpoint(URI.create("http://0.0.0.0:0"));
httpServer = builder.build();
httpServer.start();
}
/**
* Get the RPC port for the Mock Namenode.
* @return RPC port.
*/
public int getRPCPort() {
return rpcServer.getListenerAddress().getPort();
}
/**
* Get the HTTP port for the Mock Namenode.
* @return HTTP port.
*/
public int getHTTPPort() {
return httpServer.getConnectorAddress(0).getPort();
}
/**
* Get the Mock core. This is used to extend the mock.
* @return Mock Namenode protocol to be extended.
*/
public NamenodeProtocols getMock() {
return mockNn;
}
/**
* Get the HA state of the Mock Namenode.
* @return HA state (ACTIVE or STANDBY).
*/
public HAServiceState getHAServiceState() {
return haState;
}
/**
* Show the Mock Namenode as Active.
*/
public void transitionToActive() {
this.haState = HAServiceState.ACTIVE;
}
/**
* Show the Mock Namenode as Standby.
*/
public void transitionToStandby() {
this.haState = HAServiceState.STANDBY;
}
/**
* Stop the Mock Namenode. It stops all the servers.
* @throws Exception If it cannot stop the Namenode.
*/
public void stop() throws Exception {
if (rpcServer != null) {
rpcServer.stop();
}
if (httpServer != null) {
httpServer.stop();
}
}
}

View File

@ -17,127 +17,251 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.junit.Assert.assertEquals;
import static java.util.Arrays.asList;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test namenodes monitor behavior in the Router.
*/
public class TestRouterNamenodeMonitoring {
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static MembershipNamenodeResolver resolver;
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class);
private String ns0;
private String ns1;
/** Router for the test. */
private Router router;
/** Namenodes in the cluster. */
private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
/** Nameservices in the federated cluster. */
private List<String> nsIds = asList("ns0", "ns1");
/** Time the test starts. */
private long initializedTime;
@Before
public void setUp() throws Exception {
// Build and start a federated cluster with HA enabled
cluster = new StateStoreDFSCluster(true, 2);
// Enable heartbeat service and local heartbeat
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.enableLocalHeartbeat(true)
.heartbeat()
.build();
// Specify local node (ns0.nn1) to monitor
StringBuilder sb = new StringBuilder();
ns0 = cluster.getNameservices().get(0);
NamenodeContext context = cluster.getNamenodes(ns0).get(1);
routerConf.set(DFS_NAMESERVICE_ID, ns0);
routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
sb = new StringBuilder();
ns1 = cluster.getNameservices().get(1);
for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
String suffix = ctx.getConfSuffix();
if (sb.length() != 0) {
sb.append(",");
public void setup() throws Exception {
LOG.info("Initialize the Mock Namenodes to monitor");
for (String nsId : nsIds) {
nns.put(nsId, new HashMap<>());
for (String nnId : asList("nn0", "nn1")) {
nns.get(nsId).put(nnId, new MockNamenode());
}
sb.append(suffix);
}
// override with the namenodes: ns1.nn0,ns1.nn1
routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
cluster.addRouterOverrides(routerConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
LOG.info("Set nn0 to active for all nameservices");
for (Map<String, MockNamenode> nnNS : nns.values()) {
nnNS.get("nn0").transitionToActive();
nnNS.get("nn1").transitionToStandby();
}
routerContext = cluster.getRandomRouter();
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
.getNamenodeResolver();
initializedTime = Time.now();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
public void cleanup() throws Exception {
for (Map<String, MockNamenode> nnNS : nns.values()) {
for (MockNamenode nn : nnNS.values()) {
nn.stop();
}
}
nns.clear();
if (router != null) {
router.stop();
}
}
/**
* Get the configuration of the cluster which contains all the Namenodes and
* their addresses.
* @return Configuration containing all the Namenodes.
*/
private Configuration getNamenodesConfig() {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES,
StringUtils.join(",", nns.keySet()));
for (String nsId : nns.keySet()) {
Set<String> nnIds = nns.get(nsId).keySet();
StringBuilder sb = new StringBuilder();
sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
sb.append(".").append(nsId);
conf.set(sb.toString(), StringUtils.join(",", nnIds));
for (String nnId : nnIds) {
final MockNamenode nn = nns.get(nsId).get(nnId);
sb = new StringBuilder();
sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
sb.append(".").append(nsId);
sb.append(".").append(nnId);
conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
sb = new StringBuilder();
sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
sb.append(".").append(nsId);
sb.append(".").append(nnId);
conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
}
}
return conf;
}
@Test
public void testNamenodeMonitoring() throws Exception {
// Set nn0 to active for all nameservices
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, "nn0");
cluster.switchToStandby(ns, "nn1");
}
Configuration nsConf = getNamenodesConfig();
Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
.getRouter().getNamenodeHeartbeatServices();
// manually trigger the heartbeat
// Setup the State Store for the Router to use
Configuration stateStoreConfig = getStateStoreConfiguration();
stateStoreConfig.setClass(
RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
stateStoreConfig.setClass(
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MountTableResolver.class, FileSubclusterResolver.class);
Configuration routerConf = new RouterConfigBuilder(nsConf)
.enableLocalHeartbeat(true)
.heartbeat()
.stateStore()
.rpc()
.build();
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
"ns1.nn0,ns1.nn1");
routerConf.addResource(stateStoreConfig);
// Specify local node (ns0.nn1) to monitor
routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
// Start the Router with the namenodes to monitor
router = new Router();
router.init(routerConf);
router.start();
// Manually trigger the heartbeat and update the values
Collection<NamenodeHeartbeatService> heartbeatServices =
router.getNamenodeHeartbeatServices();
for (NamenodeHeartbeatService service : heartbeatServices) {
service.periodicInvoke();
}
MembershipNamenodeResolver resolver =
(MembershipNamenodeResolver) router.getNamenodeResolver();
resolver.loadCache(true);
List<? extends FederationNamenodeContext> namespaceInfo0 =
resolver.getNamenodesForNameserviceId(ns0);
List<? extends FederationNamenodeContext> namespaceInfo1 =
resolver.getNamenodesForNameserviceId(ns1);
// The modified date won't be updated in ns0.nn0 since it isn't
// monitored by the Router.
assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
// Check that the monitored values are expected
final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
for (String nsId : nns.keySet()) {
List<? extends FederationNamenodeContext> nnReports =
resolver.getNamenodesForNameserviceId(nsId);
namespaceInfo.addAll(nnReports);
}
for (FederationNamenodeContext nnInfo : namespaceInfo) {
long modTime = nnInfo.getDateModified();
long diff = modTime - initializedTime;
if ("ns0".equals(nnInfo.getNameserviceId()) &&
"nn0".equals(nnInfo.getNamenodeId())) {
// The modified date won't be updated in ns0.nn0
// since it isn't monitored by the Router.
assertTrue(nnInfo + " shouldn't be updated: " + diff,
modTime < initializedTime);
} else {
// other namnodes should be updated as expected
assertTrue(nnInfo + " should be updated: " + diff,
modTime > initializedTime);
}
}
}
// other namnodes should be updated as expected
assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
@Test
public void testNamenodeMonitoringConfig() throws Exception {
testConfig(asList(), "");
testConfig(asList("ns1.nn0"), "ns1.nn0");
testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1");
testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1");
testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1");
testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,");
}
assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
/**
* Test if configuring a Router to monitor particular Namenodes actually
* takes effect.
* @param expectedNNs Namenodes that should be monitored.
* @param confNsIds Router configuration setting for Namenodes to monitor.
*/
private void testConfig(
Collection<String> expectedNNs, String confNsIds) {
assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
// Setup and start the Router
Configuration conf = getNamenodesConfig();
Configuration routerConf = new RouterConfigBuilder(conf)
.heartbeat(true)
.build();
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds);
router = new Router();
router.init(routerConf);
// Test the heartbeat services of the Router
Collection<NamenodeHeartbeatService> heartbeatServices =
router.getNamenodeHeartbeatServices();
assertNamenodeHeartbeatService(expectedNNs, heartbeatServices);
}
/**
* Assert that the namenodes monitored by the Router are the expected.
* @param expected Expected namenodes.
* @param actual Actual heartbeat services for the Router
*/
private static void assertNamenodeHeartbeatService(
Collection<String> expected,
Collection<NamenodeHeartbeatService> actual) {
final Set<String> actualSet = new TreeSet<>();
for (NamenodeHeartbeatService heartbeatService : actual) {
NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport();
StringBuilder sb = new StringBuilder();
sb.append(report.getNameserviceId());
sb.append(".");
sb.append(report.getNamenodeId());
actualSet.add(sb.toString());
}
assertTrue(expected + " does not contain all " + actualSet,
expected.containsAll(actualSet));
assertTrue(actualSet + " does not contain all " + expected,
actualSet.containsAll(expected));
}
}