HDFS-2582. Scope dfs.ha.namenodes config by nameservice. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1207738 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-11-29 02:27:45 +00:00
parent 73b3de6204
commit 9146ad23f3
14 changed files with 358 additions and 209 deletions

View File

@ -23,3 +23,5 @@ HDFS-2393. Mark appropriate methods of ClientProtocol with the idempotent annota
HDFS-2523. Small NN fixes to include HAServiceProtocol and prevent NPE on shutdown. (todd)
HDFS-2577. NN fails to start since it tries to start secret manager in safemode. (todd)
HDFS-2582. Scope dfs.ha.namenodes config by nameservice (todd)

View File

@ -24,10 +24,11 @@ import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
@ -45,11 +46,14 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@InterfaceAudience.Private
public class DFSUtil {
private DFSUtil() { /* Hidden constructor */ }
@ -288,10 +292,22 @@ public class DFSUtil {
/**
* Returns collection of nameservice Ids from the configuration.
* @param conf configuration
* @return collection of nameservice Ids
* @return collection of nameservice Ids, or null if not specified
*/
public static Collection<String> getNameServiceIds(Configuration conf) {
return conf.getStringCollection(DFS_FEDERATION_NAMESERVICES);
return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES);
}
/**
* @return <code>coll</code> if it is non-null and non-empty. Otherwise,
* returns a list with a single null value.
*/
private static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
if (coll == null || coll.isEmpty()) {
return Collections.singletonList(null);
} else {
return coll;
}
}
/**
@ -300,10 +316,12 @@ public class DFSUtil {
* for each namenode in the in the HA setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of namenode Ids
*/
public static Collection<String> getNameNodeIds(Configuration conf) {
return conf.getStringCollection(DFS_HA_NAMENODES_KEY);
static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
String key = addSuffix(DFS_HA_NAMENODES_KEY, nsId);
return conf.getTrimmedStringCollection(key);
}
/**
@ -333,13 +351,12 @@ public class DFSUtil {
/** Add non empty and non null suffix to a key */
private static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.length() == 0) {
if (suffix == null || suffix.isEmpty()) {
return key;
}
if (!suffix.startsWith(".")) {
key += ".";
}
return key += suffix;
assert !suffix.startsWith(".") :
"suffix '" + suffix + "' should not already have '.' prepended.";
return key + "." + suffix;
}
/** Concatenate list of suffix strings '.' separated */
@ -347,11 +364,7 @@ public class DFSUtil {
if (suffixes == null) {
return null;
}
String ret = "";
for (int i = 0; i < suffixes.length - 1; i++) {
ret = addSuffix(ret, suffixes[i]);
}
return addSuffix(ret, suffixes[suffixes.length - 1]);
return Joiner.on(".").skipNulls().join(suffixes);
}
/**
@ -363,69 +376,44 @@ public class DFSUtil {
}
/**
* Returns list of InetSocketAddress for a given set of keys.
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case key is not found
* @param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference
* @return list of InetSocketAddress corresponding to the key
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
private static List<InetSocketAddress> getAddresses(Configuration conf,
private static Map<String, Map<String, InetSocketAddress>>
getAddresses(Configuration conf,
String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
Collection<String> namenodeIds = getNameNodeIds(conf);
List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
final boolean federationEnabled = nameserviceIds != null
&& !nameserviceIds.isEmpty();
final boolean haEnabled = namenodeIds != null
&& !namenodeIds.isEmpty();
// Configuration with no federation and ha, return default address
if (!federationEnabled && !haEnabled) {
String address = getConfValue(defaultAddress, null, conf, keys);
if (address == null) {
return null;
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newHashMap();
for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
Map<String, InetSocketAddress> isas =
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
if (!isas.isEmpty()) {
ret.put(nsId, isas);
}
isas.add(NetUtils.createSocketAddr(address));
return isas;
}
return ret;
}
if (!federationEnabled) {
nameserviceIds = new ArrayList<String>();
nameserviceIds.add(null);
}
if (!haEnabled) {
namenodeIds = new ArrayList<String>();
namenodeIds.add(null);
}
// Get configuration suffixed with nameserviceId and/or namenodeId
if (federationEnabled && haEnabled) {
for (String nameserviceId : nameserviceIds) {
for (String nnId : namenodeIds) {
String keySuffix = concatSuffixes(nameserviceId, nnId);
String address = getConfValue(null, keySuffix, conf, keys);
private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue,
String[] keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
String address = getConfValue(defaultValue, suffix, conf, keys);
if (address != null) {
isas.add(NetUtils.createSocketAddr(address));
InetSocketAddress isa = NetUtils.createSocketAddr(address);
ret.put(nnId, isa);
}
}
}
} else if (!federationEnabled && haEnabled) {
for (String nnId : namenodeIds) {
String address = getConfValue(null, nnId, conf, keys);
if (address != null) {
isas.add(NetUtils.createSocketAddr(address));
}
}
} else if (federationEnabled && !haEnabled) {
for (String nameserviceId : nameserviceIds) {
String address = getConfValue(null, nameserviceId, conf, keys);
if (address != null) {
isas.add(NetUtils.createSocketAddr(address));
}
}
}
return isas;
return ret;
}
/**
@ -436,15 +424,9 @@ public class DFSUtil {
* @return list of InetSocketAddresses
* @throws IOException if no addresses are configured
*/
public static List<InetSocketAddress> getHaNnRpcAddresses(
Configuration conf) throws IOException {
List<InetSocketAddress> addressList = getAddresses(conf, null,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
if (addressList == null) {
throw new IOException("Incorrect configuration: HA name node addresses "
+ DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured.");
}
return addressList;
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
Configuration conf) {
return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
@ -455,11 +437,11 @@ public class DFSUtil {
* @return list of InetSocketAddresses
* @throws IOException on error
*/
public static List<InetSocketAddress> getBackupNodeAddresses(
public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
Configuration conf) throws IOException {
List<InetSocketAddress> addressList = getAddresses(conf,
Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf,
null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
if (addressList == null) {
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: backup node address "
+ DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
}
@ -474,11 +456,11 @@ public class DFSUtil {
* @return list of InetSocketAddresses
* @throws IOException on error
*/
public static List<InetSocketAddress> getSecondaryNameNodeAddresses(
public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
Configuration conf) throws IOException {
List<InetSocketAddress> addressList = getAddresses(conf, null,
Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, null,
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
if (addressList == null) {
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: secondary namenode address "
+ DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
}
@ -498,7 +480,7 @@ public class DFSUtil {
* @return list of InetSocketAddress
* @throws IOException on error
*/
public static List<InetSocketAddress> getNNServiceRpcAddresses(
public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
Configuration conf) throws IOException {
// Use default address as fall back
String defaultAddress;
@ -508,9 +490,10 @@ public class DFSUtil {
defaultAddress = null;
}
List<InetSocketAddress> addressList = getAddresses(conf, defaultAddress,
Map<String, Map<String, InetSocketAddress>> addressList =
getAddresses(conf, defaultAddress,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
if (addressList == null) {
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: namenode address "
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
+ DFS_NAMENODE_RPC_ADDRESS_KEY
@ -519,6 +502,77 @@ public class DFSUtil {
return addressList;
}
/**
* Flatten the given map, as returned by other functions in this class,
* into a flat list of {@link ConfiguredNNAddress} instances.
*/
public static List<ConfiguredNNAddress> flattenAddressMap(
Map<String, Map<String, InetSocketAddress>> map) {
List<ConfiguredNNAddress> ret = Lists.newArrayList();
for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
map.entrySet()) {
String nsId = entry.getKey();
Map<String, InetSocketAddress> nnMap = entry.getValue();
for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
String nnId = e2.getKey();
InetSocketAddress addr = e2.getValue();
ret.add(new ConfiguredNNAddress(nsId, nnId, addr));
}
}
return ret;
}
/**
* Format the given map, as returned by other functions in this class,
* into a string suitable for debugging display. The format of this string
* should not be considered an interface, and is liable to change.
*/
public static String addressMapToString(
Map<String, Map<String, InetSocketAddress>> map) {
StringBuilder b = new StringBuilder();
for (Map.Entry<String, Map<String, InetSocketAddress>> entry :
map.entrySet()) {
String nsId = entry.getKey();
Map<String, InetSocketAddress> nnMap = entry.getValue();
b.append("Nameservice <").append(nsId).append(">:").append("\n");
for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
b.append(" NN ID ").append(e2.getKey())
.append(" => ").append(e2.getValue()).append("\n");
}
}
return b.toString();
}
/**
* Represent one of the NameNodes configured in the cluster.
*/
public static class ConfiguredNNAddress {
private final String nameserviceId;
private final String namenodeId;
private final InetSocketAddress addr;
private ConfiguredNNAddress(String nameserviceId, String namenodeId,
InetSocketAddress addr) {
this.nameserviceId = nameserviceId;
this.namenodeId = namenodeId;
this.addr = addr;
}
public String getNameserviceId() {
return nameserviceId;
}
public String getNamenodeId() {
return namenodeId;
}
public InetSocketAddress getAddress() {
return addr;
}
}
/**
* Given the InetSocketAddress this method returns the nameservice Id
* corresponding to the key with matching address, by doing a reverse
@ -545,11 +599,8 @@ public class DFSUtil {
public static String getNameServiceIdFromAddress(final Configuration conf,
final InetSocketAddress address, String... keys) {
// Configuration with a single namenode and no nameserviceId
if (!isFederationEnabled(conf)) {
return null;
}
String[] ids = getSuffixIDs(conf, address, keys);
return (ids != null && ids.length > 0) ? ids[0] : null;
return (ids != null) ? ids[0] : null;
}
/**
@ -716,14 +767,6 @@ public class DFSUtil {
locatedBlock);
}
/**
* Returns true if federation configuration is enabled
*/
public static boolean isFederationEnabled(Configuration conf) {
Collection<String> collection = getNameServiceIds(conf);
return collection != null && collection.size() != 0;
}
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout)
@ -783,16 +826,9 @@ public class DFSUtil {
if (nameserviceId != null) {
return nameserviceId;
}
if (!isFederationEnabled(conf)) {
return null;
}
nameserviceId = getSuffixIDs(conf, addressKey, LOCAL_ADDRESS_MATCHER)[0];
if (nameserviceId == null) {
String msg = "Configuration " + addressKey + " must be suffixed with" +
" nameserviceId for federation configuration.";
throw new HadoopIllegalArgumentException(msg);
}
return nameserviceId;
String nnId = conf.get(DFS_HA_NAMENODE_ID_KEY);
return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
}
/**
@ -801,6 +837,8 @@ public class DFSUtil {
*
* @param conf Configuration
* @param addressKey configuration key corresponding to the address.
* @param knownNsId only look at configs for the given nameservice, if not-null
* @param knownNNId only look at configs for the given namenode, if not null
* @param matcher matching criteria for matching the address
* @return Array with nameservice Id and namenode Id on success. First element
* in the array is nameservice Id and second element is namenode Id.
@ -809,29 +847,23 @@ public class DFSUtil {
* @throws HadoopIllegalArgumentException on error
*/
static String[] getSuffixIDs(final Configuration conf, final String addressKey,
String knownNsId, String knownNNId,
final AddressMatcher matcher) {
Collection<String> nsIds = getNameServiceIds(conf);
boolean federationEnabled = true;
if (nsIds == null || nsIds.size() == 0) {
federationEnabled = false; // federation not configured
nsIds = new ArrayList<String>();
nsIds.add(null);
}
boolean haEnabled = true;
Collection<String> nnIds = getNameNodeIds(conf);
if (nnIds == null || nnIds.size() == 0) {
haEnabled = false; // HA not configured
nnIds = new ArrayList<String>();
nnIds.add(null);
}
// Match the address from addressKey.nsId.nnId based on the given matcher
String nameserviceId = null;
String namenodeId = null;
int found = 0;
for (String nsId : nsIds) {
for (String nnId : nnIds) {
Collection<String> nsIds = getNameServiceIds(conf);
for (String nsId : emptyAsSingletonNull(nsIds)) {
if (knownNsId != null && !knownNsId.equals(nsId)) {
continue;
}
Collection<String> nnIds = getNameNodeIds(conf, nsId);
for (String nnId : emptyAsSingletonNull(nnIds)) {
if (knownNNId != null && !knownNNId.equals(nnId)) {
continue;
}
String key = addKeySuffixes(addressKey, nsId, nnId);
String addr = conf.get(key);
InetSocketAddress s = null;
@ -850,8 +882,8 @@ public class DFSUtil {
if (found > 1) { // Only one address must match the local address
String msg = "Configuration has multiple addresses that match "
+ "local node's address. Please configure the system with "
+ (federationEnabled ? DFS_FEDERATION_NAMESERVICE_ID : "")
+ (haEnabled ? (" and " + DFS_HA_NAMENODE_ID_KEY) : "");
+ DFS_FEDERATION_NAMESERVICE_ID + " and "
+ DFS_HA_NAMENODE_ID_KEY;
throw new HadoopIllegalArgumentException(msg);
}
return new String[] { nameserviceId, namenodeId };
@ -872,7 +904,7 @@ public class DFSUtil {
};
for (String key : keys) {
String[] ids = getSuffixIDs(conf, key, matcher);
String[] ids = getSuffixIDs(conf, key, null, null, matcher);
if (ids != null && (ids [0] != null || ids[1] != null)) {
return ids;
}

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@ -29,14 +30,18 @@ public class HAUtil {
private HAUtil() { /* Hidden constructor */ }
/**
* Returns true if HA for namenode is configured.
* Returns true if HA for namenode is configured for the given nameservice
*
* @param conf Configuration
* @param nsId nameservice, or null if no federated NS is configured
* @return true if HA is configured in the configuration; else false.
*/
public static boolean isHAEnabled(Configuration conf) {
Collection<String> collection = DFSUtil.getNameNodeIds(conf);
return collection != null && !collection.isEmpty();
public static boolean isHAEnabled(Configuration conf, String nsId) {
Map<String, Map<String, InetSocketAddress>> addresses =
DFSUtil.getHaNnRpcAddresses(conf);
if (addresses == null) return false;
Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
return nnMap != null && nnMap.size() > 1;
}
/**
@ -52,22 +57,21 @@ public class HAUtil {
* @return namenode Id on success, null on failure.
* @throws HadoopIllegalArgumentException on error
*/
public static String getNameNodeId(Configuration conf) {
String namenodeId = conf.get(DFS_HA_NAMENODE_ID_KEY);
public static String getNameNodeId(Configuration conf, String nsId) {
String namenodeId = conf.getTrimmed(DFS_HA_NAMENODE_ID_KEY);
if (namenodeId != null) {
return namenodeId;
}
if (!isHAEnabled(conf)) {
return null;
}
namenodeId = DFSUtil.getSuffixIDs(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
DFSUtil.LOCAL_ADDRESS_MATCHER)[1];
if (namenodeId == null) {
String suffixes[] = DFSUtil.getSuffixIDs(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
nsId, null, DFSUtil.LOCAL_ADDRESS_MATCHER);
if (suffixes == null) {
String msg = "Configuration " + DFS_NAMENODE_RPC_ADDRESS_KEY +
" must be suffixed with" + namenodeId + " for HA configuration.";
throw new HadoopIllegalArgumentException(msg);
}
return namenodeId;
return suffixes[1];
}
/**
@ -78,14 +82,11 @@ public class HAUtil {
public static String getNameNodeIdFromAddress(final Configuration conf,
final InetSocketAddress address, String... keys) {
// Configuration with a single namenode and no nameserviceId
if (!isHAEnabled(conf)) {
return null;
}
String[] ids = DFSUtil.getSuffixIDs(conf, address, keys);
if (ids != null && ids.length > 1) {
return ids[1];
}
return null;
}
}

View File

@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -1379,7 +1380,8 @@ public class Balancer {
* for each namenode,
* execute a {@link Balancer} to work through all datanodes once.
*/
static int run(List<InetSocketAddress> namenodes, final Parameters p,
static int run(Map<String, Map<String, InetSocketAddress>> namenodes,
final Parameters p,
Configuration conf) throws IOException, InterruptedException {
final long sleeptime = 2000*conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -1393,8 +1395,10 @@ public class Balancer {
final List<NameNodeConnector> connectors
= new ArrayList<NameNodeConnector>(namenodes.size());
try {
for(InetSocketAddress isa : namenodes) {
connectors.add(new NameNodeConnector(isa, conf));
for(Entry<String, Map<String, InetSocketAddress>> entry :
namenodes.entrySet()) {
connectors.add(
new NameNodeConnector(entry.getValue().values(), conf));
}
boolean done = false;
@ -1476,7 +1480,8 @@ public class Balancer {
try {
checkReplicationPolicyCompatibility(conf);
final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(conf);
final Map<String, Map<String, InetSocketAddress>> namenodes =
DFSUtil.getNNServiceRpcAddresses(conf);
return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@ -53,6 +55,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
/**
* The class provides utilities for {@link Balancer} to access a NameNode
*/
@ -75,12 +80,14 @@ class NameNodeConnector {
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
NameNodeConnector(InetSocketAddress namenodeAddress, Configuration conf
) throws IOException {
this.namenodeAddress = namenodeAddress;
this.namenode = createNamenode(namenodeAddress, conf);
NameNodeConnector(Collection<InetSocketAddress> haNNs,
Configuration conf) throws IOException {
InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
// TODO(HA): need to deal with connecting to HA NN pair here
this.namenodeAddress = nn;
this.namenode = createNamenode(nn, conf);
this.client = DFSUtil.createNamenode(conf);
this.fs = FileSystem.get(NameNode.getUri(namenodeAddress), conf);
this.fs = FileSystem.get(NameNode.getUri(nn), conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();

View File

@ -77,6 +77,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -92,6 +93,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
@ -168,6 +170,8 @@ import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**********************************************************
@ -251,8 +255,14 @@ public class DataNode extends Configured
bpMapping = new HashMap<String, BPOfferService>();
nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
for(InetSocketAddress isa : isas) {
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getNNServiceRpcAddresses(conf);
for (Entry<String, Map<String, InetSocketAddress>> entry :
map.entrySet()) {
List<InetSocketAddress> nnList = Lists.newArrayList(entry.getValue().values());
// TODO(HA) when HDFS-1971 (dual BRs) is done, pass all of the NNs
// to BPOS
InetSocketAddress isa = nnList.get(0);
BPOfferService bpos = new BPOfferService(isa, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
@ -333,8 +343,16 @@ public class DataNode extends Configured
throws IOException {
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFS_FEDERATION_NAMESERVICES));
List<InetSocketAddress> newAddresses =
// TODO(HA): need to update this for multiple NNs per nameservice
// For now, just list all of the NNs into this set
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddresses(conf);
Set<InetSocketAddress> newAddresses = Sets.newHashSet();
for (ConfiguredNNAddress cnn : DFSUtil.flattenAddressMap(newAddressMap)) {
newAddresses.add(cnn.getAddress());
}
List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
synchronized (refreshNamenodesLock) {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.JsonNode;
@ -66,9 +67,10 @@ class ClusterJspHelper {
ClusterStatus generateClusterHealthReport() {
ClusterStatus cs = new ClusterStatus();
Configuration conf = new Configuration();
List<InetSocketAddress> isas = null;
List<ConfiguredNNAddress> nns = null;
try {
isas = DFSUtil.getNNServiceRpcAddresses(conf);
nns = DFSUtil.flattenAddressMap(
DFSUtil.getNNServiceRpcAddresses(conf));
} catch (Exception e) {
// Could not build cluster status
cs.setError(e);
@ -76,7 +78,8 @@ class ClusterJspHelper {
}
// Process each namenode and add it to ClusterStatus
for (InetSocketAddress isa : isas) {
for (ConfiguredNNAddress cnn : nns) {
InetSocketAddress isa = cnn.getAddress();
NamenodeMXBeanHelper nnHelper = null;
try {
nnHelper = new NamenodeMXBeanHelper(isa, conf);
@ -102,9 +105,10 @@ class ClusterJspHelper {
DecommissionStatus generateDecommissioningReport() {
String clusterid = "";
Configuration conf = new Configuration();
List<InetSocketAddress> isas = null;
List<ConfiguredNNAddress> cnns = null;
try {
isas = DFSUtil.getNNServiceRpcAddresses(conf);
cnns = DFSUtil.flattenAddressMap(
DFSUtil.getNNServiceRpcAddresses(conf));
} catch (Exception e) {
// catch any exception encountered other than connecting to namenodes
DecommissionStatus dInfo = new DecommissionStatus(clusterid, e);
@ -122,7 +126,8 @@ class ClusterJspHelper {
new HashMap<String, Exception>();
List<String> unreportedNamenode = new ArrayList<String>();
for (InetSocketAddress isa : isas) {
for (ConfiguredNNAddress cnn : cnns) {
InetSocketAddress isa = cnn.getAddress();
NamenodeMXBeanHelper nnHelper = null;
try {
nnHelper = new NamenodeMXBeanHelper(isa, conf);

View File

@ -527,10 +527,11 @@ public class NameNode {
throws IOException {
this.conf = conf;
this.role = role;
this.haEnabled = HAUtil.isHAEnabled(conf);
String nsId = getNameServiceId(conf);
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
this.haContext = new NameNodeHAContext();
try {
initializeGenericKeys(conf, getNameServiceId(conf));
initializeGenericKeys(conf, nsId);
initialize(conf);
if (!haEnabled) {
state = ACTIVE_STATE;
@ -848,7 +849,7 @@ public class NameNode {
*/
public static void initializeGenericKeys(Configuration conf, String
nameserviceId) {
String namenodeId = HAUtil.getNameNodeId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nameserviceId);
if ((nameserviceId == null || nameserviceId.isEmpty()) &&
(namenodeId == null || namenodeId.isEmpty())) {
return;

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -89,9 +90,14 @@ public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
try {
ugi = UserGroupInformation.getCurrentUser();
Collection<InetSocketAddress> addresses = DFSUtil.getHaNnRpcAddresses(
Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
conf);
for (InetSocketAddress address : addresses) {
// TODO(HA): currently hardcoding the nameservice used by MiniDFSCluster.
// We need to somehow communicate this into the proxy provider.
String nsId = "nameserviceId1";
Map<String, InetSocketAddress> addressesInNN = map.get(nsId);
for (InetSocketAddress address : addressesInNN.values()) {
proxies.add(new AddressRpcProxyPair(address));
}
} catch (IOException e) {

View File

@ -21,12 +21,15 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -155,7 +158,7 @@ public class GetConf extends Configured implements Tool {
static class NameNodesCommandHandler extends CommandHandler {
@Override
int doWorkInternal(GetConf tool) throws IOException {
tool.printList(DFSUtil.getNNServiceRpcAddresses(tool.getConf()));
tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf()));
return 0;
}
}
@ -166,7 +169,7 @@ public class GetConf extends Configured implements Tool {
static class BackupNodesCommandHandler extends CommandHandler {
@Override
public int doWorkInternal(GetConf tool) throws IOException {
tool.printList(DFSUtil.getBackupNodeAddresses(tool.getConf()));
tool.printMap(DFSUtil.getBackupNodeAddresses(tool.getConf()));
return 0;
}
}
@ -177,7 +180,7 @@ public class GetConf extends Configured implements Tool {
static class SecondaryNameNodesCommandHandler extends CommandHandler {
@Override
public int doWorkInternal(GetConf tool) throws IOException {
tool.printList(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf()));
tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf()));
return 0;
}
}
@ -191,9 +194,11 @@ public class GetConf extends Configured implements Tool {
@Override
public int doWorkInternal(GetConf tool) throws IOException {
Configuration config = tool.getConf();
List<InetSocketAddress> rpclist = DFSUtil.getNNServiceRpcAddresses(config);
if (rpclist != null) {
for (InetSocketAddress rpc : rpclist) {
List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap(
DFSUtil.getNNServiceRpcAddresses(config));
if (!cnnlist.isEmpty()) {
for (ConfiguredNNAddress cnn : cnnlist) {
InetSocketAddress rpc = cnn.getAddress();
tool.printOut(rpc.getHostName()+":"+rpc.getPort());
}
return 0;
@ -224,9 +229,12 @@ public class GetConf extends Configured implements Tool {
out.println(message);
}
void printList(List<InetSocketAddress> list) {
void printMap(Map<String, Map<String, InetSocketAddress>> map) {
StringBuilder buffer = new StringBuilder();
for (InetSocketAddress address : list) {
List<ConfiguredNNAddress> cnns = DFSUtil.flattenAddressMap(map);
for (ConfiguredNNAddress cnn : cnns) {
InetSocketAddress address = cnn.getAddress();
if (buffer.length() > 0) {
buffer.append(" ");
}

View File

@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -183,14 +184,19 @@ public class TestDFSUtil {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
NN2_ADDRESS);
Collection<InetSocketAddress> nnAddresses = DFSUtil
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
.getNNServiceRpcAddresses(conf);
assertEquals(2, nnAddresses.size());
Iterator<InetSocketAddress> iterator = nnAddresses.iterator();
InetSocketAddress addr = iterator.next();
assertEquals(2, nnMap.size());
Map<String, InetSocketAddress> nn1Map = nnMap.get("nn1");
assertEquals(1, nn1Map.size());
InetSocketAddress addr = nn1Map.get(null);
assertEquals("localhost", addr.getHostName());
assertEquals(9000, addr.getPort());
addr = iterator.next();
Map<String, InetSocketAddress> nn2Map = nnMap.get("nn2");
assertEquals(1, nn2Map.size());
addr = nn2Map.get(null);
assertEquals("localhost", addr.getHostName());
assertEquals(9001, addr.getPort());
@ -237,9 +243,14 @@ public class TestDFSUtil {
conf.set(FS_DEFAULT_NAME_KEY, hdfs_default);
// If DFS_FEDERATION_NAMESERVICES is not set, verify that
// default namenode address is returned.
List<InetSocketAddress> addrList = DFSUtil.getNNServiceRpcAddresses(conf);
assertEquals(1, addrList.size());
assertEquals(9999, addrList.get(0).getPort());
Map<String, Map<String, InetSocketAddress>> addrMap =
DFSUtil.getNNServiceRpcAddresses(conf);
assertEquals(1, addrMap.size());
Map<String, InetSocketAddress> defaultNsMap = addrMap.get(null);
assertEquals(1, defaultNsMap.size());
assertEquals(9999, defaultNsMap.get(null).getPort());
}
/**
@ -279,22 +290,28 @@ public class TestDFSUtil {
public void testEmptyConf() {
HdfsConfiguration conf = new HdfsConfiguration(false);
try {
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getNNServiceRpcAddresses(conf);
fail("Expected IOException is not thrown");
fail("Expected IOException is not thrown, result was: " +
DFSUtil.addressMapToString(map));
} catch (IOException expected) {
/** Expected */
}
try {
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getBackupNodeAddresses(conf);
fail("Expected IOException is not thrown");
fail("Expected IOException is not thrown, result was: " +
DFSUtil.addressMapToString(map));
} catch (IOException expected) {
/** Expected */
}
try {
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getSecondaryNameNodeAddresses(conf);
fail("Expected IOException is not thrown");
fail("Expected IOException is not thrown, result was: " +
DFSUtil.addressMapToString(map));
} catch (IOException expected) {
/** Expected */
}
@ -311,4 +328,43 @@ public class TestDFSUtil {
assertEquals("0.0.0.0:50070", httpport);
}
@Test
public void testHANameNodesWithFederation() {
HdfsConfiguration conf = new HdfsConfiguration();
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
final String NS1_NN2_HOST = "ns1-nn2.example.com:8020";
final String NS2_NN1_HOST = "ns2-nn1.example.com:8020";
final String NS2_NN2_HOST = "ns2-nn2.example.com:8020";
// Two nameservices, each with two NNs.
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, "ns1"),
"ns1-nn1,ns1-nn2");
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, "ns2"),
"ns2-nn1,ns2-nn2");
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "ns1-nn1"),
NS1_NN1_HOST);
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "ns1-nn2"),
NS1_NN2_HOST);
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "ns2-nn1"),
NS2_NN1_HOST);
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "ns2-nn2"),
NS2_NN2_HOST);
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getHaNnRpcAddresses(conf);
System.err.println("TestHANameNodesWithFederation:\n" +
DFSUtil.addressMapToString(map));
assertEquals(NS1_NN1_HOST, map.get("ns1").get("ns1-nn1").toString());
assertEquals(NS1_NN2_HOST, map.get("ns1").get("ns1-nn2").toString());
assertEquals(NS2_NN1_HOST, map.get("ns2").get("ns2-nn1").toString());
assertEquals(NS2_NN2_HOST, map.get("ns2").get("ns2-nn2").toString());
}
}

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;
@ -330,8 +331,8 @@ public class TestBalancer extends TestCase {
waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing
final List<InetSocketAddress> namenodes =new ArrayList<InetSocketAddress>();
namenodes.add(NameNode.getServiceAddress(conf, true));
Map<String, Map<String, InetSocketAddress>> namenodes =
DFSUtil.getNNServiceRpcAddresses(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -157,7 +158,8 @@ public class TestBalancerWithMultipleNameNodes {
LOG.info("BALANCER 1");
// start rebalancing
final List<InetSocketAddress> namenodes = DFSUtil.getNNServiceRpcAddresses(s.conf);
final Map<String, Map<String, InetSocketAddress>> namenodes =
DFSUtil.getNNServiceRpcAddresses(s.conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);

View File

@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import static org.junit.Assert.*;
@ -32,6 +33,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.GetConf;
@ -80,13 +82,13 @@ public class TestGetConf {
}
/*
* Convert list of InetSocketAddress to string array with each address
* represented as "host:port"
* Convert the map returned from DFSUtil functions to an array of
* addresses represented as "host:port"
*/
private String[] toStringArray(List<InetSocketAddress> list) {
private String[] toStringArray(List<ConfiguredNNAddress> list) {
String[] ret = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
ret[i] = NameNode.getHostPortString(list.get(i));
ret[i] = NameNode.getHostPortString(list.get(i).getAddress());
}
return ret;
}
@ -94,8 +96,8 @@ public class TestGetConf {
/**
* Using DFSUtil methods get the list of given {@code type} of address
*/
private List<InetSocketAddress> getAddressListFromConf(TestType type,
HdfsConfiguration conf) throws IOException {
private Map<String, Map<String, InetSocketAddress>> getAddressListFromConf(
TestType type, HdfsConfiguration conf) throws IOException {
switch (type) {
case NAMENODE:
return DFSUtil.getNNServiceRpcAddresses(conf);
@ -161,7 +163,7 @@ public class TestGetConf {
* @param expected, expected addresses
*/
private void getAddressListFromTool(TestType type, HdfsConfiguration conf,
boolean checkPort, List<InetSocketAddress> expected) throws Exception {
boolean checkPort, List<ConfiguredNNAddress> expected) throws Exception {
String out = getAddressListFromTool(type, conf, expected.size() != 0);
List<String> values = new ArrayList<String>();
@ -176,7 +178,8 @@ public class TestGetConf {
// Convert expected list to String[] of hosts
int i = 0;
String[] expectedHosts = new String[expected.size()];
for (InetSocketAddress addr : expected) {
for (ConfiguredNNAddress cnn : expected) {
InetSocketAddress addr = cnn.getAddress();
if (!checkPort) {
expectedHosts[i++] = addr.getHostName();
}else {
@ -191,7 +194,9 @@ public class TestGetConf {
private void verifyAddresses(HdfsConfiguration conf, TestType type,
boolean checkPort, String... expected) throws Exception {
// Ensure DFSUtil returned the right set of addresses
List<InetSocketAddress> list = getAddressListFromConf(type, conf);
Map<String, Map<String, InetSocketAddress>> map =
getAddressListFromConf(type, conf);
List<ConfiguredNNAddress> list = DFSUtil.flattenAddressMap(map);
String[] actual = toStringArray(list);
Arrays.sort(actual);
Arrays.sort(expected);