HDFS-8897. Balancer should handle fs.defaultFS trailing slash in HA. Contributed by John Zhuge.
This commit is contained in:
parent
e7e8aed208
commit
a428d4f50e
|
@ -771,12 +771,7 @@ public class DFSUtil {
|
||||||
Set<URI> nonPreferredUris = new HashSet<URI>();
|
Set<URI> nonPreferredUris = new HashSet<URI>();
|
||||||
|
|
||||||
for (String nsId : nameServices) {
|
for (String nsId : nameServices) {
|
||||||
URI nsUri;
|
URI nsUri = createUri(HdfsConstants.HDFS_URI_SCHEME, nsId, -1);
|
||||||
try {
|
|
||||||
nsUri = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);
|
|
||||||
} catch (URISyntaxException ue) {
|
|
||||||
throw new IllegalArgumentException(ue);
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* Determine whether the logical URI of the name service can be resolved
|
* Determine whether the logical URI of the name service can be resolved
|
||||||
* by the configured failover proxy provider. If not, we should try to
|
* by the configured failover proxy provider. If not, we should try to
|
||||||
|
@ -816,7 +811,8 @@ public class DFSUtil {
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
String addr = conf.get(key);
|
String addr = conf.get(key);
|
||||||
if (addr != null) {
|
if (addr != null) {
|
||||||
URI uri = createUri("hdfs", NetUtils.createSocketAddr(addr));
|
URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
|
||||||
|
NetUtils.createSocketAddr(addr));
|
||||||
if (!uriFound) {
|
if (!uriFound) {
|
||||||
uriFound = true;
|
uriFound = true;
|
||||||
ret.add(uri);
|
ret.add(uri);
|
||||||
|
@ -834,19 +830,21 @@ public class DFSUtil {
|
||||||
// nor the rpc-address (which overrides defaultFS) is given.
|
// nor the rpc-address (which overrides defaultFS) is given.
|
||||||
if (!uriFound) {
|
if (!uriFound) {
|
||||||
URI defaultUri = FileSystem.getDefaultUri(conf);
|
URI defaultUri = FileSystem.getDefaultUri(conf);
|
||||||
|
if (defaultUri != null) {
|
||||||
|
// checks if defaultUri is ip:port format
|
||||||
|
// and convert it to hostname:port format
|
||||||
|
if (defaultUri.getPort() != -1) {
|
||||||
|
defaultUri = createUri(defaultUri.getScheme(),
|
||||||
|
NetUtils.createSocketAddr(defaultUri.getHost(),
|
||||||
|
defaultUri.getPort()));
|
||||||
|
}
|
||||||
|
|
||||||
// checks if defaultUri is ip:port format
|
defaultUri = trimUri(defaultUri);
|
||||||
// and convert it to hostname:port format
|
|
||||||
if (defaultUri != null && (defaultUri.getPort() != -1)) {
|
|
||||||
defaultUri = createUri(defaultUri.getScheme(),
|
|
||||||
NetUtils.createSocketAddr(defaultUri.getHost(),
|
|
||||||
defaultUri.getPort()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (defaultUri != null &&
|
if (HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
|
||||||
HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
|
!nonPreferredUris.contains(defaultUri)) {
|
||||||
!nonPreferredUris.contains(defaultUri)) {
|
ret.add(defaultUri);
|
||||||
ret.add(defaultUri);
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1175,16 +1173,30 @@ public class DFSUtil {
|
||||||
public boolean match(InetSocketAddress s);
|
public boolean match(InetSocketAddress s);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a URI from the scheme and address */
|
/** Create an URI from scheme and address. */
|
||||||
public static URI createUri(String scheme, InetSocketAddress address) {
|
public static URI createUri(String scheme, InetSocketAddress address) {
|
||||||
|
return createUri(scheme, address.getHostName(), address.getPort());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Create an URI from scheme, host, and port. */
|
||||||
|
public static URI createUri(String scheme, String host, int port) {
|
||||||
try {
|
try {
|
||||||
return new URI(scheme, null, address.getHostName(), address.getPort(),
|
return new URI(scheme, null, host, port, null, null, null);
|
||||||
null, null, null);
|
} catch (URISyntaxException x) {
|
||||||
} catch (URISyntaxException ue) {
|
throw new IllegalArgumentException(x.getMessage(), x);
|
||||||
throw new IllegalArgumentException(ue);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Remove unnecessary path from HDFS URI. */
|
||||||
|
static URI trimUri(URI uri) {
|
||||||
|
String path = uri.getPath();
|
||||||
|
if (HdfsConstants.HDFS_URI_SCHEME.equals(uri.getScheme()) &&
|
||||||
|
path != null && !path.isEmpty()) {
|
||||||
|
uri = createUri(uri.getScheme(), uri.getHost(), uri.getPort());
|
||||||
|
}
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
|
* Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
|
|
|
@ -81,7 +81,11 @@ import org.junit.Test;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
public class TestDFSUtil {
|
public class TestDFSUtil {
|
||||||
|
|
||||||
|
static final String NS1_NN_ADDR = "ns1-nn.example.com:9820";
|
||||||
|
static final String NS1_NN1_ADDR = "ns1-nn1.example.com:9820";
|
||||||
|
static final String NS1_NN2_ADDR = "ns1-nn2.example.com:9820";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset to default UGI settings since some tests change them.
|
* Reset to default UGI settings since some tests change them.
|
||||||
*/
|
*/
|
||||||
|
@ -587,8 +591,6 @@ public class TestDFSUtil {
|
||||||
@Test
|
@Test
|
||||||
public void testGetHaNnHttpAddresses() throws IOException {
|
public void testGetHaNnHttpAddresses() throws IOException {
|
||||||
final String LOGICAL_HOST_NAME = "ns1";
|
final String LOGICAL_HOST_NAME = "ns1";
|
||||||
final String NS1_NN1_ADDR = "ns1-nn1.example.com:9820";
|
|
||||||
final String NS1_NN2_ADDR = "ns1-nn2.example.com:9820";
|
|
||||||
|
|
||||||
Configuration conf = createWebHDFSHAConfiguration(LOGICAL_HOST_NAME, NS1_NN1_ADDR, NS1_NN2_ADDR);
|
Configuration conf = createWebHDFSHAConfiguration(LOGICAL_HOST_NAME, NS1_NN1_ADDR, NS1_NN2_ADDR);
|
||||||
|
|
||||||
|
@ -637,9 +639,6 @@ public class TestDFSUtil {
|
||||||
public void testGetNNUris() throws Exception {
|
public void testGetNNUris() throws Exception {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
final String NS1_NN_ADDR = "ns1-nn.example.com:9820";
|
|
||||||
final String NS1_NN1_ADDR = "ns1-nn1.example.com:9820";
|
|
||||||
final String NS1_NN2_ADDR = "ns1-nn2.example.com:9820";
|
|
||||||
final String NS2_NN_ADDR = "ns2-nn.example.com:9820";
|
final String NS2_NN_ADDR = "ns2-nn.example.com:9820";
|
||||||
final String NN1_ADDR = "nn.example.com:9820";
|
final String NN1_ADDR = "nn.example.com:9820";
|
||||||
final String NN1_SRVC_ADDR = "nn.example.com:9821";
|
final String NN1_SRVC_ADDR = "nn.example.com:9821";
|
||||||
|
@ -648,12 +647,10 @@ public class TestDFSUtil {
|
||||||
conf.set(DFS_NAMESERVICES, "ns1");
|
conf.set(DFS_NAMESERVICES, "ns1");
|
||||||
conf.set(DFSUtil.addKeySuffixes(
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1"), NS1_NN1_ADDR);
|
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1"), NS1_NN1_ADDR);
|
||||||
|
|
||||||
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "hdfs://" + NN2_ADDR);
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "hdfs://" + NN2_ADDR);
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
||||||
|
|
||||||
Collection<URI> uris = DFSUtil.getInternalNsRpcUris(conf);
|
Collection<URI> uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 2, uris.size());
|
assertEquals("Incorrect number of URIs returned", 2, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
uris.contains(new URI("hdfs://" + NS1_NN1_ADDR)));
|
uris.contains(new URI("hdfs://" + NS1_NN1_ADDR)));
|
||||||
|
@ -668,15 +665,11 @@ public class TestDFSUtil {
|
||||||
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_ADDR);
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_ADDR);
|
||||||
conf.set(DFSUtil.addKeySuffixes(
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_ADDR);
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_ADDR);
|
||||||
|
|
||||||
conf.set(DFSUtil.addKeySuffixes(
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1"), NS1_NN_ADDR);
|
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1"), NS1_NN_ADDR);
|
||||||
|
|
||||||
conf.set(DFSUtil.addKeySuffixes(
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"), NS2_NN_ADDR);
|
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"), NS2_NN_ADDR);
|
||||||
|
|
||||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN1_ADDR);
|
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN1_ADDR);
|
||||||
|
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -714,7 +707,6 @@ public class TestDFSUtil {
|
||||||
+ "ConfiguredFailoverProxyProvider");
|
+ "ConfiguredFailoverProxyProvider");
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
uris.contains(new URI("hdfs://ns1")));
|
uris.contains(new URI("hdfs://ns1")));
|
||||||
|
@ -728,7 +720,6 @@ public class TestDFSUtil {
|
||||||
"viewfs://vfs-name.example.com");
|
"viewfs://vfs-name.example.com");
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
uris.contains(new URI("hdfs://ns1")));
|
uris.contains(new URI("hdfs://ns1")));
|
||||||
|
@ -742,7 +733,6 @@ public class TestDFSUtil {
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
assertEquals("Incorrect number of URIs returned", 3, uris.size());
|
||||||
assertTrue("Missing URI for name service ns1",
|
assertTrue("Missing URI for name service ns1",
|
||||||
uris.contains(new URI("hdfs://ns1")));
|
uris.contains(new URI("hdfs://ns1")));
|
||||||
|
@ -756,7 +746,6 @@ public class TestDFSUtil {
|
||||||
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for RPC address (defaultFS)",
|
assertTrue("Missing URI for RPC address (defaultFS)",
|
||||||
uris.contains(new URI("hdfs://" + NN1_ADDR)));
|
uris.contains(new URI("hdfs://" + NN1_ADDR)));
|
||||||
|
@ -766,7 +755,6 @@ public class TestDFSUtil {
|
||||||
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN2_ADDR);
|
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN2_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for RPC address",
|
assertTrue("Missing URI for RPC address",
|
||||||
uris.contains(new URI("hdfs://" + NN2_ADDR)));
|
uris.contains(new URI("hdfs://" + NN2_ADDR)));
|
||||||
|
@ -778,7 +766,6 @@ public class TestDFSUtil {
|
||||||
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_ADDR);
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for service ns1",
|
assertTrue("Missing URI for service ns1",
|
||||||
uris.contains(new URI("hdfs://" + NN1_ADDR)));
|
uris.contains(new URI("hdfs://" + NN1_ADDR)));
|
||||||
|
@ -790,12 +777,41 @@ public class TestDFSUtil {
|
||||||
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
|
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
|
||||||
|
|
||||||
uris = DFSUtil.getInternalNsRpcUris(conf);
|
uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
assertTrue("Missing URI for service address",
|
assertTrue("Missing URI for service address",
|
||||||
uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
|
uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetNNUris2() throws Exception {
|
||||||
|
// Make sure that an HA URI plus a slash being the default URI doesn't
|
||||||
|
// result in multiple entries being returned.
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFS_NAMESERVICES, "ns1");
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),
|
||||||
|
"nn1,nn2");
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_ADDR);
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_ADDR);
|
||||||
|
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns1"), NS1_NN_ADDR);
|
||||||
|
|
||||||
|
String proxyProviderKey = HdfsClientConfigKeys.Failover.
|
||||||
|
PROXY_PROVIDER_KEY_PREFIX + ".ns1";
|
||||||
|
conf.set(proxyProviderKey, "org.apache.hadoop.hdfs.server.namenode.ha."
|
||||||
|
+ "ConfiguredFailoverProxyProvider");
|
||||||
|
|
||||||
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1/");
|
||||||
|
|
||||||
|
Collection<URI> uris = DFSUtil.getInternalNsRpcUris(conf);
|
||||||
|
|
||||||
|
assertEquals("Incorrect number of URIs returned", 1, uris.size());
|
||||||
|
assertTrue("Missing URI for name service ns1",
|
||||||
|
uris.contains(new URI("hdfs://ns1")));
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout=15000)
|
@Test (timeout=15000)
|
||||||
public void testLocalhostReverseLookup() {
|
public void testLocalhostReverseLookup() {
|
||||||
// 127.0.0.1 -> localhost reverse resolution does not happen on Windows.
|
// 127.0.0.1 -> localhost reverse resolution does not happen on Windows.
|
||||||
|
|
Loading…
Reference in New Issue