HDFS-2979. Balancer should use logical uri for creating failover proxy with HA enabled. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1295340 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-02-29 23:42:52 +00:00
parent 7b6b204924
commit 01b17c40cf
10 changed files with 158 additions and 51 deletions

View File

@ -242,3 +242,5 @@ HDFS-2958. Sweep for remaining proxy construction which doesn't go through failo
HDFS-2920. fix remaining TODO items. (atm and todd) HDFS-2920. fix remaining TODO items. (atm and todd)
HDFS-3027. Implement a simple NN health check. (atm) HDFS-3027. Implement a simple NN health check. (atm)
HDFS-2979. Balancer should use logical uri for creating failover proxy with HA enabled. (atm)

View File

@ -28,9 +28,11 @@ import java.security.SecureRandom;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import javax.net.SocketFactory; import javax.net.SocketFactory;
@ -43,6 +45,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
@ -605,6 +608,68 @@ public class DFSUtil {
"nnId=" + namenodeId + ";addr=" + addr + "]"; "nnId=" + namenodeId + ";addr=" + addr + "]";
} }
} }
/**
* Get a URI for each configured nameservice. If a nameservice is
* HA-enabled, then the logical URI of the nameservice is returned. If the
* nameservice is not HA-enabled, then a URI corresponding to an RPC address
* of the single NN for that nameservice is returned, preferring the service
* RPC address over the client RPC address.
*
* @param conf configuration
* @return a collection of all configured NN URIs, preferring service
* addresses
*/
public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
return getNameServiceUris(conf,
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
/**
* Get a URI for each configured nameservice. If a nameservice is
* HA-enabled, then the logical URI of the nameservice is returned. If the
* nameservice is not HA-enabled, then a URI corresponding to the address of
* the single NN for that nameservice is returned.
*
* @param conf configuration
* @param keys configuration keys to try in order to get the URI for non-HA
* nameservices
* @return a collection of all configured NN URIs
*/
public static Collection<URI> getNameServiceUris(Configuration conf,
String... keys) {
Set<URI> ret = new HashSet<URI>();
for (String nsId : getNameServiceIds(conf)) {
if (HAUtil.isHAEnabled(conf, nsId)) {
// Add the logical URI of the nameservice.
try {
ret.add(new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId));
} catch (URISyntaxException ue) {
throw new IllegalArgumentException(ue);
}
} else {
// Add the URI corresponding to the address of the NN.
for (String key : keys) {
String addr = conf.get(concatSuffixes(key, nsId));
if (addr != null) {
ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
NetUtils.createSocketAddr(addr)));
break;
}
}
}
}
// Add the generic configuration keys.
for (String key : keys) {
String addr = conf.get(key);
if (addr != null) {
ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr)));
break;
}
}
return ret;
}
/** /**
* Given the InetSocketAddress this method returns the nameservice Id * Given the InetSocketAddress this method returns the nameservice Id

View File

@ -24,8 +24,8 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.URI;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -39,7 +39,6 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -1380,8 +1379,7 @@ public class Balancer {
* for each namenode, * for each namenode,
* execute a {@link Balancer} to work through all datanodes once. * execute a {@link Balancer} to work through all datanodes once.
*/ */
static int run(Map<String, Map<String, InetSocketAddress>> namenodes, static int run(Collection<URI> namenodes, final Parameters p,
final Parameters p,
Configuration conf) throws IOException, InterruptedException { Configuration conf) throws IOException, InterruptedException {
final long sleeptime = 2000*conf.getLong( final long sleeptime = 2000*conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -1395,10 +1393,8 @@ public class Balancer {
final List<NameNodeConnector> connectors final List<NameNodeConnector> connectors
= new ArrayList<NameNodeConnector>(namenodes.size()); = new ArrayList<NameNodeConnector>(namenodes.size());
try { try {
for(Entry<String, Map<String, InetSocketAddress>> entry : for (URI uri : namenodes) {
namenodes.entrySet()) { connectors.add(new NameNodeConnector(uri, conf));
connectors.add(
new NameNodeConnector(entry.getValue().values(), conf));
} }
boolean done = false; boolean done = false;
@ -1480,8 +1476,7 @@ public class Balancer {
try { try {
checkReplicationPolicyCompatibility(conf); checkReplicationPolicyCompatibility(conf);
final Map<String, Map<String, InetSocketAddress>> namenodes = final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
DFSUtil.getNNServiceRpcAddresses(conf);
return Balancer.run(namenodes, parse(args), conf); return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) { } catch (IOException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");

View File

@ -21,9 +21,7 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -38,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -46,8 +43,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import com.google.common.collect.Lists;
/** /**
* The class provides utilities for {@link Balancer} to access a NameNode * The class provides utilities for {@link Balancer} to access a NameNode
*/ */
@ -56,7 +51,7 @@ class NameNodeConnector {
private static final Log LOG = Balancer.LOG; private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
final InetSocketAddress namenodeAddress; final URI nameNodeUri;
final String blockpoolID; final String blockpoolID;
final NamenodeProtocol namenode; final NamenodeProtocol namenode;
@ -70,10 +65,9 @@ class NameNodeConnector {
private BlockTokenSecretManager blockTokenSecretManager; private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread private Daemon keyupdaterthread; // AccessKeyUpdater thread
NameNodeConnector(Collection<InetSocketAddress> haNNs, NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException { Configuration conf) throws IOException {
this.namenodeAddress = Lists.newArrayList(haNNs).get(0); this.nameNodeUri = nameNodeUri;
URI nameNodeUri = NameNode.getUri(this.namenodeAddress);
this.namenode = this.namenode =
NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class) NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
@ -186,7 +180,7 @@ class NameNodeConnector {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "[namenodeAddress=" + namenodeAddress return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
+ ", id=" + blockpoolID + ", id=" + blockpoolID
+ "]"; + "]";
} }

View File

@ -526,21 +526,21 @@ public class NameNode {
protected NameNode(Configuration conf, NamenodeRole role) protected NameNode(Configuration conf, NamenodeRole role)
throws IOException { throws IOException {
this.conf = conf; this.conf = new Configuration(conf);
this.role = role; this.role = role;
String nsId = getNameServiceId(conf); String nsId = getNameServiceId(this.conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId); String namenodeId = HAUtil.getNameNodeId(this.conf, nsId);
this.haEnabled = HAUtil.isHAEnabled(conf, nsId); this.haEnabled = HAUtil.isHAEnabled(this.conf, nsId);
if (!haEnabled) { if (!haEnabled) {
state = ACTIVE_STATE; state = ACTIVE_STATE;
} else { } else {
state = STANDBY_STATE; state = STANDBY_STATE;
} }
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf); this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(this.conf);
this.haContext = createHAContext(); this.haContext = createHAContext();
try { try {
initializeGenericKeys(conf, nsId, namenodeId); initializeGenericKeys(this.conf, nsId, namenodeId);
initialize(conf); initialize(this.conf);
state.prepareToEnterState(haContext); state.prepareToEnterState(haContext);
state.enterState(haContext); state.enterState(haContext);
} catch (IOException e) { } catch (IOException e) {
@ -651,6 +651,7 @@ public class NameNode {
throws IOException { throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf); String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId); String namenodeId = HAUtil.getNameNodeId(conf, nsId);
conf = new Configuration(conf);
initializeGenericKeys(conf, nsId, namenodeId); initializeGenericKeys(conf, nsId, namenodeId);
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY, if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
@ -697,6 +698,7 @@ public class NameNode {
private static boolean finalize(Configuration conf, private static boolean finalize(Configuration conf,
boolean isConfirmationNeeded boolean isConfirmationNeeded
) throws IOException { ) throws IOException {
conf = new Configuration(conf);
String nsId = DFSUtil.getNamenodeNameServiceId(conf); String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId); String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId); initializeGenericKeys(conf, nsId, namenodeId);

View File

@ -25,6 +25,8 @@ import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -41,6 +43,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
public class TestDFSUtil { public class TestDFSUtil {
@ -233,11 +237,12 @@ public class TestDFSUtil {
* {@link DFSUtil#isDefaultNamenodeAddress(Configuration, InetSocketAddress, String...)} * {@link DFSUtil#isDefaultNamenodeAddress(Configuration, InetSocketAddress, String...)}
*/ */
@Test @Test
public void testSingleNamenode() { public void testSingleNamenode() throws URISyntaxException {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
final String DEFAULT_ADDRESS = "localhost:9000"; final String DEFAULT_ADDRESS = "localhost:9000";
final String NN2_ADDRESS = "localhost:9001"; final String NN2_ADDRESS = "localhost:9001";
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, DEFAULT_ADDRESS); conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, DEFAULT_ADDRESS);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DEFAULT_ADDRESS);
InetSocketAddress testAddress1 = NetUtils.createSocketAddr(DEFAULT_ADDRESS); InetSocketAddress testAddress1 = NetUtils.createSocketAddr(DEFAULT_ADDRESS);
boolean isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress1, boolean isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress1,
@ -247,6 +252,10 @@ public class TestDFSUtil {
isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress2, isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress2,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertFalse(isDefault); assertFalse(isDefault);
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(1, uris.size());
assertTrue(uris.contains(new URI("hdfs://" + DEFAULT_ADDRESS)));
} }
/** Tests to ensure default namenode is used as fallback */ /** Tests to ensure default namenode is used as fallback */
@ -407,13 +416,14 @@ public class TestDFSUtil {
} }
@Test @Test
public void testHANameNodesWithFederation() { public void testHANameNodesWithFederation() throws URISyntaxException {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020"; final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
final String NS1_NN2_HOST = "ns1-nn2.example.com:8020"; final String NS1_NN2_HOST = "ns1-nn2.example.com:8020";
final String NS2_NN1_HOST = "ns2-nn1.example.com:8020"; final String NS2_NN1_HOST = "ns2-nn1.example.com:8020";
final String NS2_NN2_HOST = "ns2-nn2.example.com:8020"; final String NS2_NN2_HOST = "ns2-nn2.example.com:8020";
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
// Two nameservices, each with two NNs. // Two nameservices, each with two NNs.
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2"); conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
@ -460,6 +470,11 @@ public class TestDFSUtil {
// Ditto for nameservice IDs, if multiple are defined // Ditto for nameservice IDs, if multiple are defined
assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf)); assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf));
assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf)); assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf));
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(2, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://ns2")));
} }
@Test @Test
@ -509,4 +524,34 @@ public class TestDFSUtil {
assertEquals("127.0.0.1:12345", assertEquals("127.0.0.1:12345",
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo")); DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
} }
@Test
public void testGetNNUris() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
final String NS1_NN2_HOST = "ns1-nn1.example.com:8020";
final String NS2_NN_HOST = "ns2-nn.example.com:8020";
final String NN_HOST = "nn.example.com:8020";
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),"nn1,nn2");
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_HOST);
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"),
NS2_NN_HOST);
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN_HOST);
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
assertEquals(3, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_HOST)));
assertTrue(uris.contains(new URI("hdfs://" + NN_HOST)));
}
} }

View File

@ -18,11 +18,11 @@
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -338,8 +338,7 @@ public class TestBalancer extends TestCase {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing // start rebalancing
Map<String, Map<String, InetSocketAddress>> namenodes = Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
DFSUtil.getNNServiceRpcAddresses(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress; import java.net.URI;
import java.util.Map; import java.util.Collection;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -67,12 +68,12 @@ public class TestBalancerWithHANameNodes {
int numOfDatanodes = capacities.length; int numOfDatanodes = capacities.length;
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
nn1Conf.setIpcPort(NameNode.DEFAULT_PORT); nn1Conf.setIpcPort(NameNode.DEFAULT_PORT);
MiniDFSNNTopology simpleHATopology = new MiniDFSNNTopology() cluster = new MiniDFSCluster.Builder(conf)
.addNameservice(new MiniDFSNNTopology.NSConf(null).addNN(nn1Conf) .nnTopology(MiniDFSNNTopology.simpleHATopology())
.addNN(new MiniDFSNNTopology.NNConf("nn2"))); .numDataNodes(capacities.length)
cluster = new MiniDFSCluster.Builder(conf).nnTopology(simpleHATopology) .racks(racks)
.numDataNodes(capacities.length).racks(racks).simulatedCapacities( .simulatedCapacities(capacities)
capacities).build(); .build();
HATestUtil.setFailoverConfigurations(cluster, conf); HATestUtil.setFailoverConfigurations(cluster, conf);
try { try {
cluster.waitActive(); cluster.waitActive();
@ -89,14 +90,12 @@ public class TestBalancerWithHANameNodes {
// start up an empty node with the same capacity and on the same rack // start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack }, cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
new long[] { newNodeCapacity }); new long[] { newNodeCapacity });
HATestUtil.setFailoverConfigurations(cluster, conf, NameNode.getUri(
cluster.getNameNode(0).getNameNodeAddress()).getHost());
totalCapacity += newNodeCapacity; totalCapacity += newNodeCapacity;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster); cluster);
Map<String, Map<String, InetSocketAddress>> namenodes = DFSUtil Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
.getNNServiceRpcAddresses(conf); assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -40,8 +40,8 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -157,8 +157,7 @@ public class TestBalancerWithMultipleNameNodes {
LOG.info("BALANCER 1"); LOG.info("BALANCER 1");
// start rebalancing // start rebalancing
final Map<String, Map<String, InetSocketAddress>> namenodes = final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
DFSUtil.getNNServiceRpcAddresses(s.conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
@ -188,6 +189,12 @@ public abstract class HATestUtil {
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
} }
public static URI getLogicalUri(MiniDFSCluster cluster)
throws URISyntaxException {
return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
getLogicalHostname(cluster));
}
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
List<Integer> txids) throws InterruptedException { List<Integer> txids) throws InterruptedException {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();