HDFS-13248: Namenode needs to use the actual client IP when going through the
RBF proxy. There is a new configuration knob dfs.namenode.ip-proxy-users that configures the list of users than can set their client ip address using the client context. Fixes #4081
This commit is contained in:
parent
c3124a3f6e
commit
1d5650c4d0
|
@ -43,6 +43,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_C
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public final class CallerContext {
|
public final class CallerContext {
|
||||||
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
|
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
|
||||||
|
|
||||||
|
// field names
|
||||||
|
public static final String CLIENT_IP_STR = "clientIp";
|
||||||
|
public static final String CLIENT_PORT_STR = "clientPort";
|
||||||
|
|
||||||
/** The caller context.
|
/** The caller context.
|
||||||
*
|
*
|
||||||
* It will be truncated if it exceeds the maximum allowed length in
|
* It will be truncated if it exceeds the maximum allowed length in
|
||||||
|
|
|
@ -1508,7 +1508,19 @@ public class UserGroupInformation {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this is a proxy user, get the real user. Otherwise, return
|
||||||
|
* this user.
|
||||||
|
* @param user the user to check
|
||||||
|
* @return the real user or self
|
||||||
|
*/
|
||||||
|
public static UserGroupInformation getRealUserOrSelf(UserGroupInformation user) {
|
||||||
|
if (user == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
UserGroupInformation real = user.getRealUser();
|
||||||
|
return real != null ? real : user;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is used for storing the groups for testing. It stores a local
|
* This class is used for storing the groups for testing. It stores a local
|
||||||
|
|
|
@ -131,9 +131,6 @@ public class RouterRpcClient {
|
||||||
private static final Pattern STACK_TRACE_PATTERN =
|
private static final Pattern STACK_TRACE_PATTERN =
|
||||||
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
|
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
|
||||||
|
|
||||||
private static final String CLIENT_IP_STR = "clientIp";
|
|
||||||
private static final String CLIENT_PORT_STR = "clientPort";
|
|
||||||
|
|
||||||
/** Fairness manager to control handlers assigned per NS. */
|
/** Fairness manager to control handlers assigned per NS. */
|
||||||
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
|
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
|
||||||
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
|
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
|
||||||
|
@ -597,8 +594,8 @@ public class RouterRpcClient {
|
||||||
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
||||||
CallerContext.Builder builder =
|
CallerContext.Builder builder =
|
||||||
new CallerContext.Builder("", contextFieldSeparator)
|
new CallerContext.Builder("", contextFieldSeparator)
|
||||||
.append(CLIENT_IP_STR, Server.getRemoteAddress())
|
.append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
|
||||||
.append(CLIENT_PORT_STR,
|
.append(CallerContext.CLIENT_PORT_STR,
|
||||||
Integer.toString(Server.getRemotePort()))
|
Integer.toString(Server.getRemotePort()))
|
||||||
.setSignature(origSignature);
|
.setSignature(origSignature);
|
||||||
// Append the original caller context
|
// Append the original caller context
|
||||||
|
|
|
@ -375,6 +375,20 @@ With this setting a user can interact with `ns-fed` as a regular namespace:
|
||||||
This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`.
|
This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`.
|
||||||
|
|
||||||
|
|
||||||
|
NameNode configuration
|
||||||
|
--------------------
|
||||||
|
|
||||||
|
In order for the system to support data-locality, you must configure your NameNodes so that they will trust the routers to supply the user's client IP address. `dfs.namenode.ip-proxy-users` defines a comma separated list of users that are allowed to provide the client ip address via the caller context.
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.ip-proxy-users</name>
|
||||||
|
<value>hdfs</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
```
|
||||||
|
|
||||||
Router configuration
|
Router configuration
|
||||||
--------------------
|
--------------------
|
||||||
|
|
||||||
|
|
|
@ -991,6 +991,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.namenode.lifeline.handler.count";
|
"dfs.namenode.lifeline.handler.count";
|
||||||
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
|
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
|
||||||
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
|
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
|
||||||
|
// List of users that can override their client ip
|
||||||
|
public static final String DFS_NAMENODE_IP_PROXY_USERS = "dfs.namenode.ip-proxy-users";
|
||||||
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
|
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
|
||||||
public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name();
|
public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name();
|
||||||
public static final String DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";
|
public static final String DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";
|
||||||
|
|
|
@ -401,7 +401,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
|
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
|
||||||
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
|
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
|
||||||
|
|
||||||
private static final String CLIENT_PORT_STR = "clientPort";
|
|
||||||
private final String contextFieldSeparator;
|
private final String contextFieldSeparator;
|
||||||
|
|
||||||
boolean isAuditEnabled() {
|
boolean isAuditEnabled() {
|
||||||
|
@ -467,7 +466,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
byte[] origSignature = ctx == null ? null : ctx.getSignature();
|
||||||
CallerContext.setCurrent(
|
CallerContext.setCurrent(
|
||||||
new CallerContext.Builder(origContext, contextFieldSeparator)
|
new CallerContext.Builder(origContext, contextFieldSeparator)
|
||||||
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
|
.append(CallerContext.CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
|
||||||
.setSignature(origSignature)
|
.setSignature(origSignature)
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
@ -475,7 +474,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
|
|
||||||
private boolean isClientPortInfoAbsent(CallerContext ctx){
|
private boolean isClientPortInfoAbsent(CallerContext ctx){
|
||||||
return ctx == null || ctx.getContext() == null
|
return ctx == null || ctx.getContext() == null
|
||||||
|| !ctx.getContext().contains(CLIENT_PORT_STR);
|
|| !ctx.getContext().contains(CallerContext.CLIENT_PORT_STR);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENG
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
|
||||||
|
@ -45,6 +46,9 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -267,6 +271,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
|
|
||||||
private final String defaultECPolicyName;
|
private final String defaultECPolicyName;
|
||||||
|
|
||||||
|
// Users who can override the client ip
|
||||||
|
private final String[] ipProxyUsers;
|
||||||
|
|
||||||
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.nn = nn;
|
this.nn = nn;
|
||||||
|
@ -277,6 +284,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
int handlerCount =
|
int handlerCount =
|
||||||
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
||||||
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
||||||
|
ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
|
||||||
|
|
||||||
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
||||||
ProtobufRpcEngine2.class);
|
ProtobufRpcEngine2.class);
|
||||||
|
@ -1899,7 +1907,29 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getClientMachine() {
|
private String getClientMachine() {
|
||||||
|
if (ipProxyUsers != null) {
|
||||||
|
// Get the real user (or effective if it isn't a proxy user)
|
||||||
|
UserGroupInformation user =
|
||||||
|
UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
|
||||||
|
if (user != null &&
|
||||||
|
ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
|
||||||
|
CallerContext context = CallerContext.getCurrent();
|
||||||
|
if (context != null && context.isContextValid()) {
|
||||||
|
String cc = context.getContext();
|
||||||
|
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
|
||||||
|
// return "1.2.3.4" as the client machine.
|
||||||
|
String key = CallerContext.CLIENT_IP_STR +
|
||||||
|
CallerContext.Builder.KEY_VALUE_SEPARATOR;
|
||||||
|
int posn = cc.indexOf(key);
|
||||||
|
if (posn != -1) {
|
||||||
|
posn += key.length();
|
||||||
|
int end = cc.indexOf(",", posn);
|
||||||
|
return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
String clientMachine = Server.getRemoteAddress();
|
String clientMachine = Server.getRemoteAddress();
|
||||||
if (clientMachine == null) { //not a RPC client
|
if (clientMachine == null) { //not a RPC client
|
||||||
clientMachine = "";
|
clientMachine = "";
|
||||||
|
|
|
@ -565,6 +565,17 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.ip-proxy-users</name>
|
||||||
|
<value></value>
|
||||||
|
<description>A comma separated list of user names that are allowed by the
|
||||||
|
NameNode to specify a different client IP address in the caller context.
|
||||||
|
This is used by Router-Based Federation (RBF) to provide the actual client's
|
||||||
|
IP address to the NameNode, which is critical to preserve data locality when
|
||||||
|
using RBF. If you are using RBF, add the user that runs the routers.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.acls.enabled</name>
|
<name>dfs.namenode.acls.enabled</name>
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -24,14 +24,25 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestNameNodeRpcServer {
|
public class TestNameNodeRpcServer {
|
||||||
|
@ -59,5 +70,88 @@ public class TestNameNodeRpcServer {
|
||||||
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
|
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the preferred DataNode location for the first block of the
|
||||||
|
* given file.
|
||||||
|
* @param fs The file system to use
|
||||||
|
* @param p The path to use
|
||||||
|
* @return the preferred host to get the data
|
||||||
|
*/
|
||||||
|
private static String getPreferredLocation(DistributedFileSystem fs,
|
||||||
|
Path p) throws IOException{
|
||||||
|
// Use getLocatedBlocks because it is the basis for HDFS open,
|
||||||
|
// but provides visibility into which host will be used.
|
||||||
|
LocatedBlocks blocks = fs.getClient()
|
||||||
|
.getLocatedBlocks(p.toUri().getPath(), 0);
|
||||||
|
return blocks.get(0).getLocations()[0].getHostName();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Because of the randomness of the NN assigning DN, we run multiple
|
||||||
|
// trials. 1/3^20=3e-10, so that should be good enough.
|
||||||
|
static final int ITERATIONS_TO_USE = 20;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A test to make sure that if an authorized user adds "clientIp:" to their
|
||||||
|
* caller context, it will be used to make locality decisions on the NN.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNamenodeRpcClientIpProxy() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
|
||||||
|
// Make 3 nodes & racks so that we have a decent shot of detecting when
|
||||||
|
// our change overrides the random choice of datanode.
|
||||||
|
final String[] racks = new String[]{"/rack1", "/rack2", "/rack3"};
|
||||||
|
final String[] hosts = new String[]{"node1", "node2", "node3"};
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
final CallerContext original = CallerContext.getCurrent();
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.racks(racks).hosts(hosts).numDataNodes(hosts.length)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
// Write a sample file
|
||||||
|
final Path fooName = fs.makeQualified(new Path("/foo"));
|
||||||
|
FSDataOutputStream stream = fs.create(fooName);
|
||||||
|
stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
|
||||||
|
stream.close();
|
||||||
|
// Set the caller context to set the ip address
|
||||||
|
CallerContext.setCurrent(
|
||||||
|
new CallerContext.Builder("test", conf)
|
||||||
|
.append(CallerContext.CLIENT_IP_STR, hosts[0])
|
||||||
|
.build());
|
||||||
|
// Should get a random mix of DataNodes since we aren't joe.
|
||||||
|
for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
|
||||||
|
String host = getPreferredLocation(fs, fooName);
|
||||||
|
if (!hosts[0].equals(host)) {
|
||||||
|
// found some other host, so things are good
|
||||||
|
break;
|
||||||
|
} else if (trial == ITERATIONS_TO_USE - 1) {
|
||||||
|
assertNotEquals("Failed to get non-node1", hosts[0], host);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Run as fake joe to authorize the test
|
||||||
|
UserGroupInformation joe =
|
||||||
|
UserGroupInformation.createUserForTesting("fake_joe",
|
||||||
|
new String[]{"fake_group"});
|
||||||
|
DistributedFileSystem joeFs =
|
||||||
|
(DistributedFileSystem) DFSTestUtil.getFileSystemAs(joe, conf);
|
||||||
|
// As joe, we should get all node1.
|
||||||
|
for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
|
||||||
|
String host = getPreferredLocation(joeFs, fooName);
|
||||||
|
assertEquals("Trial " + trial + " failed", hosts[0], host);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
CallerContext.setCurrent(original);
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
// Reset the config
|
||||||
|
conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue