HDFS-9365. Balaner does not work with the HDFS-6376 HA setup.
This commit is contained in:
parent
108db38788
commit
076a73f207
|
@ -709,9 +709,16 @@ public class DFSUtil {
|
||||||
"nnId=" + namenodeId + ";addr=" + addr + "]";
|
"nnId=" + namenodeId + ";addr=" + addr + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @return Internal name services specified in the conf. */
|
||||||
|
static Collection<String> getInternalNameServices(Configuration conf) {
|
||||||
|
final Collection<String> ids = conf.getTrimmedStringCollection(
|
||||||
|
DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
|
||||||
|
return !ids.isEmpty()? ids: DFSUtilClient.getNameServiceIds(conf);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a URI for each configured nameservice. If a nameservice is
|
* Get a URI for each internal nameservice. If a nameservice is
|
||||||
* HA-enabled, then the logical URI of the nameservice is returned. If the
|
* HA-enabled, then the logical URI of the nameservice is returned. If the
|
||||||
* nameservice is not HA-enabled, then a URI corresponding to an RPC address
|
* nameservice is not HA-enabled, then a URI corresponding to an RPC address
|
||||||
* of the single NN for that nameservice is returned, preferring the service
|
* of the single NN for that nameservice is returned, preferring the service
|
||||||
|
@ -721,8 +728,8 @@ public class DFSUtil {
|
||||||
* @return a collection of all configured NN URIs, preferring service
|
* @return a collection of all configured NN URIs, preferring service
|
||||||
* addresses
|
* addresses
|
||||||
*/
|
*/
|
||||||
public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
|
public static Collection<URI> getInternalNsRpcUris(Configuration conf) {
|
||||||
return getNameServiceUris(conf,
|
return getNameServiceUris(conf, getInternalNameServices(conf),
|
||||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
}
|
}
|
||||||
|
@ -738,8 +745,8 @@ public class DFSUtil {
|
||||||
* nameservices
|
* nameservices
|
||||||
* @return a collection of all configured NN URIs
|
* @return a collection of all configured NN URIs
|
||||||
*/
|
*/
|
||||||
public static Collection<URI> getNameServiceUris(Configuration conf,
|
static Collection<URI> getNameServiceUris(Configuration conf,
|
||||||
String... keys) {
|
Collection<String> nameServices, String... keys) {
|
||||||
Set<URI> ret = new HashSet<URI>();
|
Set<URI> ret = new HashSet<URI>();
|
||||||
|
|
||||||
// We're passed multiple possible configuration keys for any given NN or HA
|
// We're passed multiple possible configuration keys for any given NN or HA
|
||||||
|
@ -749,7 +756,7 @@ public class DFSUtil {
|
||||||
// keep track of non-preferred keys here.
|
// keep track of non-preferred keys here.
|
||||||
Set<URI> nonPreferredUris = new HashSet<URI>();
|
Set<URI> nonPreferredUris = new HashSet<URI>();
|
||||||
|
|
||||||
for (String nsId : DFSUtilClient.getNameServiceIds(conf)) {
|
for (String nsId : nameServices) {
|
||||||
if (HAUtil.isHAEnabled(conf, nsId)) {
|
if (HAUtil.isHAEnabled(conf, nsId)) {
|
||||||
// Add the logical URI of the nameservice.
|
// Add the logical URI of the nameservice.
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -743,7 +743,7 @@ public class Balancer {
|
||||||
try {
|
try {
|
||||||
checkReplicationPolicyCompatibility(conf);
|
checkReplicationPolicyCompatibility(conf);
|
||||||
|
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
return Balancer.run(namenodes, parse(args), conf);
|
return Balancer.run(namenodes, parse(args), conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
|
|
|
@ -645,7 +645,7 @@ public class Mover {
|
||||||
} else if (line.hasOption("p")) {
|
} else if (line.hasOption("p")) {
|
||||||
paths = line.getOptionValues("p");
|
paths = line.getOptionValues("p");
|
||||||
}
|
}
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
if (paths == null || paths.length == 0) {
|
if (paths == null || paths.length == 0) {
|
||||||
for (URI namenode : namenodes) {
|
for (URI namenode : namenodes) {
|
||||||
map.put(namenode, null);
|
map.put(namenode, null);
|
||||||
|
|
|
@ -78,6 +78,8 @@ import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
public class TestDFSUtil {
|
public class TestDFSUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -532,7 +534,7 @@ public class TestDFSUtil {
|
||||||
assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf));
|
assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf));
|
||||||
assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf));
|
assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf));
|
||||||
|
|
||||||
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
Collection<URI> uris = getInternalNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
assertEquals(2, uris.size());
|
assertEquals(2, uris.size());
|
||||||
assertTrue(uris.contains(new URI("hdfs://ns1")));
|
assertTrue(uris.contains(new URI("hdfs://ns1")));
|
||||||
assertTrue(uris.contains(new URI("hdfs://ns2")));
|
assertTrue(uris.contains(new URI("hdfs://ns2")));
|
||||||
|
@ -615,7 +617,13 @@ public class TestDFSUtil {
|
||||||
assertEquals("127.0.0.1:12345",
|
assertEquals("127.0.0.1:12345",
|
||||||
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
|
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Collection<URI> getInternalNameServiceUris(Configuration conf,
|
||||||
|
String... keys) {
|
||||||
|
final Collection<String> ids = DFSUtil.getInternalNameServices(conf);
|
||||||
|
return DFSUtil.getNameServiceUris(conf, ids, keys);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test how name service URIs are handled with a variety of configuration
|
* Test how name service URIs are handled with a variety of configuration
|
||||||
* settings
|
* settings
|
||||||
|
@ -639,8 +647,7 @@ public class TestDFSUtil {
|
||||||
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "hdfs://" + NN2_ADDR);
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "hdfs://" + NN2_ADDR);
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
||||||
|
|
||||||
Collection<URI> uris = DFSUtil.getNameServiceUris(conf,
|
Collection<URI> uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 2, uris.size());
|
assertEquals("Incorrect number of URIs returned", 2, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
|
@ -664,8 +671,7 @@ public class TestDFSUtil {
|
||||||
|
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
|
@ -679,8 +685,7 @@ public class TestDFSUtil {
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
|
||||||
"viewfs://vfs-name.example.com");
|
"viewfs://vfs-name.example.com");
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
|
@ -694,8 +699,7 @@ public class TestDFSUtil {
|
||||||
// entries being returned.
|
// entries being returned.
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
|
@ -709,8 +713,7 @@ public class TestDFSUtil {
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for RPC address (defaultFS)",
|
assertTrue("Missing URI for RPC address (defaultFS)",
|
||||||
|
@ -720,8 +723,7 @@ public class TestDFSUtil {
|
||||||
// and the default FS is given.
|
// and the default FS is given.
|
||||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN2_ADDR);
|
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN2_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for RPC address",
|
assertTrue("Missing URI for RPC address",
|
||||||
|
@ -733,8 +735,7 @@ public class TestDFSUtil {
|
||||||
// returned.
|
// returned.
|
||||||
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_ADDR);
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for service ns1",
|
assertTrue("Missing URI for service ns1",
|
||||||
|
@ -746,8 +747,7 @@ public class TestDFSUtil {
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
||||||
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getNameServiceUris(conf,
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for service address",
|
assertTrue("Missing URI for service address",
|
||||||
|
@ -763,7 +763,7 @@ public class TestDFSUtil {
|
||||||
// it will automatically convert it to hostname
|
// it will automatically convert it to hostname
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:8020");
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:8020");
|
||||||
Collection<URI> uris = DFSUtil.getNameServiceUris(conf);
|
Collection<URI> uris = getInternalNameServiceUris(conf);
|
||||||
assertEquals(1, uris.size());
|
assertEquals(1, uris.size());
|
||||||
for (URI uri : uris) {
|
for (URI uri : uris) {
|
||||||
assertThat(uri.getHost(), not("127.0.0.1"));
|
assertThat(uri.getHost(), not("127.0.0.1"));
|
||||||
|
@ -950,10 +950,19 @@ public class TestDFSUtil {
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
|
||||||
NN2_ADDRESS);
|
NN2_ADDRESS);
|
||||||
|
|
||||||
|
{
|
||||||
|
Collection<String> internal = DFSUtil.getInternalNameServices(conf);
|
||||||
|
assertEquals(Sets.newHashSet("nn1"), internal);
|
||||||
|
|
||||||
|
Collection<String> all = DFSUtilClient.getNameServiceIds(conf);
|
||||||
|
assertEquals(Sets.newHashSet("nn1", "nn2"), all);
|
||||||
|
}
|
||||||
|
|
||||||
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
|
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
|
||||||
.getNNServiceRpcAddressesForCluster(conf);
|
.getNNServiceRpcAddressesForCluster(conf);
|
||||||
assertEquals(1, nnMap.size());
|
assertEquals(1, nnMap.size());
|
||||||
assertTrue(nnMap.containsKey("nn1"));
|
assertTrue(nnMap.containsKey("nn1"));
|
||||||
|
|
||||||
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
|
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
|
||||||
try {
|
try {
|
||||||
DFSUtil.getNNServiceRpcAddressesForCluster(conf);
|
DFSUtil.getNNServiceRpcAddressesForCluster(conf);
|
||||||
|
|
|
@ -390,7 +390,7 @@ public class TestBalancer {
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||||
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
||||||
}
|
}
|
||||||
|
@ -474,7 +474,7 @@ public class TestBalancer {
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||||
BlockPlacementPolicy placementPolicy =
|
BlockPlacementPolicy placementPolicy =
|
||||||
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
|
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
|
||||||
|
@ -782,7 +782,7 @@ public class TestBalancer {
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
final int r = runBalancer(namenodes, p, conf);
|
final int r = runBalancer(namenodes, p, conf);
|
||||||
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
|
||||||
|
@ -979,7 +979,7 @@ public class TestBalancer {
|
||||||
new String[]{RACK0}, null,new long[]{CAPACITY});
|
new String[]{RACK0}, null,new long[]{CAPACITY});
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Set<String> datanodes = new HashSet<String>();
|
Set<String> datanodes = new HashSet<String>();
|
||||||
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
|
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
|
||||||
BalancerParameters.Builder pBuilder =
|
BalancerParameters.Builder pBuilder =
|
||||||
|
@ -1446,7 +1446,7 @@ public class TestBalancer {
|
||||||
null, null, storageCapacities, null, false, false, false, null);
|
null, null, storageCapacities, null, false, false, false, null);
|
||||||
|
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
// Run Balancer
|
// Run Balancer
|
||||||
final BalancerParameters p = BalancerParameters.DEFAULT;
|
final BalancerParameters p = BalancerParameters.DEFAULT;
|
||||||
|
@ -1493,7 +1493,7 @@ public class TestBalancer {
|
||||||
// Add another DN with the same capacity, cluster is now unbalanced
|
// Add another DN with the same capacity, cluster is now unbalanced
|
||||||
cluster.startDataNodes(conf, 1, true, null, null);
|
cluster.startDataNodes(conf, 1, true, null, null);
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
// Run balancer
|
// Run balancer
|
||||||
final BalancerParameters p = BalancerParameters.DEFAULT;
|
final BalancerParameters p = BalancerParameters.DEFAULT;
|
||||||
|
@ -1572,7 +1572,7 @@ public class TestBalancer {
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
BalancerParameters p = BalancerParameters.DEFAULT;
|
BalancerParameters p = BalancerParameters.DEFAULT;
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
final int r = Balancer.run(namenodes, p, conf);
|
final int r = Balancer.run(namenodes, p, conf);
|
||||||
|
|
||||||
// Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
|
// Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
|
||||||
|
@ -1690,7 +1690,7 @@ public class TestBalancer {
|
||||||
LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
|
LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
|
||||||
waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
|
waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
|
||||||
|
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
{ // run Balancer with min-block-size=50
|
{ // run Balancer with min-block-size=50
|
||||||
BalancerParameters.Builder b =
|
BalancerParameters.Builder b =
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class TestBalancerWithHANameNodes {
|
||||||
totalCapacity += newNodeCapacity;
|
totalCapacity += newNodeCapacity;
|
||||||
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
|
||||||
cluster);
|
cluster);
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
assertEquals(1, namenodes.size());
|
assertEquals(1, namenodes.size());
|
||||||
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
|
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
|
||||||
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class TestBalancerWithMultipleNameNodes {
|
||||||
getStorageReports(s);
|
getStorageReports(s);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
|
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(s.conf);
|
||||||
final int r = Balancer.run(namenodes, s.parameters, s.conf);
|
final int r = Balancer.run(namenodes, s.parameters, s.conf);
|
||||||
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
|
|
||||||
|
|
|
@ -174,7 +174,7 @@ public class TestBalancerWithNodeGroup {
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||||
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
||||||
|
|
||||||
|
@ -188,7 +188,7 @@ public class TestBalancerWithNodeGroup {
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
waitForHeartBeat(totalUsedSpace, totalCapacity);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
||||||
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
|
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
|
||||||
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
|
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class TestMover {
|
||||||
}
|
}
|
||||||
|
|
||||||
static Mover newMover(Configuration conf) throws IOException {
|
static Mover newMover(Configuration conf) throws IOException {
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Assert.assertEquals(1, namenodes.size());
|
Assert.assertEquals(1, namenodes.size());
|
||||||
Map<URI, List<Path>> nnMap = Maps.newHashMap();
|
Map<URI, List<Path>> nnMap = Maps.newHashMap();
|
||||||
for (URI nn : namenodes) {
|
for (URI nn : namenodes) {
|
||||||
|
@ -182,7 +182,7 @@ public class TestMover {
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf);
|
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf);
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Assert.assertEquals(1, namenodes.size());
|
Assert.assertEquals(1, namenodes.size());
|
||||||
Assert.assertEquals(1, movePaths.size());
|
Assert.assertEquals(1, movePaths.size());
|
||||||
URI nn = namenodes.iterator().next();
|
URI nn = namenodes.iterator().next();
|
||||||
|
@ -190,7 +190,7 @@ public class TestMover {
|
||||||
Assert.assertNull(movePaths.get(nn));
|
Assert.assertNull(movePaths.get(nn));
|
||||||
|
|
||||||
movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar");
|
movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar");
|
||||||
namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Assert.assertEquals(1, movePaths.size());
|
Assert.assertEquals(1, movePaths.size());
|
||||||
nn = namenodes.iterator().next();
|
nn = namenodes.iterator().next();
|
||||||
Assert.assertTrue(movePaths.containsKey(nn));
|
Assert.assertTrue(movePaths.containsKey(nn));
|
||||||
|
@ -211,7 +211,7 @@ public class TestMover {
|
||||||
try {
|
try {
|
||||||
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
|
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
|
||||||
"-p", "/foo", "/bar");
|
"-p", "/foo", "/bar");
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Assert.assertEquals(1, namenodes.size());
|
Assert.assertEquals(1, namenodes.size());
|
||||||
Assert.assertEquals(1, movePaths.size());
|
Assert.assertEquals(1, movePaths.size());
|
||||||
URI nn = namenodes.iterator().next();
|
URI nn = namenodes.iterator().next();
|
||||||
|
@ -232,7 +232,7 @@ public class TestMover {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
DFSTestUtil.setFederatedConfiguration(cluster, conf);
|
DFSTestUtil.setFederatedConfiguration(cluster, conf);
|
||||||
try {
|
try {
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Assert.assertEquals(3, namenodes.size());
|
Assert.assertEquals(3, namenodes.size());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -280,7 +280,7 @@ public class TestMover {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
|
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
|
||||||
try {
|
try {
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Assert.assertEquals(3, namenodes.size());
|
Assert.assertEquals(3, namenodes.size());
|
||||||
|
|
||||||
Iterator<URI> iter = namenodes.iterator();
|
Iterator<URI> iter = namenodes.iterator();
|
||||||
|
|
|
@ -270,7 +270,7 @@ public class TestStorageMover {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runMover(ExitStatus expectedExitCode) throws Exception {
|
private void runMover(ExitStatus expectedExitCode) throws Exception {
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
Map<URI, List<Path>> nnMap = Maps.newHashMap();
|
Map<URI, List<Path>> nnMap = Maps.newHashMap();
|
||||||
for (URI nn : namenodes) {
|
for (URI nn : namenodes) {
|
||||||
nnMap.put(nn, null);
|
nnMap.put(nn, null);
|
||||||
|
|
Loading…
Reference in New Issue