HDFS-16188. RBF: Router to support resolving monitored namenodes with DNS (#3346) Contributed by Leon Gao
* Router to support resolving monitored namenodes with DNS * Style * fix style and test failure * Add test for NNHAServiceTarget const * Resolve comments * Fix test * Comments and style * Create a simple function to extract port * Use LambdaTestUtils.intercept * fix javadoc * Trigger Build
This commit is contained in:
@ -739,6 +739,23 @@ public class NetUtils {
public static String getHostPortString(InetSocketAddress addr) {
return addr.getHostName() + ":" + addr.getPort();
* Get port as integer from host port string like host:port.
* @param addr host + port string like host:port.
* @return an integer value representing the port.
* @throws IllegalArgumentException if the input is not in the correct format.
public static int getPortFromHostPortString(String addr)
throws IllegalArgumentException {
String[] hostport = addr.split(":");
if (hostport.length != 2) {
String errorMsg = "Address should be <host>:<port>, but it is " + addr;
throw new IllegalArgumentException(errorMsg);
return Integer.parseInt(hostport[1]);
* Checks if {@code host} is a local host name and return {@link InetAddress}
@ -44,6 +44,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.KerberosAuthException;
import org.apache.hadoop.security.NetUtilsTestResolver;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
@ -765,6 +766,18 @@ public class TestNetUtils {
assertEquals(defaultAddr.trim(), NetUtils.getHostPortString(addr));
public void testGetPortFromHostPortString() throws Exception {
assertEquals(1002, NetUtils.getPortFromHostPortString("testHost:1002"));
() -> NetUtils.getPortFromHostPortString("testHost"));
() -> NetUtils.getPortFromHostPortString("testHost:randomString"));
public void testBindToLocalAddress() throws Exception {
@ -426,37 +426,60 @@ public class DFSUtilClient {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
try {
// Datanode should just use FQDN
String[] resolvedHostNames = dnr
.getAllResolvedHostnameByDomainName(isa.getHostName(), true);
int port = isa.getPort();
for (String hostname : resolvedHostNames) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(
hostname, port);
// Concat nn info with host info to make uniq ID
String concatId;
if (nnId == null || nnId.isEmpty()) {
concatId = String
.join("-", nsId, hostname, String.valueOf(port));
} else {
concatId = String
.join("-", nsId, nnId, hostname, String.valueOf(port));
ret.put(concatId, inetSocketAddress);
} catch (UnknownHostException e) {
LOG.error("Failed to resolve address: " + address);
Map<String, InetSocketAddress> resolvedAddressesForNnId =
getResolvedAddressesForNnId(conf, nsId, nnId, dnr, defaultValue, keys);
return ret;
public static Map<String, InetSocketAddress> getResolvedAddressesForNnId(
Configuration conf, String nsId, String nnId,
DomainNameResolver dnr, String defaultValue,
String... keys) {
String suffix = concatSuffixes(nsId, nnId);
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
try {
String[] resolvedHostNames = dnr
.getAllResolvedHostnameByDomainName(isa.getHostName(), true);
int port = isa.getPort();
for (String hostname : resolvedHostNames) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(
hostname, port);
// Concat nn info with host info to make uniq ID
String concatId = getConcatNnId(nsId, nnId, hostname, port);
ret.put(concatId, inetSocketAddress);
} catch (UnknownHostException e) {
LOG.error("Failed to resolve address: {}", address);
return ret;
* Concat nn info with host info to make uniq ID.
* This is mainly used when configured nn is
* a domain record that has multiple hosts behind it.
* @param nsId nsId to be concatenated to a uniq ID.
* @param nnId nnId to be concatenated to a uniq ID.
* @param hostname hostname to be concatenated to a uniq ID.
* @param port port to be concatenated to a uniq ID.
* @return Concatenated uniq id.
private static String getConcatNnId(String nsId, String nnId, String hostname, int port) {
if (nnId == null || nnId.isEmpty()) {
return String
.join("-", nsId, hostname, String.valueOf(port));
return String
.join("-", nsId, nnId, hostname, String.valueOf(port));
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@ -94,6 +95,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
private URLConnectionFactory connectionFactory;
/** URL scheme to use for JMX calls. */
private String scheme;
private String resolvedHost;
private String originalNnId;
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
@ -110,6 +114,28 @@ public class NamenodeHeartbeatService extends PeriodicService {
this.nameserviceId = nsId;
this.namenodeId = nnId;
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
* @param nsId Identifier of the nameservice.
* @param nnId Identifier of the namenode in HA.
* @param resolvedHost resolvedHostname for this specific namenode.
public NamenodeHeartbeatService(
ActiveNamenodeResolver resolver, String nsId, String nnId, String resolvedHost) {
super(getNnHeartBeatServiceName(nsId, nnId));
this.resolver = resolver;
this.nameserviceId = nsId;
// Concat a uniq id from original nnId and resolvedHost
this.namenodeId = nnId + "-" + resolvedHost;
this.resolvedHost = resolvedHost;
// Same the original nnid to get the ports from config.
this.originalNnId = nnId;
@ -120,40 +146,59 @@ public class NamenodeHeartbeatService extends PeriodicService {
String nnDesc = nameserviceId;
if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
this.localTarget = new NNHAServiceTarget(
conf, nameserviceId, namenodeId);
nnDesc += "-" + namenodeId;
} else {
this.localTarget = null;
if (originalNnId == null) {
originalNnId = namenodeId;
// Get the RPC address for the clients to connect
this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
this.rpcAddress = getRpcAddress(conf, nameserviceId, originalNnId);
// Get the Service RPC address for monitoring
this.serviceAddress =
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, originalNnId);
if (this.serviceAddress == null) {
LOG.error("Cannot locate RPC service address for NN {}, " +
"using RPC address {}", nnDesc, this.rpcAddress);
this.serviceAddress = this.rpcAddress;
LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
// Get the Lifeline RPC address for faster monitoring
this.lifelineAddress =
DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId);
DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, originalNnId);
if (this.lifelineAddress == null) {
this.lifelineAddress = this.serviceAddress;
LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
// Get the Web address for UI
this.webAddress =
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, originalNnId);
if (resolvedHost != null) {
// Get the addresses from resolvedHost plus the configured ports.
rpcAddress = resolvedHost + ":"
+ NetUtils.getPortFromHostPortString(rpcAddress);
serviceAddress = resolvedHost + ":"
+ NetUtils.getPortFromHostPortString(serviceAddress);
lifelineAddress = resolvedHost + ":"
+ NetUtils.getPortFromHostPortString(lifelineAddress);
webAddress = resolvedHost + ":"
+ NetUtils.getPortFromHostPortString(webAddress);
LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
LOG.info("{} Web address: {}", nnDesc, webAddress);
if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
this.localTarget = new NNHAServiceTarget(
conf, nameserviceId, namenodeId, serviceAddress, lifelineAddress);
this.connectionFactory =
@ -336,6 +381,12 @@ public class NamenodeHeartbeatService extends PeriodicService {
private static String getNnHeartBeatServiceName(String nsId, String nnId) {
return NamenodeHeartbeatService.class.getSimpleName() +
(nsId == null ? "" : " " + nsId) +
(nnId == null ? "" : " " + nnId);
* Get the parameters for a Namenode from JMX and add them to the report.
* @param address Web interface of the Namenode to monitor.
@ -98,6 +98,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_MONITOR_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
FEDERATION_ROUTER_PREFIX + "monitor.namenode.nameservice.resolution-enabled";
public static final boolean
= FEDERATION_ROUTER_PREFIX + "monitor.namenode.nameservice.resolver.impl";
public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
@ -17,6 +17,8 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY;
@ -36,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.TokenVerifier;
@ -48,9 +51,12 @@ import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.DomainNameResolver;
import org.apache.hadoop.net.DomainNameResolverFactory;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@ -534,10 +540,34 @@ public class Router extends CompositeService implements
LOG.error("Wrong Namenode to monitor: {}", namenode);
if (nsId != null) {
NamenodeHeartbeatService heartbeatService =
createNamenodeHeartbeatService(nsId, nnId);
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
String configKeyWithHost =
boolean resolveNeeded = conf.getBoolean(configKeyWithHost,
if (nnId != null && resolveNeeded) {
DomainNameResolver dnr = DomainNameResolverFactory.newInstance(
Map<String, InetSocketAddress> hosts = Maps.newLinkedHashMap();
Map<String, InetSocketAddress> resolvedHosts =
DFSUtilClient.getResolvedAddressesForNnId(conf, nsId, nnId, dnr,
for (InetSocketAddress isa : hosts.values()) {
NamenodeHeartbeatService heartbeatService =
createNamenodeHeartbeatService(nsId, nnId, isa.getHostName());
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
} else {
NamenodeHeartbeatService heartbeatService =
createNamenodeHeartbeatService(nsId, nnId);
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
@ -586,6 +616,16 @@ public class Router extends CompositeService implements
return ret;
protected NamenodeHeartbeatService createNamenodeHeartbeatService(
String nsId, String nnId, String resolvedHost) {
LOG.info("Creating heartbeat service for" +
" Namenode {}, resolved host {}, in {}", nnId, resolvedHost, nsId);
NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
namenodeResolver, nsId, nnId, resolvedHost);
return ret;
// Router State Management
@ -462,6 +462,26 @@
Determines if the given monitored namenode address is a domain name which needs to
be resolved.
This is used by router to resolve namenodes.
Nameservice resolver implementation used by router.
Effective with
dfs.federation.router.monitor.namenode.nameservices.resolution-enabled on.
@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
@ -28,15 +31,19 @@ import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.net.MockDomainNameResolver;
import org.apache.hadoop.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -203,4 +210,64 @@ public class TestRouterNamenodeHeartbeat {
standby = normalNss.get(1);
assertEquals(NAMENODES[1], standby.getNamenodeId());
public void testNamenodeHeartbeatServiceNNResolution() {
String nsId = "test-ns";
String nnId = "nn";
int rpcPort = 1000;
int servicePort = 1001;
int lifelinePort = 1002;
int webAddressPort = 1003;
Configuration conf = generateNamenodeConfiguration(nsId, nnId,
rpcPort, servicePort, lifelinePort, webAddressPort);
Router testRouter = new Router();
Collection<NamenodeHeartbeatService> heartbeatServices =
assertEquals(2, heartbeatServices.size());
Iterator<NamenodeHeartbeatService> iterator = heartbeatServices.iterator();
NamenodeHeartbeatService service = iterator.next();
service = iterator.next();
private Configuration generateNamenodeConfiguration(
String nsId, String nnId,
int rpcPort, int servicePort,
int lifelinePort, int webAddressPort) {
Configuration conf = new HdfsConfiguration();
String suffix = nsId + "." + nnId;
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, false);
conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, nsId + "." + nnId);
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
MockDomainNameResolver.DOMAIN + ":" + rpcPort);
MockDomainNameResolver.DOMAIN + ":" + servicePort);
MockDomainNameResolver.DOMAIN + ":" + lifelinePort);
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
MockDomainNameResolver.DOMAIN + ":" + webAddressPort);
return conf;
@ -53,17 +53,69 @@ public class NNHAServiceTarget extends HAServiceTarget {
private InetSocketAddress zkfcAddr;
private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError;
private final String nnId;
private final String nsId;
private final boolean autoFailoverEnabled;
private HdfsConfiguration targetConf;
private String nnId;
private String nsId;
private boolean autoFailoverEnabled;
* Create a NNHAServiceTarget for a namenode.
* Look up addresses from configuration.
* @param conf HDFS configuration.
* @param nsId nsId of this nn.
* @param nnId nnId of this nn.
public NNHAServiceTarget(Configuration conf,
String nsId, String nnId) {
initializeNnConfig(conf, nsId, nnId);
if (nsId == null) {
nsId = DFSUtil.getOnlyNameServiceIdOrNull(conf);
if (nsId == null) {
String serviceAddr =
DFSUtil.getNamenodeServiceAddr(targetConf, nsId, nnId);
if (serviceAddr == null) {
throw new IllegalArgumentException(
"Unable to determine service address for namenode '" + nnId + "'");
this.addr = NetUtils.createSocketAddr(serviceAddr,
String lifelineAddrStr =
DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId);
this.lifelineAddr = (lifelineAddrStr != null) ?
NetUtils.createSocketAddr(lifelineAddrStr) : null;
* Create a NNHAServiceTarget for a namenode.
* Addresses are provided so we don't need to lookup the config.
* @param conf HDFS configuration.
* @param nsId nsId of this nn.
* @param nnId nnId of this nn.
* @param addr Provided service address.
* @param lifelineAddr Provided lifeline address.
public NNHAServiceTarget(Configuration conf,
String nsId, String nnId,
String addr, String lifelineAddr) {
initializeNnConfig(conf, nsId, nnId);
this.addr = NetUtils.createSocketAddr(addr);
this.lifelineAddr = NetUtils.createSocketAddr(lifelineAddr);
private void initializeNnConfig(Configuration conf,
String providedNsId, String providedNnId) {
if (providedNsId == null) {
providedNsId = DFSUtil.getOnlyNameServiceIdOrNull(conf);
if (providedNsId == null) {
String errorString = "Unable to determine the name service ID.";
String[] dfsNames = conf.getStrings(DFS_NAMESERVICES);
if ((dfsNames != null) && (dfsNames.length > 1)) {
@ -75,27 +127,17 @@ public class NNHAServiceTarget extends HAServiceTarget {
throw new IllegalArgumentException(errorString);
assert nsId != null;
// Make a copy of the conf, and override configs based on the
// target node -- not the node we happen to be running on.
HdfsConfiguration targetConf = new HdfsConfiguration(conf);
NameNode.initializeGenericKeys(targetConf, nsId, nnId);
String serviceAddr =
DFSUtil.getNamenodeServiceAddr(targetConf, nsId, nnId);
if (serviceAddr == null) {
throw new IllegalArgumentException(
"Unable to determine service address for namenode '" + nnId + "'");
this.addr = NetUtils.createSocketAddr(serviceAddr,
this.targetConf = new HdfsConfiguration(conf);
NameNode.initializeGenericKeys(targetConf, providedNsId, providedNnId);
String lifelineAddrStr =
DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId);
this.lifelineAddr = (lifelineAddrStr != null) ?
NetUtils.createSocketAddr(lifelineAddrStr) : null;
this.nsId = providedNsId;
this.nnId = providedNnId;
private void initializeFailoverConfig() {
this.autoFailoverEnabled = targetConf.getBoolean(
@ -105,16 +147,13 @@ public class NNHAServiceTarget extends HAServiceTarget {
try {
this.fencer = NodeFencer.create(targetConf,
} catch (BadFencingConfigurationException e) {
this.fenceConfigError = e;
this.nnId = nnId;
this.nsId = nsId;
@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEF
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -79,6 +81,19 @@ public class TestNNHealthCheck {
public void testNNHAServiceTargetWithProvidedAddr() {
// Test constructor with provided address.
NNHAServiceTarget target = new NNHAServiceTarget(conf, "ns", "nn1",
"", "");
assertEquals("/", target.getAddress().toString());
assertEquals("/", target.getHealthMonitorAddress().toString());
public void testNNHealthCheckWithSafemodeAsUnhealthy() throws Exception {
Reference in New Issue