HDFS-15785. Datanode to support using DNS to resolve nameservices to IP addresses to get list of namenodes. (#2639)

* Rebase trunk

* Fix to use FQDN and update config name

* Fix javac

* Style and trigger build

* Trigger Build after force push

* Trigger Build

* Fix config names
This commit is contained in:
LeonGao 2021-07-13 14:18:59 -07:00 committed by GitHub
parent e95c3259de
commit ea90c5117d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 212 additions and 25 deletions

View File

@ -49,7 +49,11 @@ public final class DomainNameResolverFactory {
*/ */
public static DomainNameResolver newInstance( public static DomainNameResolver newInstance(
Configuration conf, URI uri, String configKey) { Configuration conf, URI uri, String configKey) {
String host = uri.getHost(); return newInstance(conf, uri.getHost(), configKey);
}
public static DomainNameResolver newInstance(
Configuration conf, String host, String configKey) {
String confKeyWithHost = configKey + "." + host; String confKeyWithHost = configKey + "." + host;
return newInstance(conf, confKeyWithHost); return newInstance(conf, confKeyWithHost);
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import org.apache.hadoop.net.DomainNameResolver;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
@ -72,6 +73,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -406,6 +408,55 @@ public class DFSUtilClient {
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys); return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
} }
/**
* Use DNS record to resolve NN and return resolved FQDN.
*
* @param conf Configuration
* @param nsId Nameservice Id to resolve
* @param dnr Class used to resolve DNS
* @param defaultValue default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
* @return a map(namenodeId to InetSocketAddress),
* where namenodeId is combination of nsId,
* resolved hostname and port.
*/
static Map<String, InetSocketAddress> getResolvedAddressesForNsId(
Configuration conf, String nsId, DomainNameResolver dnr,
String defaultValue, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
try {
// Datanode should just use FQDN
String[] resolvedHostNames = dnr
.getAllResolvedHostnameByDomainName(isa.getHostName(), true);
int port = isa.getPort();
for (String hostname : resolvedHostNames) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(
hostname, port);
// Concat nn info with host info to make uniq ID
String concatId;
if (nnId == null || nnId.isEmpty()) {
concatId = String
.join("-", nsId, hostname, String.valueOf(port));
} else {
concatId = String
.join("-", nsId, nnId, hostname, String.valueOf(port));
}
ret.put(concatId, inetSocketAddress);
}
} catch (UnknownHostException e) {
LOG.error("Failed to resolve address: " + address);
}
}
}
return ret;
}
/** /**
* Returns the configured address for all NameNodes in the cluster. * Returns the configured address for all NameNodes in the cluster.
* @param conf configuration * @param conf configuration

View File

@ -1600,6 +1600,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String public static final String
DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE_DEFAULT = ""; DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE_DEFAULT = "";
public static final String
DFS_NAMESERVICES_RESOLUTION_ENABLED =
"dfs.datanode.nameservices.resolution-enabled";
public static final boolean
DFS_NAMESERVICES_RESOLUTION_ENABLED_DEFAULT = false;
public static final String
DFS_NAMESERVICES_RESOLVER_IMPL =
"dfs.datanode.nameservices.resolver.impl";
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated @Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
@ -1936,4 +1946,5 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_LEASE_HARDLIMIT_DEFAULT = public static final long DFS_LEASE_HARDLIMIT_DEFAULT =
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT; HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT;
} }

View File

@ -72,9 +72,12 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath; import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.net.DomainNameResolver;
import org.apache.hadoop.net.DomainNameResolverFactory;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Sets;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
@ -628,26 +631,10 @@ public class DFSUtil {
defaultAddress = null; defaultAddress = null;
} }
Collection<String> parentNameServices = conf.getTrimmedStringCollection Collection<String> parentNameServices = getParentNameServices(conf);
(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
if (parentNameServices.isEmpty()) {
parentNameServices = conf.getTrimmedStringCollection
(DFSConfigKeys.DFS_NAMESERVICES);
} else {
// Ensure that the internal service is ineed in the list of all available
// nameservices.
Set<String> availableNameServices = Sets.newHashSet(conf
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
for (String nsId : parentNameServices) {
if (!availableNameServices.contains(nsId)) {
throw new IOException("Unknown nameservice: " + nsId);
}
}
}
Map<String, Map<String, InetSocketAddress>> addressList = Map<String, Map<String, InetSocketAddress>> addressList =
DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, getAddressesForNsIds(conf, parentNameServices,
defaultAddress, defaultAddress,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_ADDRESS_KEY); DFS_NAMENODE_RPC_ADDRESS_KEY);
@ -673,6 +660,58 @@ public class DFSUtil {
getNNLifelineRpcAddressesForCluster(Configuration conf) getNNLifelineRpcAddressesForCluster(Configuration conf)
throws IOException { throws IOException {
Collection<String> parentNameServices = getParentNameServices(conf);
return getAddressesForNsIds(conf, parentNameServices, null,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
}
//
/**
* Returns the configured address for all NameNodes in the cluster.
* This is similar with DFSUtilClient.getAddressesForNsIds()
* but can access DFSConfigKeys.
*
* @param conf configuration
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
*
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
Configuration conf, Collection<String> nsIds, String defaultAddress,
String... keys) {
// Look for configurations of the form
// <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
for (String nsId : DFSUtilClient.emptyAsSingletonNull(nsIds)) {
String configKeyWithHost =
DFSConfigKeys.DFS_NAMESERVICES_RESOLUTION_ENABLED + "." + nsId;
boolean resolveNeeded = conf.getBoolean(configKeyWithHost,
DFSConfigKeys.DFS_NAMESERVICES_RESOLUTION_ENABLED_DEFAULT);
Map<String, InetSocketAddress> isas;
if (resolveNeeded) {
DomainNameResolver dnr = DomainNameResolverFactory.newInstance(
conf, nsId, DFSConfigKeys.DFS_NAMESERVICES_RESOLVER_IMPL);
isas = DFSUtilClient.getResolvedAddressesForNsId(
conf, nsId, dnr, defaultAddress, keys);
} else {
isas = DFSUtilClient.getAddressesForNameserviceId(
conf, nsId, defaultAddress, keys);
}
if (!isas.isEmpty()) {
ret.put(nsId, isas);
}
}
return ret;
}
private static Collection<String> getParentNameServices(Configuration conf)
throws IOException {
Collection<String> parentNameServices = conf.getTrimmedStringCollection( Collection<String> parentNameServices = conf.getTrimmedStringCollection(
DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY); DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
@ -691,8 +730,7 @@ public class DFSUtil {
} }
} }
return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null, return parentNameServices;
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
} }
/** /**

View File

@ -218,6 +218,11 @@ class BPServiceActor implements Runnable {
bpNamenode = dnProtocol; bpNamenode = dnProtocol;
} }
@VisibleForTesting
String getNnId() {
return nnId;
}
@VisibleForTesting @VisibleForTesting
DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() { DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
return bpNamenode; return bpNamenode;

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Sets;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
@ -301,4 +302,9 @@ class BlockPoolManager {
return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs, return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs,
dn); dn);
} }
@VisibleForTesting
Map<String, BPOfferService> getBpByNameserviceId() {
return bpByNameserviceId;
}
} }

View File

@ -6187,6 +6187,7 @@
accessed or modified before the specified time interval. accessed or modified before the specified time interval.
</description> </description>
</property> </property>
<property> <property>
<name>dfs.datanode.directoryscan.max.notify.count</name> <name>dfs.datanode.directoryscan.max.notify.count</name>
<value>5</value> <value>5</value>
@ -6195,4 +6196,23 @@
namenode right way for received or deleted blocks after one round. namenode right way for received or deleted blocks after one round.
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.nameservices.resolution-enabled</name>
<value>false</value>
<description>
Determines if the given nameservice address is a domain name which needs to
be resolved (using the resolver configured by dfs.nameservices.resolver.impl).
This is used by datanode to resolve namenodes.
</description>
</property>
<property>
<name>dfs.datanode.nameservices.resolver.impl</name>
<value></value>
<description>
Nameservice resolver implementation used by datanode.
Effective with dfs.nameservices.resolution-enabled on.
</description>
</property>
</configuration> </configuration>

View File

@ -25,12 +25,12 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.net.MockDomainNameResolver;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.test.Whitebox;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -61,6 +61,14 @@ public class TestBlockPoolManager {
doLog("create #" + idx); doLog("create #" + idx);
final BPOfferService bpos = Mockito.mock(BPOfferService.class); final BPOfferService bpos = Mockito.mock(BPOfferService.class);
Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString(); Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString();
List<BPServiceActor> bpsa = new ArrayList<>(nnIds.size());
for (int i = 0; i < nnIds.size(); i++) {
BPServiceActor actor = Mockito.mock(BPServiceActor.class);
Mockito.doReturn(nnIds.get(i)).when(actor).getNnId();
Mockito.doReturn(nnAddrs.get(i)).when(actor).getNNSocketAddress();
bpsa.add(actor);
}
Mockito.doReturn(bpsa).when(bpos).getBPServiceActors();
// Log refreshes // Log refreshes
try { try {
Mockito.doAnswer( Mockito.doAnswer(
@ -150,15 +158,59 @@ public class TestBlockPoolManager {
conf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns1"); conf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
bpm.refreshNamenodes(conf); bpm.refreshNamenodes(conf);
assertEquals("create #1\n", log.toString()); assertEquals("create #1\n", log.toString());
@SuppressWarnings("unchecked") Map<String, BPOfferService> map = bpm.getBpByNameserviceId();
Map<String, BPOfferService> map = (Map<String, BPOfferService>) Whitebox
.getInternalState(bpm, "bpByNameserviceId");
Assert.assertFalse(map.containsKey("ns2")); Assert.assertFalse(map.containsKey("ns2"));
Assert.assertFalse(map.containsKey("ns3")); Assert.assertFalse(map.containsKey("ns3"));
Assert.assertTrue(map.containsKey("ns1")); Assert.assertTrue(map.containsKey("ns1"));
log.setLength(0); log.setLength(0);
} }
@Test
public void testNameServiceNeedToBeResolved() throws Exception {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2,ns3");
addNN(conf, "ns1", "mock1:8020");
addNN(conf, "ns2", "mock1:8020");
addNN(conf, "ns3", MockDomainNameResolver.DOMAIN + ":8020");
addDNSSettings(conf, "ns3");
bpm.refreshNamenodes(conf);
assertEquals(
"create #1\n" +
"create #2\n" +
"create #3\n", log.toString());
Map<String, BPOfferService> map = bpm.getBpByNameserviceId();
Assert.assertTrue(map.containsKey("ns1"));
Assert.assertTrue(map.containsKey("ns2"));
Assert.assertTrue(map.containsKey("ns3"));
Assert.assertEquals(2, map.get("ns3").getBPServiceActors().size());
Assert.assertEquals("ns3-" + MockDomainNameResolver.FQDN_1 + "-8020",
map.get("ns3").getBPServiceActors().get(0).getNnId());
Assert.assertEquals("ns3-" + MockDomainNameResolver.FQDN_2 + "-8020",
map.get("ns3").getBPServiceActors().get(1).getNnId());
Assert.assertEquals(
new InetSocketAddress(MockDomainNameResolver.FQDN_1, 8020),
map.get("ns3").getBPServiceActors().get(0).getNNSocketAddress());
Assert.assertEquals(
new InetSocketAddress(MockDomainNameResolver.FQDN_2, 8020),
map.get("ns3").getBPServiceActors().get(1).getNNSocketAddress());
log.setLength(0);
}
/**
* Add more DNS related settings to the passed in configuration.
* @param config Configuration file to add settings to.
*/
private void addDNSSettings(Configuration config,
String nameservice) {
config.setBoolean(
DFSConfigKeys.DFS_NAMESERVICES_RESOLUTION_ENABLED + "."
+ nameservice, true);
config.set(
DFSConfigKeys.DFS_NAMESERVICES_RESOLVER_IMPL + "." + nameservice,
MockDomainNameResolver.class.getName());
}
private static void addNN(Configuration conf, String ns, String addr) { private static void addNN(Configuration conf, String ns, String addr) {
String key = DFSUtil.addKeySuffixes( String key = DFSUtil.addKeySuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns); DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);