HDFS-9365. Balaner does not work with the HDFS-6376 HA setup.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-05-24 12:49:48 -07:00
parent f979d779e1
commit 15ed080e36
10 changed files with 63 additions and 48 deletions

View File

@ -708,9 +708,16 @@ public class DFSUtil {
"nnId=" + namenodeId + ";addr=" + addr + "]";
}
}
/** @return Internal name services specified in the conf. */
static Collection<String> getInternalNameServices(Configuration conf) {
final Collection<String> ids = conf.getTrimmedStringCollection(
DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
return !ids.isEmpty()? ids: DFSUtilClient.getNameServiceIds(conf);
}
/**
* Get a URI for each configured nameservice. If a nameservice is
* Get a URI for each internal nameservice. If a nameservice is
* HA-enabled, then the logical URI of the nameservice is returned. If the
* nameservice is not HA-enabled, then a URI corresponding to an RPC address
* of the single NN for that nameservice is returned, preferring the service
@ -720,8 +727,8 @@ public class DFSUtil {
* @return a collection of all configured NN URIs, preferring service
* addresses
*/
public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
return getNameServiceUris(conf,
public static Collection<URI> getInternalNsRpcUris(Configuration conf) {
return getNameServiceUris(conf, getInternalNameServices(conf),
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
@ -737,8 +744,8 @@ public class DFSUtil {
* nameservices
* @return a collection of all configured NN URIs
*/
public static Collection<URI> getNameServiceUris(Configuration conf,
String... keys) {
static Collection<URI> getNameServiceUris(Configuration conf,
Collection<String> nameServices, String... keys) {
Set<URI> ret = new HashSet<URI>();
// We're passed multiple possible configuration keys for any given NN or HA
@ -748,7 +755,7 @@ public class DFSUtil {
// keep track of non-preferred keys here.
Set<URI> nonPreferredUris = new HashSet<URI>();
for (String nsId : DFSUtilClient.getNameServiceIds(conf)) {
for (String nsId : nameServices) {
if (HAUtil.isHAEnabled(conf, nsId)) {
// Add the logical URI of the nameservice.
try {

View File

@ -764,7 +764,7 @@ public class Balancer {
try {
checkReplicationPolicyCompatibility(conf);
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");

View File

@ -666,7 +666,7 @@ public class Mover {
} else if (line.hasOption("p")) {
paths = line.getOptionValues("p");
}
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
if (paths == null || paths.length == 0) {
for (URI namenode : namenodes) {
map.put(namenode, null);

View File

@ -78,6 +78,8 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Sets;
public class TestDFSUtil {
/**
@ -532,7 +534,7 @@ public class TestDFSUtil {
assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf));
assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf));
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
Collection<URI> uris = getInternalNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(2, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://ns2")));
@ -615,7 +617,13 @@ public class TestDFSUtil {
assertEquals("127.0.0.1:12345",
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
}
private static Collection<URI> getInternalNameServiceUris(Configuration conf,
String... keys) {
final Collection<String> ids = DFSUtil.getInternalNameServices(conf);
return DFSUtil.getNameServiceUris(conf, ids, keys);
}
/**
* Test how name service URIs are handled with a variety of configuration
* settings
@ -639,8 +647,7 @@ public class TestDFSUtil {
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "hdfs://" + NN2_ADDR);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
Collection<URI> uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
Collection<URI> uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 2, uris.size());
assertTrue("Missing URI for name service ns1",
@ -664,8 +671,7 @@ public class TestDFSUtil {
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 3, uris.size());
assertTrue("Missing URI for name service ns1",
@ -679,8 +685,7 @@ public class TestDFSUtil {
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
"viewfs://vfs-name.example.com");
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 3, uris.size());
assertTrue("Missing URI for name service ns1",
@ -694,8 +699,7 @@ public class TestDFSUtil {
// entries being returned.
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 3, uris.size());
assertTrue("Missing URI for name service ns1",
@ -709,8 +713,7 @@ public class TestDFSUtil {
conf = new HdfsConfiguration();
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 1, uris.size());
assertTrue("Missing URI for RPC address (defaultFS)",
@ -720,8 +723,7 @@ public class TestDFSUtil {
// and the default FS is given.
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN2_ADDR);
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 1, uris.size());
assertTrue("Missing URI for RPC address",
@ -733,8 +735,7 @@ public class TestDFSUtil {
// returned.
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_ADDR);
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 1, uris.size());
assertTrue("Missing URI for service ns1",
@ -746,8 +747,7 @@ public class TestDFSUtil {
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
uris = DFSUtil.getInternalNsRpcUris(conf);
assertEquals("Incorrect number of URIs returned", 1, uris.size());
assertTrue("Missing URI for service address",
@ -763,7 +763,7 @@ public class TestDFSUtil {
// it will automatically convert it to hostname
HdfsConfiguration conf = new HdfsConfiguration();
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:9820");
Collection<URI> uris = DFSUtil.getNameServiceUris(conf);
Collection<URI> uris = getInternalNameServiceUris(conf);
assertEquals(1, uris.size());
for (URI uri : uris) {
assertThat(uri.getHost(), not("127.0.0.1"));
@ -950,10 +950,19 @@ public class TestDFSUtil {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
NN2_ADDRESS);
{
Collection<String> internal = DFSUtil.getInternalNameServices(conf);
assertEquals(Sets.newHashSet("nn1"), internal);
Collection<String> all = DFSUtilClient.getNameServiceIds(conf);
assertEquals(Sets.newHashSet("nn1", "nn2"), all);
}
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf);
assertEquals(1, nnMap.size());
assertTrue(nnMap.containsKey("nn1"));
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
try {
DFSUtil.getNNServiceRpcAddressesForCluster(conf);

View File

@ -494,7 +494,7 @@ public class TestBalancer {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
@ -578,7 +578,7 @@ public class TestBalancer {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
BlockPlacementPolicy placementPolicy =
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
@ -886,7 +886,7 @@ public class TestBalancer {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = runBalancer(namenodes, p, conf);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
@ -1088,7 +1088,7 @@ public class TestBalancer {
new String[]{RACK0}, null,new long[]{CAPACITY});
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Set<String> datanodes = new HashSet<String>();
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
BalancerParameters.Builder pBuilder =
@ -1553,7 +1553,7 @@ public class TestBalancer {
null, null, storageCapacities, null, false, false, false, null);
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
// Run Balancer
final BalancerParameters p = BalancerParameters.DEFAULT;
@ -1600,7 +1600,7 @@ public class TestBalancer {
// Add another DN with the same capacity, cluster is now unbalanced
cluster.startDataNodes(conf, 1, true, null, null);
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
// Run balancer
final BalancerParameters p = BalancerParameters.DEFAULT;
@ -1679,7 +1679,7 @@ public class TestBalancer {
cluster.triggerHeartbeats();
BalancerParameters p = BalancerParameters.DEFAULT;
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf);
// Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
@ -1797,7 +1797,7 @@ public class TestBalancer {
LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
{ // run Balancer with min-block-size=50
BalancerParameters.Builder b =
@ -1923,7 +1923,6 @@ public class TestBalancer {
// run balancer and validate results
BalancerParameters p = BalancerParameters.DEFAULT;
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
// verify locations of striped blocks

View File

@ -94,7 +94,7 @@ public class TestBalancerWithHANameNodes {
totalCapacity += newNodeCapacity;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);

View File

@ -168,7 +168,7 @@ public class TestBalancerWithMultipleNameNodes {
getStorageReports(s);
// start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(s.conf);
final int r = Balancer.run(namenodes, s.parameters, s.conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);

View File

@ -174,7 +174,7 @@ public class TestBalancerWithNodeGroup {
waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
@ -188,7 +188,7 @@ public class TestBalancerWithNodeGroup {
waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));

View File

@ -69,7 +69,7 @@ public class TestMover {
}
static Mover newMover(Configuration conf) throws IOException {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
@ -188,7 +188,7 @@ public class TestMover {
}
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf);
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next();
@ -196,7 +196,7 @@ public class TestMover {
Assert.assertNull(movePaths.get(nn));
movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar");
namenodes = DFSUtil.getNsServiceRpcUris(conf);
namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, movePaths.size());
nn = namenodes.iterator().next();
Assert.assertTrue(movePaths.containsKey(nn));
@ -217,7 +217,7 @@ public class TestMover {
try {
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", "/foo", "/bar");
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next();
@ -238,7 +238,7 @@ public class TestMover {
final Configuration conf = new HdfsConfiguration();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(3, namenodes.size());
try {
@ -286,7 +286,7 @@ public class TestMover {
final Configuration conf = new HdfsConfiguration();
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(3, namenodes.size());
Iterator<URI> iter = namenodes.iterator();

View File

@ -267,7 +267,7 @@ public class TestStorageMover {
}
private void runMover(ExitStatus expectedExitCode) throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);