HDFS-6376. Distcp data between two HA clusters requires another configuration. Contributed by Dave Marion and Haohui Mai.

This commit is contained in:
Jing Zhao 2014-09-05 10:40:02 -07:00
parent 0b3f918b3b
commit a074342b4c
9 changed files with 164 additions and 20 deletions

View File

@ -185,6 +185,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6886. Use single editlog record for creating file + overwrite. (Yi Liu
via jing9)
HDFS-6376. Distcp data between two HA clusters requires another configuration.
(Dave Marion and Haohui Mai via jing9)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -538,6 +538,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMESERVICES = "dfs.nameservices";
public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id";
public static final String DFS_INTERNAL_NAMESERVICES_KEY = "dfs.internal.nameservices";
public static final String DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";

View File

@ -60,6 +60,7 @@
import javax.net.SocketFactory;
import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
@ -613,7 +614,7 @@ public static String addKeySuffixes(String key, String... suffixes) {
String keySuffix = concatSuffixes(suffixes);
return addSuffix(key, keySuffix);
}
/**
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
@ -622,14 +623,25 @@ public static String addKeySuffixes(String key, String... suffixes) {
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
private static Map<String, Map<String, InetSocketAddress>>
getAddresses(Configuration conf,
String defaultAddress, String... keys) {
getAddresses(Configuration conf, String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
}
/**
* Returns the configured address for all NameNodes in the cluster.
* @param conf configuration
* @param nsIds
*@param defaultAddress default address to return in case key is not found.
* @param keys Set of keys to look for in the order of preference @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
private static Map<String, Map<String, InetSocketAddress>>
getAddressesForNsIds(Configuration conf, Collection<String> nsIds,
String defaultAddress, String... keys) {
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
for (String nsId : emptyAsSingletonNull(nameserviceIds)) {
for (String nsId : emptyAsSingletonNull(nsIds)) {
Map<String, InetSocketAddress> isas =
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
if (!isas.isEmpty()) {
@ -774,8 +786,7 @@ public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAd
/**
* Returns list of InetSocketAddresses corresponding to namenodes from the
* configuration. Note this is to be used by datanodes to get the list of
* namenode addresses to talk to.
* configuration.
*
* Returns namenode address specifically configured for datanodes (using
* service ports), if found. If not, regular RPC address configured for other
@ -806,7 +817,60 @@ public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddress
}
return addressList;
}
/**
* Returns list of InetSocketAddresses corresponding to the namenode
* that manages this cluster. Note this is to be used by datanodes to get
* the list of namenode addresses to talk to.
*
* Returns namenode address specifically configured for datanodes (using
* service ports), if found. If not, regular RPC address configured for other
* clients is returned.
*
* @param conf configuration
* @return list of InetSocketAddress
* @throws IOException on error
*/
public static Map<String, Map<String, InetSocketAddress>>
getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {
// Use default address as fall back
String defaultAddress;
try {
defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf));
} catch (IllegalArgumentException e) {
defaultAddress = null;
}
Collection<String> parentNameServices = conf.getTrimmedStringCollection
(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
if (parentNameServices.isEmpty()) {
parentNameServices = conf.getTrimmedStringCollection
(DFSConfigKeys.DFS_NAMESERVICES);
} else {
// Ensure that the internal service is ineed in the list of all available
// nameservices.
Set<String> availableNameServices = Sets.newHashSet(conf
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
for (String nsId : parentNameServices) {
if (!availableNameServices.contains(nsId)) {
throw new IOException("Unknown nameservice: " + nsId);
}
}
}
Map<String, Map<String, InetSocketAddress>> addressList =
getAddressesForNsIds(conf, parentNameServices, defaultAddress,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: namenode address "
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
+ DFS_NAMENODE_RPC_ADDRESS_KEY
+ " is not configured.");
}
return addressList;
}
/**
* Flatten the given map, as returned by other functions in this class,
* into a flat list of {@link ConfiguredNNAddress} instances.

View File

@ -149,12 +149,12 @@ void joinAll() {
void refreshNamenodes(Configuration conf)
throws IOException {
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFSConfigKeys.DFS_NAMESERVICES));
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddresses(conf);
LOG.info("Refresh request received for nameservices: " + conf.get
(DFSConfigKeys.DFS_NAMESERVICES));
Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf);
synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap);
}

View File

@ -186,7 +186,7 @@ int doWorkInternal(GetConf tool, String[] args) throws Exception {
static class NameNodesCommandHandler extends CommandHandler {
@Override
int doWorkInternal(GetConf tool, String []args) throws IOException {
tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf()));
tool.printMap(DFSUtil.getNNServiceRpcAddressesForCluster(tool.getConf()));
return 0;
}
}
@ -223,7 +223,7 @@ static class NNRpcAddressesCommandHandler extends CommandHandler {
public int doWorkInternal(GetConf tool, String []args) throws IOException {
Configuration config = tool.getConf();
List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap(
DFSUtil.getNNServiceRpcAddresses(config));
DFSUtil.getNNServiceRpcAddressesForCluster(config));
if (!cnnlist.isEmpty()) {
for (ConfiguredNNAddress cnn : cnnlist) {
InetSocketAddress rpc = cnn.getAddress();

View File

@ -1115,6 +1115,16 @@
</description>
</property>
<property>
<name>dfs.internal.nameservices</name>
<value></value>
<description>
Comma-separated list of nameservices that belong to this cluster.
Datanode will report to all the nameservices in this list. By default
this is set to the value of dfs.nameservices.
</description>
</property>
<property>
<name>dfs.ha.namenodes.EXAMPLENAMESERVICE</name>
<value></value>

View File

@ -21,6 +21,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
@ -865,4 +866,29 @@ public void testGetPassword() throws Exception {
// let's make sure that a password that doesn't exist returns null
Assert.assertEquals(null, DFSUtil.getPassword(conf,"invalid-alias"));
}
@Test
public void testGetNNServiceRpcAddressesForNsIds() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.set(DFS_NAMESERVICES, "nn1,nn2");
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn1");
// Test - configured list of namenodes are returned
final String NN1_ADDRESS = "localhost:9000";
final String NN2_ADDRESS = "localhost:9001";
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn1"),
NN1_ADDRESS);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
NN2_ADDRESS);
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf);
assertEquals(1, nnMap.size());
assertTrue(nnMap.containsKey("nn1"));
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
try {
DFSUtil.getNNServiceRpcAddressesForCluster(conf);
fail("Should fail for misconfiguration");
} catch (IOException ignored) {
}
}
}

View File

@ -23,15 +23,18 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -130,6 +133,25 @@ public void testFederationRefresh() throws Exception {
"refresh #2\n", log.toString());
}
@Test
public void testInternalNameService() throws Exception {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2,ns3");
addNN(conf, "ns1", "mock1:8020");
addNN(conf, "ns2", "mock1:8020");
addNN(conf, "ns3", "mock1:8020");
conf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
bpm.refreshNamenodes(conf);
assertEquals("create #1\n", log.toString());
@SuppressWarnings("unchecked")
Map<String, BPOfferService> map = (Map<String, BPOfferService>) Whitebox
.getInternalState(bpm, "bpByNameserviceId");
Assert.assertFalse(map.containsKey("ns2"));
Assert.assertFalse(map.containsKey("ns3"));
Assert.assertTrue(map.containsKey("ns1"));
log.setLength(0);
}
private static void addNN(Configuration conf, String ns, String addr) {
String key = DFSUtil.addKeySuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.hdfs.tools;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -121,13 +123,13 @@ private Map<String, Map<String, InetSocketAddress>> getAddressListFromConf(
TestType type, HdfsConfiguration conf) throws IOException {
switch (type) {
case NAMENODE:
return DFSUtil.getNNServiceRpcAddresses(conf);
return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
case BACKUP:
return DFSUtil.getBackupNodeAddresses(conf);
case SECONDARY:
return DFSUtil.getSecondaryNameNodeAddresses(conf);
case NNRPCADDRESSES:
return DFSUtil.getNNServiceRpcAddresses(conf);
return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
}
return null;
}
@ -226,7 +228,7 @@ private void verifyAddresses(HdfsConfiguration conf, TestType type,
String[] actual = toStringArray(list);
Arrays.sort(actual);
Arrays.sort(expected);
assertTrue(Arrays.equals(expected, actual));
assertArrayEquals(expected, actual);
// Test GetConf returned addresses
getAddressListFromTool(type, conf, checkPort, list);
@ -425,7 +427,23 @@ public void TestGetConfIncludeCommand() throws Exception{
assertEquals(hostsFile.toUri().getPath(),ret.trim());
cleanupFile(localFileSys, excludeFile.getParent());
}
@Test
public void testIncludeInternalNameServices() throws Exception {
final int nsCount = 10;
final int remoteNsCount = 4;
HdfsConfiguration conf = new HdfsConfiguration();
setupNameServices(conf, nsCount);
setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000);
setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500);
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
setupStaticHostResolution(nsCount);
String[] includedNN = new String[] {"nn1:1001"};
verifyAddresses(conf, TestType.NAMENODE, false, includedNN);
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, includedNN);
}
private void writeConfigFile(Path name, ArrayList<String> nodes)
throws IOException {
// delete if it already exists