Revert commit of HDFS-2979.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1295435 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
30cffeb388
commit
9318ff4250
|
@ -243,6 +243,4 @@ HDFS-2920. fix remaining TODO items. (atm and todd)
|
||||||
|
|
||||||
HDFS-3027. Implement a simple NN health check. (atm)
|
HDFS-3027. Implement a simple NN health check. (atm)
|
||||||
|
|
||||||
HDFS-2979. Balancer should use logical uri for creating failover proxy with HA enabled. (atm)
|
|
||||||
|
|
||||||
HDFS-3023. Optimize entries in edits log for persistBlocks call. (todd)
|
HDFS-3023. Optimize entries in edits log for persistBlocks call. (todd)
|
||||||
|
|
|
@ -28,11 +28,9 @@ import java.security.SecureRandom;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
@ -45,7 +43,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
|
||||||
|
@ -608,68 +605,6 @@ public class DFSUtil {
|
||||||
"nnId=" + namenodeId + ";addr=" + addr + "]";
|
"nnId=" + namenodeId + ";addr=" + addr + "]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a URI for each configured nameservice. If a nameservice is
|
|
||||||
* HA-enabled, then the logical URI of the nameservice is returned. If the
|
|
||||||
* nameservice is not HA-enabled, then a URI corresponding to an RPC address
|
|
||||||
* of the single NN for that nameservice is returned, preferring the service
|
|
||||||
* RPC address over the client RPC address.
|
|
||||||
*
|
|
||||||
* @param conf configuration
|
|
||||||
* @return a collection of all configured NN URIs, preferring service
|
|
||||||
* addresses
|
|
||||||
*/
|
|
||||||
public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
|
|
||||||
return getNameServiceUris(conf,
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
|
||||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a URI for each configured nameservice. If a nameservice is
|
|
||||||
* HA-enabled, then the logical URI of the nameservice is returned. If the
|
|
||||||
* nameservice is not HA-enabled, then a URI corresponding to the address of
|
|
||||||
* the single NN for that nameservice is returned.
|
|
||||||
*
|
|
||||||
* @param conf configuration
|
|
||||||
* @param keys configuration keys to try in order to get the URI for non-HA
|
|
||||||
* nameservices
|
|
||||||
* @return a collection of all configured NN URIs
|
|
||||||
*/
|
|
||||||
public static Collection<URI> getNameServiceUris(Configuration conf,
|
|
||||||
String... keys) {
|
|
||||||
Set<URI> ret = new HashSet<URI>();
|
|
||||||
for (String nsId : getNameServiceIds(conf)) {
|
|
||||||
if (HAUtil.isHAEnabled(conf, nsId)) {
|
|
||||||
// Add the logical URI of the nameservice.
|
|
||||||
try {
|
|
||||||
ret.add(new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId));
|
|
||||||
} catch (URISyntaxException ue) {
|
|
||||||
throw new IllegalArgumentException(ue);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Add the URI corresponding to the address of the NN.
|
|
||||||
for (String key : keys) {
|
|
||||||
String addr = conf.get(concatSuffixes(key, nsId));
|
|
||||||
if (addr != null) {
|
|
||||||
ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
|
|
||||||
NetUtils.createSocketAddr(addr)));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Add the generic configuration keys.
|
|
||||||
for (String key : keys) {
|
|
||||||
String addr = conf.get(key);
|
|
||||||
if (addr != null) {
|
|
||||||
ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr)));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given the InetSocketAddress this method returns the nameservice Id
|
* Given the InetSocketAddress this method returns the nameservice Id
|
||||||
|
|
|
@ -24,8 +24,8 @@ import java.io.BufferedOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.URI;
|
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -39,6 +39,7 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -1379,7 +1380,8 @@ public class Balancer {
|
||||||
* for each namenode,
|
* for each namenode,
|
||||||
* execute a {@link Balancer} to work through all datanodes once.
|
* execute a {@link Balancer} to work through all datanodes once.
|
||||||
*/
|
*/
|
||||||
static int run(Collection<URI> namenodes, final Parameters p,
|
static int run(Map<String, Map<String, InetSocketAddress>> namenodes,
|
||||||
|
final Parameters p,
|
||||||
Configuration conf) throws IOException, InterruptedException {
|
Configuration conf) throws IOException, InterruptedException {
|
||||||
final long sleeptime = 2000*conf.getLong(
|
final long sleeptime = 2000*conf.getLong(
|
||||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
|
@ -1393,8 +1395,10 @@ public class Balancer {
|
||||||
final List<NameNodeConnector> connectors
|
final List<NameNodeConnector> connectors
|
||||||
= new ArrayList<NameNodeConnector>(namenodes.size());
|
= new ArrayList<NameNodeConnector>(namenodes.size());
|
||||||
try {
|
try {
|
||||||
for (URI uri : namenodes) {
|
for(Entry<String, Map<String, InetSocketAddress>> entry :
|
||||||
connectors.add(new NameNodeConnector(uri, conf));
|
namenodes.entrySet()) {
|
||||||
|
connectors.add(
|
||||||
|
new NameNodeConnector(entry.getValue().values(), conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
|
@ -1476,7 +1480,8 @@ public class Balancer {
|
||||||
try {
|
try {
|
||||||
checkReplicationPolicyCompatibility(conf);
|
checkReplicationPolicyCompatibility(conf);
|
||||||
|
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
final Map<String, Map<String, InetSocketAddress>> namenodes =
|
||||||
|
DFSUtil.getNNServiceRpcAddresses(conf);
|
||||||
return Balancer.run(namenodes, parse(args), conf);
|
return Balancer.run(namenodes, parse(args), conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println(e + ". Exiting ...");
|
System.out.println(e + ". Exiting ...");
|
||||||
|
|
|
@ -21,7 +21,9 @@ import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -36,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -43,6 +46,8 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The class provides utilities for {@link Balancer} to access a NameNode
|
* The class provides utilities for {@link Balancer} to access a NameNode
|
||||||
*/
|
*/
|
||||||
|
@ -51,7 +56,7 @@ class NameNodeConnector {
|
||||||
private static final Log LOG = Balancer.LOG;
|
private static final Log LOG = Balancer.LOG;
|
||||||
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
||||||
|
|
||||||
final URI nameNodeUri;
|
final InetSocketAddress namenodeAddress;
|
||||||
final String blockpoolID;
|
final String blockpoolID;
|
||||||
|
|
||||||
final NamenodeProtocol namenode;
|
final NamenodeProtocol namenode;
|
||||||
|
@ -65,9 +70,10 @@ class NameNodeConnector {
|
||||||
private BlockTokenSecretManager blockTokenSecretManager;
|
private BlockTokenSecretManager blockTokenSecretManager;
|
||||||
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
||||||
|
|
||||||
NameNodeConnector(URI nameNodeUri,
|
NameNodeConnector(Collection<InetSocketAddress> haNNs,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
this.nameNodeUri = nameNodeUri;
|
this.namenodeAddress = Lists.newArrayList(haNNs).get(0);
|
||||||
|
URI nameNodeUri = NameNode.getUri(this.namenodeAddress);
|
||||||
|
|
||||||
this.namenode =
|
this.namenode =
|
||||||
NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
|
NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
|
||||||
|
@ -180,7 +186,7 @@ class NameNodeConnector {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
|
return getClass().getSimpleName() + "[namenodeAddress=" + namenodeAddress
|
||||||
+ ", id=" + blockpoolID
|
+ ", id=" + blockpoolID
|
||||||
+ "]";
|
+ "]";
|
||||||
}
|
}
|
||||||
|
|
|
@ -526,21 +526,21 @@ public class NameNode {
|
||||||
|
|
||||||
protected NameNode(Configuration conf, NamenodeRole role)
|
protected NameNode(Configuration conf, NamenodeRole role)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.conf = new Configuration(conf);
|
this.conf = conf;
|
||||||
this.role = role;
|
this.role = role;
|
||||||
String nsId = getNameServiceId(this.conf);
|
String nsId = getNameServiceId(conf);
|
||||||
String namenodeId = HAUtil.getNameNodeId(this.conf, nsId);
|
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||||
this.haEnabled = HAUtil.isHAEnabled(this.conf, nsId);
|
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
|
||||||
if (!haEnabled) {
|
if (!haEnabled) {
|
||||||
state = ACTIVE_STATE;
|
state = ACTIVE_STATE;
|
||||||
} else {
|
} else {
|
||||||
state = STANDBY_STATE;
|
state = STANDBY_STATE;
|
||||||
}
|
}
|
||||||
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(this.conf);
|
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
|
||||||
this.haContext = createHAContext();
|
this.haContext = createHAContext();
|
||||||
try {
|
try {
|
||||||
initializeGenericKeys(this.conf, nsId, namenodeId);
|
initializeGenericKeys(conf, nsId, namenodeId);
|
||||||
initialize(this.conf);
|
initialize(conf);
|
||||||
state.prepareToEnterState(haContext);
|
state.prepareToEnterState(haContext);
|
||||||
state.enterState(haContext);
|
state.enterState(haContext);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -651,7 +651,6 @@ public class NameNode {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||||
conf = new Configuration(conf);
|
|
||||||
initializeGenericKeys(conf, nsId, namenodeId);
|
initializeGenericKeys(conf, nsId, namenodeId);
|
||||||
|
|
||||||
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
|
if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
|
||||||
|
@ -698,7 +697,6 @@ public class NameNode {
|
||||||
private static boolean finalize(Configuration conf,
|
private static boolean finalize(Configuration conf,
|
||||||
boolean isConfirmationNeeded
|
boolean isConfirmationNeeded
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
conf = new Configuration(conf);
|
|
||||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
|
||||||
initializeGenericKeys(conf, nsId, namenodeId);
|
initializeGenericKeys(conf, nsId, namenodeId);
|
||||||
|
|
|
@ -25,8 +25,6 @@ import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -43,8 +41,6 @@ import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
|
|
||||||
public class TestDFSUtil {
|
public class TestDFSUtil {
|
||||||
|
@ -237,12 +233,11 @@ public class TestDFSUtil {
|
||||||
* {@link DFSUtil#isDefaultNamenodeAddress(Configuration, InetSocketAddress, String...)}
|
* {@link DFSUtil#isDefaultNamenodeAddress(Configuration, InetSocketAddress, String...)}
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSingleNamenode() throws URISyntaxException {
|
public void testSingleNamenode() {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
final String DEFAULT_ADDRESS = "localhost:9000";
|
final String DEFAULT_ADDRESS = "localhost:9000";
|
||||||
final String NN2_ADDRESS = "localhost:9001";
|
final String NN2_ADDRESS = "localhost:9001";
|
||||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, DEFAULT_ADDRESS);
|
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, DEFAULT_ADDRESS);
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DEFAULT_ADDRESS);
|
|
||||||
|
|
||||||
InetSocketAddress testAddress1 = NetUtils.createSocketAddr(DEFAULT_ADDRESS);
|
InetSocketAddress testAddress1 = NetUtils.createSocketAddr(DEFAULT_ADDRESS);
|
||||||
boolean isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress1,
|
boolean isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress1,
|
||||||
|
@ -252,10 +247,6 @@ public class TestDFSUtil {
|
||||||
isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress2,
|
isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress2,
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
assertFalse(isDefault);
|
assertFalse(isDefault);
|
||||||
|
|
||||||
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
assertEquals(1, uris.size());
|
|
||||||
assertTrue(uris.contains(new URI("hdfs://" + DEFAULT_ADDRESS)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Tests to ensure default namenode is used as fallback */
|
/** Tests to ensure default namenode is used as fallback */
|
||||||
|
@ -416,14 +407,13 @@ public class TestDFSUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHANameNodesWithFederation() throws URISyntaxException {
|
public void testHANameNodesWithFederation() {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
|
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
|
||||||
final String NS1_NN2_HOST = "ns1-nn2.example.com:8020";
|
final String NS1_NN2_HOST = "ns1-nn2.example.com:8020";
|
||||||
final String NS2_NN1_HOST = "ns2-nn1.example.com:8020";
|
final String NS2_NN1_HOST = "ns2-nn1.example.com:8020";
|
||||||
final String NS2_NN2_HOST = "ns2-nn2.example.com:8020";
|
final String NS2_NN2_HOST = "ns2-nn2.example.com:8020";
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
|
|
||||||
|
|
||||||
// Two nameservices, each with two NNs.
|
// Two nameservices, each with two NNs.
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
|
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
|
||||||
|
@ -470,11 +460,6 @@ public class TestDFSUtil {
|
||||||
// Ditto for nameservice IDs, if multiple are defined
|
// Ditto for nameservice IDs, if multiple are defined
|
||||||
assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf));
|
assertEquals(null, DFSUtil.getNamenodeNameServiceId(conf));
|
||||||
assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf));
|
assertEquals(null, DFSUtil.getSecondaryNameServiceId(conf));
|
||||||
|
|
||||||
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
|
||||||
assertEquals(2, uris.size());
|
|
||||||
assertTrue(uris.contains(new URI("hdfs://ns1")));
|
|
||||||
assertTrue(uris.contains(new URI("hdfs://ns2")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -524,34 +509,4 @@ public class TestDFSUtil {
|
||||||
assertEquals("127.0.0.1:12345",
|
assertEquals("127.0.0.1:12345",
|
||||||
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
|
DFSUtil.substituteForWildcardAddress("127.0.0.1:12345", "foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetNNUris() throws Exception {
|
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
|
||||||
|
|
||||||
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
|
|
||||||
final String NS1_NN2_HOST = "ns1-nn1.example.com:8020";
|
|
||||||
final String NS2_NN_HOST = "ns2-nn.example.com:8020";
|
|
||||||
final String NN_HOST = "nn.example.com:8020";
|
|
||||||
|
|
||||||
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
|
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),"nn1,nn2");
|
|
||||||
conf.set(DFSUtil.addKeySuffixes(
|
|
||||||
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_HOST);
|
|
||||||
conf.set(DFSUtil.addKeySuffixes(
|
|
||||||
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST);
|
|
||||||
|
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"),
|
|
||||||
NS2_NN_HOST);
|
|
||||||
|
|
||||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN_HOST);
|
|
||||||
|
|
||||||
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
|
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
|
|
||||||
|
|
||||||
assertEquals(3, uris.size());
|
|
||||||
assertTrue(uris.contains(new URI("hdfs://ns1")));
|
|
||||||
assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_HOST)));
|
|
||||||
assertTrue(uris.contains(new URI("hdfs://" + NN_HOST)));
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -18,11 +18,11 @@
|
||||||
package org.apache.hadoop.hdfs.server.balancer;
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
@ -338,7 +338,8 @@ public class TestBalancer extends TestCase {
|
||||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Map<String, Map<String, InetSocketAddress>> namenodes =
|
||||||
|
DFSUtil.getNNServiceRpcAddresses(conf);
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
|
||||||
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
||||||
|
|
||||||
|
|
|
@ -18,10 +18,9 @@
|
||||||
package org.apache.hadoop.hdfs.server.balancer;
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Collection;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -68,12 +67,12 @@ public class TestBalancerWithHANameNodes {
|
||||||
int numOfDatanodes = capacities.length;
|
int numOfDatanodes = capacities.length;
|
||||||
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
|
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
|
||||||
nn1Conf.setIpcPort(NameNode.DEFAULT_PORT);
|
nn1Conf.setIpcPort(NameNode.DEFAULT_PORT);
|
||||||
cluster = new MiniDFSCluster.Builder(conf)
|
MiniDFSNNTopology simpleHATopology = new MiniDFSNNTopology()
|
||||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
.addNameservice(new MiniDFSNNTopology.NSConf(null).addNN(nn1Conf)
|
||||||
.numDataNodes(capacities.length)
|
.addNN(new MiniDFSNNTopology.NNConf("nn2")));
|
||||||
.racks(racks)
|
cluster = new MiniDFSCluster.Builder(conf).nnTopology(simpleHATopology)
|
||||||
.simulatedCapacities(capacities)
|
.numDataNodes(capacities.length).racks(racks).simulatedCapacities(
|
||||||
.build();
|
capacities).build();
|
||||||
HATestUtil.setFailoverConfigurations(cluster, conf);
|
HATestUtil.setFailoverConfigurations(cluster, conf);
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
@ -90,12 +89,14 @@ public class TestBalancerWithHANameNodes {
|
||||||
// start up an empty node with the same capacity and on the same rack
|
// start up an empty node with the same capacity and on the same rack
|
||||||
cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
|
cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
|
||||||
new long[] { newNodeCapacity });
|
new long[] { newNodeCapacity });
|
||||||
|
|
||||||
|
HATestUtil.setFailoverConfigurations(cluster, conf, NameNode.getUri(
|
||||||
|
cluster.getNameNode(0).getNameNodeAddress()).getHost());
|
||||||
totalCapacity += newNodeCapacity;
|
totalCapacity += newNodeCapacity;
|
||||||
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
|
||||||
cluster);
|
cluster);
|
||||||
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
Map<String, Map<String, InetSocketAddress>> namenodes = DFSUtil
|
||||||
assertEquals(1, namenodes.size());
|
.getNNServiceRpcAddresses(conf);
|
||||||
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
|
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
|
||||||
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
||||||
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
|
||||||
|
|
|
@ -18,10 +18,10 @@
|
||||||
package org.apache.hadoop.hdfs.server.balancer;
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -40,8 +40,8 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
@ -157,7 +157,8 @@ public class TestBalancerWithMultipleNameNodes {
|
||||||
LOG.info("BALANCER 1");
|
LOG.info("BALANCER 1");
|
||||||
|
|
||||||
// start rebalancing
|
// start rebalancing
|
||||||
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
|
final Map<String, Map<String, InetSocketAddress>> namenodes =
|
||||||
|
DFSUtil.getNNServiceRpcAddresses(s.conf);
|
||||||
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
|
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
|
||||||
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||||
|
@ -189,12 +188,6 @@ public abstract class HATestUtil {
|
||||||
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
|
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static URI getLogicalUri(MiniDFSCluster cluster)
|
|
||||||
throws URISyntaxException {
|
|
||||||
return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
|
|
||||||
getLogicalHostname(cluster));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
|
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
|
||||||
List<Integer> txids) throws InterruptedException {
|
List<Integer> txids) throws InterruptedException {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
|
Loading…
Reference in New Issue