HDFS-13433. webhdfs requests can be routed incorrectly in federated cluster. Contributed by Arpit Agarwal.

This commit is contained in:
Arpit Agarwal 2018-04-23 10:08:53 -07:00
parent 83e5f25d5e
commit c533c77047
4 changed files with 237 additions and 39 deletions

View File

@ -191,7 +191,7 @@ public class DFSUtilClient {
/**
* Returns collection of nameservice Ids from the configuration.
* @param conf configuration
* @return collection of nameservice Ids, or null if not specified
* @return collection of nameservice Ids. Empty list if unspecified.
*/
public static Collection<String> getNameServiceIds(Configuration conf) {
return conf.getTrimmedStringCollection(DFS_NAMESERVICES);

View File

@ -453,43 +453,6 @@ public class NameNode extends ReconfigurableBase implements
return getClientNamenodeAddress();
}
/**
* Set the namenode address that will be used by clients to access this
* namenode or name service. This needs to be called before the config
* is overriden.
*/
public void setClientNamenodeAddress(Configuration conf) {
String nnAddr = conf.get(FS_DEFAULT_NAME_KEY);
if (nnAddr == null) {
// default fs is not set.
clientNamenodeAddress = null;
return;
}
LOG.info("{} is {}", FS_DEFAULT_NAME_KEY, nnAddr);
URI nnUri = URI.create(nnAddr);
String nnHost = nnUri.getHost();
if (nnHost == null) {
clientNamenodeAddress = null;
return;
}
if (DFSUtilClient.getNameServiceIds(conf).contains(nnHost)) {
// host name is logical
clientNamenodeAddress = nnHost;
} else if (nnUri.getPort() > 0) {
// physical address with a valid port
clientNamenodeAddress = nnUri.getAuthority();
} else {
// the port is missing or 0. Figure out real bind address later.
clientNamenodeAddress = null;
return;
}
LOG.info("Clients are to use {} to access"
+ " this namenode/service.", clientNamenodeAddress );
}
/**
* Get the namenode address to be used by clients.
* @return nn address
@ -956,9 +919,15 @@ public class NameNode extends ReconfigurableBase implements
this.tracerConfigurationManager =
new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
this.role = role;
setClientNamenodeAddress(conf);
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(
conf, nsId);
if (clientNamenodeAddress != null) {
LOG.info("Clients should use {} to access"
+ " this namenode/service.", clientNamenodeAddress);
}
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Collection;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
/**
* Utility functions for the NameNode.
*/
@InterfaceAudience.Private
public final class NameNodeUtils {
public static final Logger LOG = LoggerFactory.getLogger(NameNodeUtils.class);
/**
* Return the namenode address that will be used by clients to access this
* namenode or name service. This needs to be called before the config
* is overriden.
*
* This method behaves as follows:
*
* 1. fs.defaultFS is undefined:
* - return null.
* 2. fs.defaultFS is defined but has no hostname (logical or physical):
* - return null.
* 3. Single NN (no HA, no federation):
* - return URI authority from fs.defaultFS
* 4. Current NN is in an HA nameservice (with or without federation):
* - return nameservice for current NN.
* 5. Current NN is in non-HA namespace, federated cluster:
* - return value of dfs.namenode.rpc-address.[nsId].[nnId]
* - If the above key is not defined, then return authority from
* fs.defaultFS if the port number is > 0.
* 6. If port number in the authority is missing or zero in step 6:
* - return null
*/
@VisibleForTesting
@Nullable
static String getClientNamenodeAddress(
Configuration conf, @Nullable String nsId) {
final Collection<String> nameservices =
DFSUtilClient.getNameServiceIds(conf);
final String nnAddr = conf.get(FS_DEFAULT_NAME_KEY);
if (nnAddr == null) {
// default fs is not set.
return null;
}
LOG.info("{} is {}", FS_DEFAULT_NAME_KEY, nnAddr);
final URI nnUri = URI.create(nnAddr);
String defaultNnHost = nnUri.getHost();
if (defaultNnHost == null) {
return null;
}
// Current Nameservice is HA.
if (nsId != null && nameservices.contains(nsId)) {
final Collection<String> namenodes = conf.getTrimmedStringCollection(
DFS_HA_NAMENODES_KEY_PREFIX + "." + nsId);
if (namenodes.size() > 1) {
return nsId;
}
}
// Federation without HA. We must handle the case when the current NN
// is not in the default nameservice.
String currentNnAddress = null;
if (nsId != null) {
String hostNameKey = DFS_NAMENODE_RPC_ADDRESS_KEY + "." + nsId;
currentNnAddress = conf.get(hostNameKey);
}
// Fallback to the address in fs.defaultFS.
if (currentNnAddress == null) {
currentNnAddress = nnUri.getAuthority();
}
int port = 0;
if (currentNnAddress.contains(":")) {
port = Integer.parseInt(currentNnAddress.split(":")[1]);
}
if (port > 0) {
return currentNnAddress;
} else {
// the port is missing or 0. Figure out real bind address later.
return null;
}
}
private NameNodeUtils() {
// Disallow construction
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
/**
* Test that {@link NameNodeUtils#getClientNamenodeAddress} correctly
* computes the client address for WebHDFS redirects for different
* combinations of HA, federated and single NN setups.
*/
public class TestClientNameNodeAddress {
public static final Logger LOG = LoggerFactory.getLogger(
TestClientNameNodeAddress.class);
@Rule
public Timeout globalTimeout = new Timeout(300000);
@Test
public void testSimpleConfig() {
final Configuration conf = new HdfsConfiguration();
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://host1:100");
assertThat(NameNodeUtils.getClientNamenodeAddress(conf, null),
is("host1:100"));
}
@Test
public void testSimpleWithoutPort() {
final Configuration conf = new HdfsConfiguration();
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://host1");
assertNull(NameNodeUtils.getClientNamenodeAddress(conf, null));
}
@Test
public void testWithNoDefaultFs() {
final Configuration conf = new HdfsConfiguration();
assertNull(NameNodeUtils.getClientNamenodeAddress(conf, null));
}
@Test
public void testWithNoHost() {
final Configuration conf = new HdfsConfiguration();
conf.set(FS_DEFAULT_NAME_KEY, "hdfs:///");
assertNull(NameNodeUtils.getClientNamenodeAddress(conf, null));
}
@Test
public void testFederationWithHa() {
final Configuration conf = new HdfsConfiguration();
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://ns1");
conf.set(DFS_NAMESERVICES, "ns1,ns2");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns2", "nn1,nn2");
// The current namenode belongs to ns1 and ns1 is the default nameservice.
assertThat(NameNodeUtils.getClientNamenodeAddress(conf, "ns1"),
is("ns1"));
// The current namenode belongs to ns2 and ns1 is the default nameservice.
assertThat(NameNodeUtils.getClientNamenodeAddress(conf, "ns2"),
is("ns2"));
}
@Test
public void testFederationWithoutHa() {
final Configuration conf = new HdfsConfiguration();
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://host1:100");
conf.set(DFS_NAMESERVICES, "ns1,ns2");
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + ".ns1", "host1:100");
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + ".ns2", "host2:200");
assertThat(NameNodeUtils.getClientNamenodeAddress(conf, "ns1"),
is("host1:100"));
assertThat(NameNodeUtils.getClientNamenodeAddress(conf, "ns2"),
is("host2:200"));
}
}