HDFS-9365. Balaner does not work with the HDFS-6376 HA setup.

(cherry picked from commit d3bdea7f7f94332ffe51fb65eec1f219fbf6657f)
This commit is contained in:
Tsz-Wo Nicholas Sze 2016-05-24 12:49:48 -07:00 committed by Sangjin Lee
parent 020ef7f5bb
commit bddc6cd3fb
11 changed files with 57 additions and 35 deletions

View File

@ -42,6 +42,9 @@ Release 2.6.5 - UNRELEASED
HDFS-10178. Permanent write failures can happen if pipeline recoveries HDFS-10178. Permanent write failures can happen if pipeline recoveries
occur for the first packet (kihwal) occur for the first packet (kihwal)
HDFS-9365. Balancer does not work with the HDFS-6376 HA setup. (Tsz Wo
Nicholas Sze)
Release 2.6.4 - 2016-02-11 Release 2.6.4 - 2016-02-11
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -963,6 +963,13 @@ public String toString() {
} }
} }
/** @return Internal name services specified in the conf. */
static Collection<String> getInternalNameServices(Configuration conf) {
final Collection<String> ids = conf.getTrimmedStringCollection(
DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
return !ids.isEmpty()? ids: getNameServiceIds(conf);
}
/** /**
* Get a URI for each internal nameservice. If a nameservice is * Get a URI for each internal nameservice. If a nameservice is
* HA-enabled, and the configured failover proxy provider supports logical * HA-enabled, and the configured failover proxy provider supports logical
@ -975,8 +982,8 @@ public String toString() {
* @return a collection of all configured NN URIs, preferring service * @return a collection of all configured NN URIs, preferring service
* addresses * addresses
*/ */
public static Collection<URI> getNsServiceRpcUris(Configuration conf) { public static Collection<URI> getInternalNsRpcUris(Configuration conf) {
return getNameServiceUris(conf, return getNameServiceUris(conf, getInternalNameServices(conf),
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
} }
@ -993,8 +1000,8 @@ public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
* nameservices * nameservices
* @return a collection of all configured NN URIs * @return a collection of all configured NN URIs
*/ */
public static Collection<URI> getNameServiceUris(Configuration conf, static Collection<URI> getNameServiceUris(Configuration conf,
String... keys) { Collection<String> nameServices, String... keys) {
Set<URI> ret = new HashSet<URI>(); Set<URI> ret = new HashSet<URI>();
// We're passed multiple possible configuration keys for any given NN or HA // We're passed multiple possible configuration keys for any given NN or HA
@ -1004,7 +1011,7 @@ public static Collection<URI> getNameServiceUris(Configuration conf,
// keep track of non-preferred keys here. // keep track of non-preferred keys here.
Set<URI> nonPreferredUris = new HashSet<URI>(); Set<URI> nonPreferredUris = new HashSet<URI>();
for (String nsId : getNameServiceIds(conf)) { for (String nsId : nameServices) {
URI nsUri; URI nsUri;
try { try {
nsUri = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId); nsUri = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);

View File

@ -670,7 +670,7 @@ public int run(String[] args) {
try { try {
checkReplicationPolicyCompatibility(conf); checkReplicationPolicyCompatibility(conf);
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
return Balancer.run(namenodes, parse(args), conf); return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) { } catch (IOException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");

View File

@ -577,7 +577,7 @@ private static Map<URI, List<Path>> getNameNodePaths(CommandLine line,
} else if (line.hasOption("p")) { } else if (line.hasOption("p")) {
paths = line.getOptionValues("p"); paths = line.getOptionValues("p");
} }
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
if (paths == null || paths.length == 0) { if (paths == null || paths.length == 0) {
for (URI namenode : namenodes) { for (URI namenode : namenodes) {
map.put(namenode, null); map.put(namenode, null);

View File

@ -79,6 +79,8 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Sets;
public class TestDFSUtil { public class TestDFSUtil {
/** /**
@ -533,7 +535,7 @@ public void testHANameNodesWithFederation() throws URISyntaxException {
".ns2"; ".ns2";
conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha." conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha."
+ "ConfiguredFailoverProxyProvider"); + "ConfiguredFailoverProxyProvider");
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY); Collection<URI> uris = getInternalNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(2, uris.size()); assertEquals(2, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1"))); assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://ns2"))); assertTrue(uris.contains(new URI("hdfs://ns2")));
@ -617,6 +619,12 @@ public void testSubstituteForWildcardAddress() throws IOException {
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo")); DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
} }
private static Collection<URI> getInternalNameServiceUris(Configuration conf,
String... keys) {
final Collection<String> ids = DFSUtil.getInternalNameServices(conf);
return DFSUtil.getNameServiceUris(conf, ids, keys);
}
@Test @Test
public void testGetNNUris() throws Exception { public void testGetNNUris() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
@ -659,8 +667,7 @@ public void testGetNNUris() throws Exception {
".ns1"; ".ns1";
conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha." conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha."
+ "IPFailoverProxyProvider"); + "IPFailoverProxyProvider");
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, Collection<URI> uris = DFSUtil.getInternalNsRpcUris(conf);
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals("Incorrect number of URIs returned", 4, uris.size()); assertEquals("Incorrect number of URIs returned", 4, uris.size());
assertTrue("Missing URI for name service ns1", assertTrue("Missing URI for name service ns1",
@ -680,8 +687,7 @@ public void testGetNNUris() throws Exception {
conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha." conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha."
+ "ConfiguredFailoverProxyProvider"); + "ConfiguredFailoverProxyProvider");
uris = DFSUtil.getNameServiceUris(conf, uris = DFSUtil.getInternalNsRpcUris(conf);
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals("Incorrect number of URIs returned", 4, uris.size()); assertEquals("Incorrect number of URIs returned", 4, uris.size());
assertTrue("Missing URI for name service ns1", assertTrue("Missing URI for name service ns1",
@ -697,8 +703,7 @@ public void testGetNNUris() throws Exception {
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
"viewfs://vfs-name.example.com"); "viewfs://vfs-name.example.com");
uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, uris = DFSUtil.getInternalNsRpcUris(conf);
DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(3, uris.size()); assertEquals(3, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1"))); assertTrue(uris.contains(new URI("hdfs://ns1")));
@ -709,8 +714,7 @@ public void testGetNNUris() throws Exception {
// entries being returned. // entries being returned.
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1"); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, uris = DFSUtil.getInternalNsRpcUris(conf);
DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(3, uris.size()); assertEquals(3, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1"))); assertTrue(uris.contains(new URI("hdfs://ns1")));
@ -726,8 +730,7 @@ public void testGetNNUris() throws Exception {
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN1_ADDR); conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN1_ADDR);
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR); conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, uris = DFSUtil.getInternalNsRpcUris(conf);
DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(1, uris.size()); assertEquals(1, uris.size());
assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR))); assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
@ -742,7 +745,7 @@ public void testLocalhostReverseLookup() {
// it will automatically convert it to hostname // it will automatically convert it to hostname
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:8020"); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:8020");
Collection<URI> uris = DFSUtil.getNameServiceUris(conf); Collection<URI> uris = getInternalNameServiceUris(conf);
assertEquals(1, uris.size()); assertEquals(1, uris.size());
for (URI uri : uris) { for (URI uri : uris) {
assertThat(uri.getHost(), not("127.0.0.1")); assertThat(uri.getHost(), not("127.0.0.1"));
@ -929,10 +932,19 @@ public void testGetNNServiceRpcAddressesForNsIds() throws IOException {
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"), conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, "nn2"),
NN2_ADDRESS); NN2_ADDRESS);
{
Collection<String> internal = DFSUtil.getInternalNameServices(conf);
assertEquals(Sets.newHashSet("nn1"), internal);
Collection<String> all = DFSUtil.getNameServiceIds(conf);
assertEquals(Sets.newHashSet("nn1", "nn2"), all);
}
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf); .getNNServiceRpcAddressesForCluster(conf);
assertEquals(1, nnMap.size()); assertEquals(1, nnMap.size());
assertTrue(nnMap.containsKey("nn1")); assertTrue(nnMap.containsKey("nn1"));
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3"); conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
try { try {
DFSUtil.getNNServiceRpcAddressesForCluster(conf); DFSUtil.getNNServiceRpcAddressesForCluster(conf);

View File

@ -599,7 +599,7 @@ private void runBalancer(Configuration conf,
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = runBalancer(namenodes, p, conf); final int r = runBalancer(namenodes, p, conf);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
@ -795,7 +795,7 @@ public void testUnknownDatanode() throws Exception {
new String[]{RACK0}, null,new long[]{CAPACITY}); new String[]{RACK0}, null,new long[]{CAPACITY});
cluster.triggerHeartbeats(); cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Set<String> datanodes = new HashSet<String>(); Set<String> datanodes = new HashSet<String>();
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
Balancer.Parameters p = new Balancer.Parameters( Balancer.Parameters p = new Balancer.Parameters(
@ -1229,7 +1229,7 @@ public void testBalancerWithRamDisk() throws Exception {
null, null, storageCapacities, null, false, false, false, null); null, null, storageCapacities, null, false, false, false, null);
cluster.triggerHeartbeats(); cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
// Run Balancer // Run Balancer
Balancer.Parameters p = new Balancer.Parameters( Balancer.Parameters p = new Balancer.Parameters(

View File

@ -94,7 +94,7 @@ public void testBalancerWithHANameNodes() throws Exception {
totalCapacity += newNodeCapacity; totalCapacity += newNodeCapacity;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster); cluster);
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
assertEquals(1, namenodes.size()); assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);

View File

@ -158,7 +158,7 @@ static void runBalancer(Suite s,
LOG.info("BALANCER 1"); LOG.info("BALANCER 1");
// start rebalancing // start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf); final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(s.conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r); Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);

View File

@ -174,7 +174,7 @@ private void runBalancer(Configuration conf,
waitForHeartBeat(totalUsedSpace, totalCapacity); waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
@ -188,7 +188,7 @@ private void runBalancerCanFinish(Configuration conf,
waitForHeartBeat(totalUsedSpace, totalCapacity); waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));

View File

@ -37,7 +37,7 @@
public class TestMover { public class TestMover {
static Mover newMover(Configuration conf) throws IOException { static Mover newMover(Configuration conf) throws IOException {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size()); Assert.assertEquals(1, namenodes.size());
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors( final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
@ -104,7 +104,7 @@ public void testMoverCli() throws Exception {
} }
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf); Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf);
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size()); Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size()); Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next(); URI nn = namenodes.iterator().next();
@ -112,7 +112,7 @@ public void testMoverCli() throws Exception {
Assert.assertNull(movePaths.get(nn)); Assert.assertNull(movePaths.get(nn));
movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar"); movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar");
namenodes = DFSUtil.getNsServiceRpcUris(conf); namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, movePaths.size()); Assert.assertEquals(1, movePaths.size());
nn = namenodes.iterator().next(); nn = namenodes.iterator().next();
Assert.assertTrue(movePaths.containsKey(nn)); Assert.assertTrue(movePaths.containsKey(nn));
@ -133,7 +133,7 @@ public void testMoverCliWithHAConf() throws Exception {
try { try {
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf, Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", "/foo", "/bar"); "-p", "/foo", "/bar");
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size()); Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size()); Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next(); URI nn = namenodes.iterator().next();
@ -154,7 +154,7 @@ public void testMoverCliWithFederation() throws Exception {
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
DFSTestUtil.setFederatedConfiguration(cluster, conf); DFSTestUtil.setFederatedConfiguration(cluster, conf);
try { try {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(3, namenodes.size()); Assert.assertEquals(3, namenodes.size());
try { try {
@ -202,7 +202,7 @@ public void testMoverCliWithFederationHA() throws Exception {
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
DFSTestUtil.setFederatedHAConfiguration(cluster, conf); DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
try { try {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(3, namenodes.size()); Assert.assertEquals(3, namenodes.size());
Iterator<URI> iter = namenodes.iterator(); Iterator<URI> iter = namenodes.iterator();

View File

@ -270,7 +270,7 @@ void verify(boolean verifyAll) throws Exception {
} }
private void runMover() throws Exception { private void runMover() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Map<URI, List<Path>> nnMap = Maps.newHashMap(); Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) { for (URI nn : namenodes) {
nnMap.put(nn, null); nnMap.put(nn, null);