From 9146ad23f3f1af7c5547fba08e2a867cee49e015 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Tue, 29 Nov 2011 02:27:45 +0000
Subject: [PATCH] HDFS-2582. Scope dfs.ha.namenodes config by nameservice.
 Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1207738 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES.HDFS-1623.txt         |   2 +
 .../java/org/apache/hadoop/hdfs/DFSUtil.java  | 300 ++++++++++--------
 .../java/org/apache/hadoop/hdfs/HAUtil.java   |  35 +-
 .../hadoop/hdfs/server/balancer/Balancer.java |  13 +-
 .../server/balancer/NameNodeConnector.java    |  17 +-
 .../hadoop/hdfs/server/datanode/DataNode.java |  24 +-
 .../server/namenode/ClusterJspHelper.java     |  17 +-
 .../hadoop/hdfs/server/namenode/NameNode.java |   7 +-
 .../ha/ConfiguredFailoverProxyProvider.java   |  10 +-
 .../org/apache/hadoop/hdfs/tools/GetConf.java |  26 +-
 .../org/apache/hadoop/hdfs/TestDFSUtil.java   |  84 ++++-
 .../hdfs/server/balancer/TestBalancer.java    |   5 +-
 .../TestBalancerWithMultipleNameNodes.java    |   4 +-
 .../apache/hadoop/hdfs/tools/TestGetConf.java |  23 +-
 14 files changed, 358 insertions(+), 209 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index 6530cdcc8c3..0c854a5056e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -23,3 +23,5 @@ HDFS-2393. Mark appropriate methods of ClientProtocol with the idempotent annota
 HDFS-2523. Small NN fixes to include HAServiceProtocol and prevent NPE on shutdown. (todd)
 
 HDFS-2577. NN fails to start since it tries to start secret manager in safemode. (todd)
+
+HDFS-2582. Scope dfs.ha.namenodes config by nameservice (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index f35f4910395..fcb7d4cb891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -24,10 +24,11 @@ import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
 
@@ -45,11 +46,14 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 @InterfaceAudience.Private
 public class DFSUtil {
   private DFSUtil() { /* Hidden constructor */ }
@@ -288,10 +292,22 @@
   /**
    * Returns collection of nameservice Ids from the configuration.
    * @param conf configuration
-   * @return collection of nameservice Ids
+   * @return collection of nameservice Ids, or null if not specified
    */
   public static Collection<String> getNameServiceIds(Configuration conf) {
-    return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
+    return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES);
+  }
+  
+  /**
+   * @return coll if it is non-null and non-empty. Otherwise,
+   * returns a list with a single null value.
+   */
+  private static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
+    if (coll == null || coll.isEmpty()) {
+      return Collections.singletonList(null);
+    } else {
+      return coll;
+    }
   }
 
   /**
@@ -300,12 +316,14 @@
    * for each namenode in the in the HA setup.
    *
    * @param conf configuration
+   * @param nsId the nameservice ID to look at, or null for non-federated
    * @return collection of namenode Ids
    */
-  public static Collection<String> getNameNodeIds(Configuration conf) {
-    return conf.getStringCollection(DFS_HA_NAMENODES_KEY);
+  static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
+    String key = addSuffix(DFS_HA_NAMENODES_KEY, nsId);
+    return conf.getTrimmedStringCollection(key);
   }
-  
+
   /**
    * Given a list of keys in the order of preference, returns a value
    * for the key in the given order from the configuration.
@@ -333,13 +351,12 @@
 
   /** Add non empty and non null suffix to a key */
   private static String addSuffix(String key, String suffix) {
-    if (suffix == null || suffix.length() == 0) {
+    if (suffix == null || suffix.isEmpty()) {
       return key;
     }
-    if (!suffix.startsWith(".")) {
-      key += ".";
-    }
-    return key += suffix;
+    assert !suffix.startsWith(".") :
+      "suffix '" + suffix + "' should not already have '.' prepended.";
+    return key + "." + suffix;
   }
 
   /** Concatenate list of suffix strings '.' separated */
@@ -347,11 +364,7 @@
     if (suffixes == null) {
       return null;
     }
-    String ret = "";
-    for (int i = 0; i < suffixes.length - 1; i++) {
-      ret = addSuffix(ret, suffixes[i]);
-    }
-    return addSuffix(ret, suffixes[suffixes.length - 1]);
+    return Joiner.on(".").skipNulls().join(suffixes);
   }
 
   /**
@@ -363,69 +376,44 @@
   }
 
   /**
-   * Returns list of InetSocketAddress for a given set of keys.
+   * Returns the configured address for all NameNodes in the cluster.
    * @param conf configuration
-   * @param defaultAddress default address to return in case key is not found
+   * @param defaultAddress default address to return in case key is not found.
    * @param keys Set of keys to look for in the order of preference
-   * @return list of InetSocketAddress corresponding to the key
+   * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
    */
-  private static List<InetSocketAddress> getAddresses(Configuration conf,
+  private static Map<String, Map<String, InetSocketAddress>>
+    getAddresses(Configuration conf,
       String defaultAddress, String... keys) {
     Collection<String> nameserviceIds = getNameServiceIds(conf);
-    Collection<String> namenodeIds = getNameNodeIds(conf);
-    List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
+
+    // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
+    // across all of the configured nameservices and namenodes.
+    Map<String, Map<String, InetSocketAddress>> ret = Maps.newHashMap();
+    for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
+      Map<String, InetSocketAddress> isas =
+        getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
+      if (!isas.isEmpty()) {
+        ret.put(nsId, isas);
+      }
+    }
+    return ret;
+  }
 
-    final boolean federationEnabled = nameserviceIds != null
-        && !nameserviceIds.isEmpty();
-    final boolean haEnabled = namenodeIds != null
-        && !namenodeIds.isEmpty();
-
-    // Configuration with no federation and ha, return default address
-    if (!federationEnabled && !haEnabled) {
-      String address = getConfValue(defaultAddress, null, conf, keys);
-      if (address == null) {
-        return null;
-      }
-      isas.add(NetUtils.createSocketAddr(address));
-      return isas;
-    }
-
-    if (!federationEnabled) {
-      nameserviceIds = new ArrayList<String>();
-      nameserviceIds.add(null);
-    }
-    if (!haEnabled) {
-      namenodeIds = new ArrayList<String>();
-      namenodeIds.add(null);
-    }
-
-    // Get configuration suffixed with nameserviceId and/or namenodeId
-    if (federationEnabled && haEnabled) {
-      for (String nameserviceId : nameserviceIds) {
-        for (String nnId : namenodeIds) {
-          String keySuffix = concatSuffixes(nameserviceId, nnId);
-          String address = getConfValue(null, keySuffix, conf, keys);
-          if (address != null) {
-            isas.add(NetUtils.createSocketAddr(address));
-          }
-        }
-      }
-    } else if (!federationEnabled && haEnabled) {
-      for (String nnId : namenodeIds) {
-        String address = getConfValue(null, nnId, conf, keys);
-        if (address != null) {
-          isas.add(NetUtils.createSocketAddr(address));
-        }
-      }
-    } else if (federationEnabled && !haEnabled) {
-      for (String nameserviceId : nameserviceIds) {
-        String address = getConfValue(null, nameserviceId, conf, keys);
-        if (address != null) {
-          isas.add(NetUtils.createSocketAddr(address));
-        }
+  private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
+      Configuration conf, String nsId, String defaultValue,
+      String[] keys) {
+    Collection<String> nnIds = getNameNodeIds(conf, nsId);
+    Map<String, InetSocketAddress> ret = Maps.newHashMap();
+    for (String nnId : emptyAsSingletonNull(nnIds)) {
+      String suffix = concatSuffixes(nsId, nnId);
+      String address = getConfValue(defaultValue, suffix, conf, keys);
+      if (address != null) {
+        InetSocketAddress isa = NetUtils.createSocketAddr(address);
+        ret.put(nnId, isa);
       }
     }
-    return isas;
+    return ret;
   }
 
   /**
@@ -436,15 +424,9 @@
    * @return list of InetSocketAddresses
    * @throws IOException if no addresses are configured
    */
-  public static List<InetSocketAddress> getHaNnRpcAddresses(
-      Configuration conf) throws IOException {
-    List<InetSocketAddress> addressList = getAddresses(conf, null,
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
-    if (addressList == null) {
-      throw new IOException("Incorrect configuration: HA name node addresses "
-          + DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured.");
-    }
-    return addressList;
+  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
+      Configuration conf) {
+    return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
   }
 
   /**
@@ -455,11 +437,11 @@
    * @return list of InetSocketAddresses
    * @throws IOException on error
    */
-  public static List<InetSocketAddress> getBackupNodeAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
      Configuration conf) throws IOException {
-    List<InetSocketAddress> addressList = getAddresses(conf,
+    Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf,
        null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
-    if (addressList == null) {
+    if (addressList.isEmpty()) {
       throw new IOException("Incorrect configuration: backup node address "
           + DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
     }
@@ -474,11 +456,11 @@
    * @return list of InetSocketAddresses
    * @throws IOException on error
   */
-  public static List<InetSocketAddress> getSecondaryNameNodeAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
      Configuration conf) throws IOException {
-    List<InetSocketAddress> addressList = getAddresses(conf, null,
+    Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, null,
        DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
-    if (addressList == null) {
+    if (addressList.isEmpty()) {
       throw new IOException("Incorrect configuration: secondary namenode address "
           + DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
     }
@@ -498,7 +480,7 @@
    * @return list of InetSocketAddress
    * @throws IOException on error
    */
-  public static List<InetSocketAddress> getNNServiceRpcAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
       Configuration conf) throws IOException {
     // Use default address as fall back
     String defaultAddress;
@@ -508,9 +490,10 @@
       defaultAddress = null;
     }
 
-    List<InetSocketAddress> addressList = getAddresses(conf, defaultAddress,
+    Map<String, Map<String, InetSocketAddress>> addressList =
+      getAddresses(conf, defaultAddress,
         DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
-    if (addressList == null) {
+    if (addressList.isEmpty()) {
       throw new IOException("Incorrect configuration: namenode address "
           + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
          + DFS_NAMENODE_RPC_ADDRESS_KEY
@@ -519,6 +502,77 @@
     return addressList;
   }
 
+  /**
+   * Flatten the given map, as returned by other functions in this class,
+   * into a flat list of {@link ConfiguredNNAddress} instances.
+   */
+  public static List<ConfiguredNNAddress> flattenAddressMap(
+      Map<String, Map<String, InetSocketAddress>> map) {
+    List<ConfiguredNNAddress> ret = Lists.newArrayList();
+    
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
+         map.entrySet()) {
+      String nsId = entry.getKey();
+      Map<String, InetSocketAddress> nnMap = entry.getValue();
+      for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+        String nnId = e2.getKey();
+        InetSocketAddress addr = e2.getValue();
+        
+        ret.add(new ConfiguredNNAddress(nsId, nnId, addr));
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Format the given map, as returned by other functions in this class,
+   * into a string suitable for debugging display. The format of this string
+   * should not be considered an interface, and is liable to change.
+   */
+  public static String addressMapToString(
+      Map<String, Map<String, InetSocketAddress>> map) {
+    StringBuilder b = new StringBuilder();
+    for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
+         map.entrySet()) {
+      String nsId = entry.getKey();
+      Map<String, InetSocketAddress> nnMap = entry.getValue();
+      b.append("Nameservice <").append(nsId).append(">:").append("\n");
+      for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+        b.append(" NN ID ").append(e2.getKey())
+          .append(" => ").append(e2.getValue()).append("\n");
+      }
+    }
+    return b.toString();
+  }
+  
+  /**
+   * Represent one of the NameNodes configured in the cluster.
+   */
+  public static class ConfiguredNNAddress {
+    private final String nameserviceId;
+    private final String namenodeId;
+    private final InetSocketAddress addr;
+
+    private ConfiguredNNAddress(String nameserviceId, String namenodeId,
+        InetSocketAddress addr) {
+      this.nameserviceId = nameserviceId;
+      this.namenodeId = namenodeId;
+      this.addr = addr;
+    }
+
+    public String getNameserviceId() {
+      return nameserviceId;
+    }
+
+    public String getNamenodeId() {
+      return namenodeId;
+    }
+
+    public InetSocketAddress getAddress() {
+      return addr;
+    }
+  }
+
   /**
    * Given the InetSocketAddress this method returns the nameservice Id
    * corresponding to the key with matching address, by doing a reverse
@@ -545,11 +599,8 @@
   public static String getNameServiceIdFromAddress(final Configuration conf,
       final InetSocketAddress address, String... keys) {
     // Configuration with a single namenode and no nameserviceId
-    if (!isFederationEnabled(conf)) {
-      return null;
-    }
     String[] ids = getSuffixIDs(conf, address, keys);
-    return (ids != null && ids.length > 0) ? ids[0] : null;
+    return (ids != null) ? ids[0] : null;
   }
 
   /**
@@ -715,14 +766,6 @@
         ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout,
             locatedBlock);
   }
-  
-  /**
-   * Returns true if federation configuration is enabled
-   */
-  public static boolean isFederationEnabled(Configuration conf) {
-    Collection<String> collection = getNameServiceIds(conf);
-    return collection != null && collection.size() != 0;
-  }
 
   /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
@@ -783,16 +826,9 @@
     if (nameserviceId != null) {
       return nameserviceId;
     }
-    if (!isFederationEnabled(conf)) {
-      return null;
-    }
-    nameserviceId = getSuffixIDs(conf, addressKey, LOCAL_ADDRESS_MATCHER)[0];
-    if (nameserviceId == null) {
-      String msg = "Configuration " + addressKey + " must be suffixed with" +
-          " nameserviceId for federation configuration.";
-      throw new HadoopIllegalArgumentException(msg);
-    }
-    return nameserviceId;
+    String nnId = conf.get(DFS_HA_NAMENODE_ID_KEY);
+    
+    return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
   }
 
   /**
@@ -801,6 +837,8 @@
    *
    * @param conf Configuration
    * @param addressKey configuration key corresponding to the address.
+   * @param knownNsId only look at configs for the given nameservice, if not-null
+   * @param knownNNId only look at configs for the given namenode, if not null
    * @param matcher matching criteria for matching the address
    * @return Array with nameservice Id and namenode Id on success. First element
    *         in the array is nameservice Id and second element is namenode Id.
@@ -809,29 +847,23 @@
    * @throws HadoopIllegalArgumentException on error
    */
   static String[] getSuffixIDs(final Configuration conf, final String addressKey,
+      String knownNsId, String knownNNId,
       final AddressMatcher matcher) {
-    Collection<String> nsIds = getNameServiceIds(conf);
-    boolean federationEnabled = true;
-    if (nsIds == null || nsIds.size() == 0) {
-      federationEnabled = false; // federation not configured
-      nsIds = new ArrayList<String>();
-      nsIds.add(null);
-    }
-    
-    boolean haEnabled = true;
-    Collection<String> nnIds = getNameNodeIds(conf);
-    if (nnIds == null || nnIds.size() == 0) {
-      haEnabled = false; // HA not configured
-      nnIds = new ArrayList<String>();
-      nnIds.add(null);
-    }
-    
-    // Match the address from addressKey.nsId.nnId based on the given matcher
     String nameserviceId = null;
     String namenodeId = null;
     int found = 0;
-    for (String nsId : nsIds) {
-      for (String nnId : nnIds) {
+    
+    Collection<String> nsIds = getNameServiceIds(conf);
+    for (String nsId : emptyAsSingletonNull(nsIds)) {
+      if (knownNsId != null && !knownNsId.equals(nsId)) {
+        continue;
+      }
+      
+      Collection<String> nnIds = getNameNodeIds(conf, nsId);
+      for (String nnId : emptyAsSingletonNull(nnIds)) {
+        if (knownNNId != null && !knownNNId.equals(nnId)) {
+          continue;
+        }
         String key = addKeySuffixes(addressKey, nsId, nnId);
         String addr = conf.get(key);
         InetSocketAddress s = null;
@@ -850,8 +882,8 @@
     if (found > 1) { // Only one address must match the local address
       String msg = "Configuration has multiple addresses that match "
           + "local node's address. Please configure the system with "
-          + (federationEnabled ? DFS_FEDERATION_NAMESERVICE_ID : "")
-          + (haEnabled ? (" and " + DFS_HA_NAMENODE_ID_KEY) : "");
+          + DFS_FEDERATION_NAMESERVICE_ID + " and "
+          + DFS_HA_NAMENODE_ID_KEY;
       throw new HadoopIllegalArgumentException(msg);
     }
     return new String[] { nameserviceId, namenodeId };
@@ -872,7 +904,7 @@
     };
     
     for (String key : keys) {
-      String[] ids = getSuffixIDs(conf, key, matcher);
+      String[] ids = getSuffixIDs(conf, key, null, null, matcher);
       if (ids != null && (ids [0] != null || ids[1] != null)) {
         return ids;
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 2ba01f55015..6952871ccd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -29,14 +30,18 @@ public class HAUtil {
   private HAUtil() { /* Hidden constructor */ }
 
   /**
-   * Returns true if HA for namenode is configured.
+   * Returns true if HA for namenode is configured for the given nameservice
    * 
    * @param conf Configuration
+   * @param nsId nameservice, or null if no federated NS is configured
    * @return true if HA is configured in the configuration; else false.
    */
-  public static boolean isHAEnabled(Configuration conf) {
-    Collection<String> collection = DFSUtil.getNameNodeIds(conf);
-    return collection != null && !collection.isEmpty();
+  public static boolean isHAEnabled(Configuration conf, String nsId) {
+    Map<String, Map<String, InetSocketAddress>> addresses =
+      DFSUtil.getHaNnRpcAddresses(conf);
+    if (addresses == null) return false;
+    Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
+    return nnMap != null && nnMap.size() > 1;
   }
 
   /**
@@ -52,22 +57,21 @@
    * @return namenode Id on success, null on failure.
    * @throws HadoopIllegalArgumentException on error
    */
-  public static String getNameNodeId(Configuration conf) {
-    String namenodeId = conf.get(DFS_HA_NAMENODE_ID_KEY);
+  public static String getNameNodeId(Configuration conf, String nsId) {
+    String namenodeId = conf.getTrimmed(DFS_HA_NAMENODE_ID_KEY);
     if (namenodeId != null) {
       return namenodeId;
     }
-    if (!isHAEnabled(conf)) {
-      return null;
-    }
-    namenodeId = DFSUtil.getSuffixIDs(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
-        DFSUtil.LOCAL_ADDRESS_MATCHER)[1];
-    if (namenodeId == null) {
+    
+    String suffixes[] = DFSUtil.getSuffixIDs(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
+        nsId, null, DFSUtil.LOCAL_ADDRESS_MATCHER);
+    if (suffixes == null) {
       String msg = "Configuration " + DFS_NAMENODE_RPC_ADDRESS_KEY +
           " must be suffixed with" + namenodeId + " for HA configuration.";
       throw new HadoopIllegalArgumentException(msg);
     }
-    return namenodeId;
+    
+    return suffixes[1];
   }
 
   /**
@@ -78,14 +82,11 @@
   public static String getNameNodeIdFromAddress(final Configuration conf,
       final InetSocketAddress address, String... keys) {
     // Configuration with a single namenode and no nameserviceId
-    if (!isHAEnabled(conf)) {
-      return null;
-    }
-    
     String[] ids = DFSUtil.getSuffixIDs(conf, address, keys);
     if (ids != null && ids.length > 1) {
       return ids[1];
     }
     return null;
   }
+  
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index a0146e75a87..bc7c13a9147 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -1379,7 +1380,8 @@
    * for each namenode,
    * execute a {@link Balancer} to work through all datanodes once.  
    */
-  static int run(List<InetSocketAddress> namenodes, final Parameters p,
+  static int run(Map<String, Map<String, InetSocketAddress>> namenodes,
+      final Parameters p,
       Configuration conf) throws IOException, InterruptedException {
     final long sleeptime = 2000*conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@@ -1393,8 +1395,10 @@
     final List<NameNodeConnector> connectors
         = new ArrayList<NameNodeConnector>(namenodes.size());
     try {
-      for(InetSocketAddress isa : namenodes) {
-        connectors.add(new NameNodeConnector(isa, conf));
+      for(Entry<String, Map<String, InetSocketAddress>> entry :
+          namenodes.entrySet()) {
+        connectors.add(
+            new NameNodeConnector(entry.getValue().values(), conf));
       }
     
       boolean done = false;
@@ -1476,7 +1480,8 @@
       try {
         checkReplicationPolicyCompatibility(conf);
 
-        final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
+        final Map<String, Map<String, InetSocketAddress>> namenodes =
+            DFSUtil.getNNServiceRpcAddresses(conf);
         return Balancer.run(namenodes, parse(args), conf);
       } catch (IOException e) {
         System.out.println(e + ".  Exiting ...");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 634efdf5b3b..f43a41ea388 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -53,6 +55,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
 /**
  * The class provides utilities for {@link Balancer} to access a NameNode
  */
@@ -75,12 +80,14 @@ class NameNodeConnector {
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
 
-  NameNodeConnector(InetSocketAddress namenodeAddress, Configuration conf
-      ) throws IOException {
-    this.namenodeAddress = namenodeAddress;
-    this.namenode = createNamenode(namenodeAddress, conf);
+  NameNodeConnector(Collection<InetSocketAddress> haNNs,
+      Configuration conf) throws IOException {
+    InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
+    // TODO(HA): need to deal with connecting to HA NN pair here
+    this.namenodeAddress = nn;
+    this.namenode = createNamenode(nn, conf);
     this.client = DFSUtil.createNamenode(conf);
-    this.fs = FileSystem.get(NameNode.getUri(namenodeAddress), conf);
+    this.fs = FileSystem.get(NameNode.getUri(nn), conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 65ccba80dcd..87a62f4e57c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -77,6 +77,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -92,6 +93,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -168,6 +170,8 @@ import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 
 /**********************************************************
@@ -251,8 +255,14 @@ public class DataNode extends Configured
       bpMapping = new HashMap<String, BPOfferService>();
       nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
 
-      List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
-      for(InetSocketAddress isa : isas) {
+      Map<String, Map<String, InetSocketAddress>> map =
+        DFSUtil.getNNServiceRpcAddresses(conf);
+      for (Entry<String, Map<String, InetSocketAddress>> entry :
+           map.entrySet()) {
+        List<InetSocketAddress> nnList = Lists.newArrayList(entry.getValue().values());
+        // TODO(HA) when HDFS-1971 (dual BRs) is done, pass all of the NNs
+        // to BPOS
+        InetSocketAddress isa = nnList.get(0);
         BPOfferService bpos = new BPOfferService(isa, DataNode.this);
         nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
       }
@@ -333,8 +343,16 @@ public class DataNode extends Configured
       throws IOException {
     LOG.info("Refresh request received for nameservices: "
         + conf.get(DFS_FEDERATION_NAMESERVICES));
-    List<InetSocketAddress> newAddresses = 
+    
+    // TODO(HA): need to update this for multiple NNs per nameservice
+    // For now, just list all of the NNs into this set
+    Map<String, Map<String, InetSocketAddress>> newAddressMap =
       DFSUtil.getNNServiceRpcAddresses(conf);
+    Set<InetSocketAddress> newAddresses = Sets.newHashSet();
+    for (ConfiguredNNAddress cnn : DFSUtil.flattenAddressMap(newAddressMap)) {
+      newAddresses.add(cnn.getAddress());
+    }
+    
     List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
     List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
     synchronized (refreshNamenodesLock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
index 10601b17235..3ffc852667d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.JsonNode;
@@ -66,9 +67,10 @@ class ClusterJspHelper {
   ClusterStatus generateClusterHealthReport() {
     ClusterStatus cs = new ClusterStatus();
     Configuration conf = new Configuration();
-    List<InetSocketAddress> isas = null;
+    List<ConfiguredNNAddress> nns = null;
     try {
-      isas = DFSUtil.getNNServiceRpcAddresses(conf);
+      nns = DFSUtil.flattenAddressMap(
+          DFSUtil.getNNServiceRpcAddresses(conf));
     } catch (Exception e) {
       // Could not build cluster status
       cs.setError(e);
@@ -76,7 +78,8 @@ class ClusterJspHelper {
     }
     
     // Process each namenode and add it to ClusterStatus
-    for (InetSocketAddress isa : isas) {
+    for (ConfiguredNNAddress cnn : nns) {
+      InetSocketAddress isa = cnn.getAddress();
       NamenodeMXBeanHelper nnHelper = null;
       try {
         nnHelper = new NamenodeMXBeanHelper(isa, conf);
@@ -102,9 +105,10 @@ class ClusterJspHelper {
   DecommissionStatus generateDecommissioningReport() {
     String clusterid = "";
     Configuration conf = new Configuration();
-    List<InetSocketAddress> isas = null;
+    List<ConfiguredNNAddress> cnns = null;
     try {
-      isas = DFSUtil.getNNServiceRpcAddresses(conf);
+      cnns = DFSUtil.flattenAddressMap(
+          DFSUtil.getNNServiceRpcAddresses(conf));
     } catch (Exception e) {
       // catch any exception encountered other than connecting to namenodes
       DecommissionStatus dInfo = new DecommissionStatus(clusterid, e);
@@ -122,7 +126,8 @@ class ClusterJspHelper {
         new HashMap();
     List unreportedNamenode = new ArrayList();
     
-    for (InetSocketAddress isa : isas) {
+    for (ConfiguredNNAddress cnn : cnns) {
+      InetSocketAddress isa = cnn.getAddress();
       NamenodeMXBeanHelper nnHelper = null;
       try {
         nnHelper = new NamenodeMXBeanHelper(isa, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 4eb080105f0..f411a4adbb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -527,10 +527,11 @@ public class NameNode {
       throws IOException { 
     this.conf = conf;
     this.role = role;
-    this.haEnabled = HAUtil.isHAEnabled(conf);
+    String nsId = getNameServiceId(conf);
+    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
     this.haContext = new NameNodeHAContext();
     try {
-      initializeGenericKeys(conf, getNameServiceId(conf));
+      initializeGenericKeys(conf, nsId);
       initialize(conf);
       if (!haEnabled) {
         state = ACTIVE_STATE;
@@ -848,7 +849,7 @@ public class NameNode {
    */
   public static void initializeGenericKeys(Configuration conf,
       String nameserviceId) {
-    String namenodeId = HAUtil.getNameNodeId(conf);
+    String namenodeId = HAUtil.getNameNodeId(conf, nameserviceId);
     if ((nameserviceId == null || nameserviceId.isEmpty()) && 
         (namenodeId == null || namenodeId.isEmpty())) {
       return;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index d002fde1844..483d9eb6230 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -89,9 +90,14 @@ public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
     try {
       ugi = UserGroupInformation.getCurrentUser();
       
-      Collection<InetSocketAddress> addresses = DFSUtil.getHaNnRpcAddresses(
+      Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
           conf);
-      for (InetSocketAddress address : addresses) {
+      // TODO(HA): currently hardcoding the nameservice used by MiniDFSCluster.
+      // We need to somehow communicate this into the proxy provider.
+ String nsId = "nameserviceId1"; + Map addressesInNN = map.get(nsId); + + for (InetSocketAddress address : addressesInNN.values()) { proxies.add(new AddressRpcProxyPair(address)); } } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java index b9631430d74..ae544c2be04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java @@ -21,12 +21,15 @@ import java.io.IOException; import java.io.PrintStream; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -155,7 +158,7 @@ public class GetConf extends Configured implements Tool { static class NameNodesCommandHandler extends CommandHandler { @Override int doWorkInternal(GetConf tool) throws IOException { - tool.printList(DFSUtil.getNNServiceRpcAddresses(tool.getConf())); + tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf())); return 0; } } @@ -166,7 +169,7 @@ public class GetConf extends Configured implements Tool { static class BackupNodesCommandHandler extends CommandHandler { @Override public int doWorkInternal(GetConf tool) throws IOException { - tool.printList(DFSUtil.getBackupNodeAddresses(tool.getConf())); + tool.printMap(DFSUtil.getBackupNodeAddresses(tool.getConf())); return 0; } } @@ -177,7 +180,7 @@ public class GetConf extends Configured implements Tool { static class SecondaryNameNodesCommandHandler extends CommandHandler { @Override public int doWorkInternal(GetConf tool) throws IOException { - tool.printList(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf())); + tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf())); return 0; } } @@ -191,9 +194,11 @@ public class GetConf extends Configured implements Tool { @Override public int doWorkInternal(GetConf tool) throws IOException { Configuration config = tool.getConf(); - List rpclist = DFSUtil.getNNServiceRpcAddresses(config); - if (rpclist != null) { - for (InetSocketAddress rpc : rpclist) { + List cnnlist = DFSUtil.flattenAddressMap( + DFSUtil.getNNServiceRpcAddresses(config)); + if (!cnnlist.isEmpty()) { + for (ConfiguredNNAddress cnn : cnnlist) { + InetSocketAddress rpc = cnn.getAddress(); tool.printOut(rpc.getHostName()+":"+rpc.getPort()); } return 0; @@ -223,10 +228,13 @@ public class GetConf extends Configured implements Tool { void printOut(String message) { out.println(message); } - - void printList(List list) { + + void printMap(Map> map) { StringBuilder buffer = new StringBuilder(); - for (InetSocketAddress address : list) { + + List cnns = DFSUtil.flattenAddressMap(map); + for (ConfiguredNNAddress cnn : cnns) { + InetSocketAddress address = cnn.getAddress(); if (buffer.length() > 0) { buffer.append(" "); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index d9c64f70be0..5fb5bd70e8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -183,14 +184,19 @@ public class TestDFSUtil { conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"), NN2_ADDRESS); - Collection nnAddresses = DFSUtil + Map> nnMap = DFSUtil .getNNServiceRpcAddresses(conf); - assertEquals(2, nnAddresses.size()); - Iterator iterator = nnAddresses.iterator(); - InetSocketAddress addr = iterator.next(); + assertEquals(2, nnMap.size()); + + Map nn1Map = nnMap.get("nn1"); + assertEquals(1, nn1Map.size()); + InetSocketAddress addr = nn1Map.get(null); assertEquals("localhost", addr.getHostName()); assertEquals(9000, addr.getPort()); - addr = iterator.next(); + + Map nn2Map = nnMap.get("nn2"); + assertEquals(1, nn2Map.size()); + addr = nn2Map.get(null); assertEquals("localhost", addr.getHostName()); assertEquals(9001, addr.getPort()); @@ -237,9 +243,14 @@ public class TestDFSUtil { conf.set(FS_DEFAULT_NAME_KEY, hdfs_default); // If DFS_FEDERATION_NAMESERVICES is not set, verify that // default namenode address is returned. - List addrList = DFSUtil.getNNServiceRpcAddresses(conf); - assertEquals(1, addrList.size()); - assertEquals(9999, addrList.get(0).getPort()); + Map> addrMap = + DFSUtil.getNNServiceRpcAddresses(conf); + assertEquals(1, addrMap.size()); + + Map defaultNsMap = addrMap.get(null); + assertEquals(1, defaultNsMap.size()); + + assertEquals(9999, defaultNsMap.get(null).getPort()); } /** @@ -279,22 +290,28 @@ public class TestDFSUtil { public void testEmptyConf() { HdfsConfiguration conf = new HdfsConfiguration(false); try { - DFSUtil.getNNServiceRpcAddresses(conf); - fail("Expected IOException is not thrown"); + Map> map = + DFSUtil.getNNServiceRpcAddresses(conf); + fail("Expected IOException is not thrown, result was: " + + DFSUtil.addressMapToString(map)); } catch (IOException expected) { /** Expected */ } try { - DFSUtil.getBackupNodeAddresses(conf); - fail("Expected IOException is not thrown"); + Map> map = + DFSUtil.getBackupNodeAddresses(conf); + fail("Expected IOException is not thrown, result was: " + + DFSUtil.addressMapToString(map)); } catch (IOException expected) { /** Expected */ } try { - DFSUtil.getSecondaryNameNodeAddresses(conf); - fail("Expected IOException is not thrown"); + Map> map = + DFSUtil.getSecondaryNameNodeAddresses(conf); + fail("Expected IOException is not thrown, result was: " + + DFSUtil.addressMapToString(map)); } catch (IOException expected) { /** Expected */ } @@ -310,5 +327,44 @@ public class TestDFSUtil { String httpport = DFSUtil.getInfoServer(null, conf, false); assertEquals("0.0.0.0:50070", httpport); } + + @Test + public void testHANameNodesWithFederation() { + HdfsConfiguration conf = new HdfsConfiguration(); + + final String NS1_NN1_HOST = "ns1-nn1.example.com:8020"; + final String NS1_NN2_HOST = "ns1-nn2.example.com:8020"; + final String NS2_NN1_HOST = "ns2-nn1.example.com:8020"; + final String NS2_NN2_HOST = "ns2-nn2.example.com:8020"; + + // Two nameservices, each with two NNs. 
+    conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
+    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, "ns1"),
+        "ns1-nn1,ns1-nn2");
+    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, "ns2"),
+        "ns2-nn1,ns2-nn2");
+    conf.set(DFSUtil.addKeySuffixes(
+        DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "ns1-nn1"),
+        NS1_NN1_HOST);
+    conf.set(DFSUtil.addKeySuffixes(
+        DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "ns1-nn2"),
+        NS1_NN2_HOST);
+    conf.set(DFSUtil.addKeySuffixes(
+        DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "ns2-nn1"),
+        NS2_NN1_HOST);
+    conf.set(DFSUtil.addKeySuffixes(
+        DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "ns2-nn2"),
+        NS2_NN2_HOST);
+    
+    Map<String, Map<String, InetSocketAddress>> map =
+      DFSUtil.getHaNnRpcAddresses(conf);
+    System.err.println("TestHANameNodesWithFederation:\n" +
+        DFSUtil.addressMapToString(map));
+    
+    assertEquals(NS1_NN1_HOST, map.get("ns1").get("ns1-nn1").toString());
+    assertEquals(NS1_NN2_HOST, map.get("ns1").get("ns1-nn2").toString());
+    assertEquals(NS2_NN1_HOST, map.get("ns2").get("ns2-nn1").toString());
+    assertEquals(NS2_NN2_HOST, map.get("ns2").get("ns2-nn2").toString());
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 34cd784bd04..84235112aa5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
@@ -330,8 +331,8 @@ public class TestBalancer extends TestCase {
     waitForHeartBeat(totalUsedSpace, totalCapacity);
 
     // start rebalancing
-    final List<InetSocketAddress> namenodes =new ArrayList<InetSocketAddress>();
-    namenodes.add(NameNode.getServiceAddress(conf, true));
+    Map<String, Map<String, InetSocketAddress>> namenodes =
+      DFSUtil.getNNServiceRpcAddresses(conf);
     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
     assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
index 6ca0ffe7b31..151614b14b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -157,7 +158,8 @@ public class TestBalancerWithMultipleNameNodes {
 
     LOG.info("BALANCER 1");
     // start rebalancing
-    final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(s.conf);
+    final Map<String, Map<String, InetSocketAddress>> namenodes =
+        DFSUtil.getNNServiceRpcAddresses(s.conf);
     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
     Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java
index 39e8e20a0df..4553543d73d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.StringTokenizer;
 
 import static org.junit.Assert.*;
@@ -32,6 +33,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.GetConf;
@@ -80,13 +82,13 @@ public class TestGetConf {
   }
 
   /*
-   * Convert list of InetSocketAddress to string array with each address
-   * represented as "host:port"
+   * Convert the map returned from DFSUtil functions to an array of
+   * addresses represented as "host:port"
    */
-  private String[] toStringArray(List<InetSocketAddress> list) {
+  private String[] toStringArray(List<ConfiguredNNAddress> list) {
     String[] ret = new String[list.size()];
     for (int i = 0; i < list.size(); i++) {
-      ret[i] = NameNode.getHostPortString(list.get(i));
+      ret[i] = NameNode.getHostPortString(list.get(i).getAddress());
     }
     return ret;
   }
@@ -94,8 +96,8 @@ public class TestGetConf {
   /**
    * Using DFSUtil methods get the list of given {@code type} of address
   */
-  private List<InetSocketAddress> getAddressListFromConf(TestType type,
-      HdfsConfiguration conf) throws IOException {
+  private Map<String, Map<String, InetSocketAddress>> getAddressListFromConf(
+      TestType type, HdfsConfiguration conf) throws IOException {
     switch (type) {
     case NAMENODE:
       return DFSUtil.getNNServiceRpcAddresses(conf);
@@ -161,7 +163,7 @@ public class TestGetConf {
    * @param expected, expected addresses
    */
   private void getAddressListFromTool(TestType type, HdfsConfiguration conf,
-      boolean checkPort, List<InetSocketAddress> expected) throws Exception {
+      boolean checkPort, List<ConfiguredNNAddress> expected) throws Exception {
     String out = getAddressListFromTool(type, conf, expected.size() != 0);
     List<String> values = new ArrayList<String>();
 
@@ -176,7 +178,8 @@ public class TestGetConf {
     // Convert expected list to String[] of hosts
     int i = 0;
     String[] expectedHosts = new String[expected.size()];
-    for (InetSocketAddress addr : expected) {
+    for (ConfiguredNNAddress cnn : expected) {
+      InetSocketAddress addr = cnn.getAddress();
       if (!checkPort) {
         expectedHosts[i++] = addr.getHostName();
       }else {
@@ -191,7 +194,9 @@ public class TestGetConf {
   private void verifyAddresses(HdfsConfiguration conf, TestType type,
       boolean checkPort, String... expected) throws Exception {
     // Ensure DFSUtil returned the right set of addresses
-    List<InetSocketAddress> list = getAddressListFromConf(type, conf);
+    Map<String, Map<String, InetSocketAddress>> map =
+      getAddressListFromConf(type, conf);
+    List<ConfiguredNNAddress> list = DFSUtil.flattenAddressMap(map);
     String[] actual = toStringArray(list);
     Arrays.sort(actual);
     Arrays.sort(expected);
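
A note on the configuration layout this patch produces (not part of the diff itself): the HA NameNode list is now declared per nameservice via dfs.ha.namenodes.<nameserviceId>, and address keys are resolved with the <nameserviceId>.<namenodeId> suffix. The sketch below mirrors the key names exercised by testHANameNodesWithFederation above; the nameservice ID "ns1", NameNode IDs "nn1"/"nn2", the host names, and the example class name are illustrative placeholders, not values mandated by the patch.

  import java.net.InetSocketAddress;
  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSUtil;

  public class ScopedHaNamenodesExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // One federated nameservice with an HA pair of NameNodes.
      conf.set("dfs.federation.nameservices", "ns1");
      // The HA namenode list is scoped by nameservice ID, rather than
      // being a single global dfs.ha.namenodes value.
      conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
      // Address keys carry the <nameserviceId>.<namenodeId> suffix.
      conf.set("dfs.namenode.rpc-address.ns1.nn1", "nn1.example.com:8020");
      conf.set("dfs.namenode.rpc-address.ns1.nn2", "nn2.example.com:8020");

      // The reworked DFSUtil returns a two-level map:
      // nameserviceId -> (namenodeId -> InetSocketAddress).
      Map<String, Map<String, InetSocketAddress>> map =
          DFSUtil.getHaNnRpcAddresses(conf);
      for (Map.Entry<String, InetSocketAddress> e : map.get("ns1").entrySet()) {
        System.out.println("ns1 NN " + e.getKey() + " => " + e.getValue());
      }
    }
  }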