diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
index c8b4135d088..dbd9184a2b9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
@@ -43,6 +43,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_C
@InterfaceStability.Evolving
public final class CallerContext {
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
+
+ // field names
+ public static final String CLIENT_IP_STR = "clientIp";
+ public static final String CLIENT_PORT_STR = "clientPort";
+
/** The caller context.
*
* It will be truncated if it exceeds the maximum allowed length in
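
For context, a minimal sketch (illustration only, not part of this patch) of how a caller trusted via `dfs.namenode.ip-proxy-users` can forward the real client's address using the new field-name constants; the class name, IP, and port below are placeholders:

```java
import org.apache.hadoop.ipc.CallerContext;

public class CallerContextSketch {
  public static void main(String[] args) {
    // Build a caller context carrying the real client's address so that a
    // downstream NameNode (if it trusts this user) can use it for locality.
    CallerContext.setCurrent(
        new CallerContext.Builder("")
            .append(CallerContext.CLIENT_IP_STR, "1.2.3.4")    // placeholder IP
            .append(CallerContext.CLIENT_PORT_STR, "40404")    // placeholder port
            .build());
    // Prints "clientIp:1.2.3.4,clientPort:40404".
    System.out.println(CallerContext.getCurrent().getContext());
  }
}
```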
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index 67d151862be..973e6a20d49 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -1509,7 +1509,19 @@ public class UserGroupInformation {
return null;
}
-
+ /**
+ * If this is a proxy user, get the real user. Otherwise, return
+ * this user.
+ * @param user the user to check
+ * @return the real user or self
+ */
+ public static UserGroupInformation getRealUserOrSelf(UserGroupInformation user) {
+ if (user == null) {
+ return null;
+ }
+ UserGroupInformation real = user.getRealUser();
+ return real != null ? real : user;
+ }
/**
* This class is used for storing the groups for testing. It stores a local
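
A short sketch (illustration only, not in the patch) of the new helper's semantics, built on the existing UGI factory methods; the user names are placeholders:

```java
import org.apache.hadoop.security.UserGroupInformation;

public class RealUserOrSelfSketch {
  public static void main(String[] args) {
    // "router" authenticates and then impersonates "alice" (a proxy UGI).
    UserGroupInformation router =
        UserGroupInformation.createRemoteUser("router");
    UserGroupInformation alice =
        UserGroupInformation.createProxyUser("alice", router);

    // Proxy UGI: the real (authenticating) user is returned -> "router".
    System.out.println(
        UserGroupInformation.getRealUserOrSelf(alice).getShortUserName());
    // Non-proxy UGI: the UGI itself is returned -> "router".
    System.out.println(
        UserGroupInformation.getRealUserOrSelf(router).getShortUserName());
  }
}
```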
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 16cd7a52cb4..ef84f301a90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -545,8 +545,8 @@ public class RouterRpcClient {
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.Builder builder =
new CallerContext.Builder("", contextFieldSeparator)
- .append(CLIENT_IP_STR, Server.getRemoteAddress())
- .append(CLIENT_PORT_STR,
+ .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
+ .append(CallerContext.CLIENT_PORT_STR,
Integer.toString(Server.getRemotePort()))
.setSignature(origSignature);
// Append the original caller context
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index de45645db09..dce2c654669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -362,6 +362,20 @@ With this setting a user can interact with `ns-fed` as a regular namespace:
This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`.
+NameNode configuration
+--------------------
+
+In order for the system to support data locality, you must configure your NameNodes so that they trust the routers to supply the user's client IP address. `dfs.namenode.ip-proxy-users` defines a comma-separated list of users that are allowed to provide the client IP address via the caller context.
+
+```xml
+<configuration>
+  <property>
+    <name>dfs.namenode.ip-proxy-users</name>
+    <value>hdfs</value>
+  </property>
+</configuration>
+```
+
Router configuration
--------------------
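
As a quick sanity check after deploying the setting (a suggested workflow, not part of this patch), the effective value can be read back with the standard getconf tool:

```
hdfs getconf -confKey dfs.namenode.ip-proxy-users
```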
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f47ad6c1393..7df1c5cfb73 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -938,6 +938,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
+ // List of users that can override their client ip
+ public static final String DFS_NAMENODE_IP_PROXY_USERS = "dfs.namenode.ip-proxy-users";
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name();
public static final String DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 318442d2799..3ce8a80be65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -387,7 +387,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
- private static final String CLIENT_PORT_STR = "clientPort";
private final String contextFieldSeparator;
boolean isAuditEnabled() {
@@ -453,7 +452,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
- .append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
+ .append(CallerContext.CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}
@@ -461,7 +460,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private boolean isClientPortInfoAbsent(CallerContext ctx){
return ctx == null || ctx.getContext() == null
- || !ctx.getContext().contains(CLIENT_PORT_STR);
+ || !ctx.getContext().contains(CallerContext.CLIENT_PORT_STR);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e5d8aa9a40d..e6610e0e505 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENG
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
@@ -45,7 +46,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -267,6 +270,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
private final String defaultECPolicyName;
+ // Users who can override the client ip
+ private final String[] ipProxyUsers;
+
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
this.nn = nn;
@@ -277,6 +283,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
int handlerCount =
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
+ ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);
@@ -1890,7 +1897,29 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
}
- private static String getClientMachine() {
+ private String getClientMachine() {
+ if (ipProxyUsers != null) {
+ // Get the real user (or effective if it isn't a proxy user)
+ UserGroupInformation user =
+ UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
+ if (user != null &&
+ ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
+ CallerContext context = CallerContext.getCurrent();
+ if (context != null && context.isContextValid()) {
+ String cc = context.getContext();
+ // if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
+ // return "1.2.3.4" as the client machine.
+ String key = CallerContext.CLIENT_IP_STR +
+ CallerContext.Builder.KEY_VALUE_SEPARATOR;
+ int posn = cc.indexOf(key);
+ if (posn != -1) {
+ posn += key.length();
+ int end = cc.indexOf(",", posn);
+ return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
+ }
+ }
+ }
+ }
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
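
To make the lookup above concrete, here is a standalone sketch that mirrors (but does not replace) the parsing in `getClientMachine()`; the sample context string is a placeholder:

```java
public class ClientIpParseSketch {
  // Find "clientIp:" and read up to the next comma (the default
  // caller-context field separator) or the end of the string.
  static String extractClientIp(String callerContext) {
    final String key = "clientIp:";            // CLIENT_IP_STR + separator
    int posn = callerContext.indexOf(key);
    if (posn == -1) {
      return null;                             // no forwarded IP present
    }
    posn += key.length();
    int end = callerContext.indexOf(',', posn);
    return end == -1 ? callerContext.substring(posn)
                     : callerContext.substring(posn, end);
  }

  public static void main(String[] args) {
    // Prints "1.2.3.4", matching the example in the comment above.
    System.out.println(extractClientIp("clientIp:1.2.3.4,CLI"));
  }
}
```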
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c24e288a608..925d42e3937 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -544,6 +544,17 @@
+<property>
+  <name>dfs.namenode.ip-proxy-users</name>
+  <value></value>
+  <description>A comma separated list of user names that are allowed by the
+    NameNode to specify a different client IP address in the caller context.
+    This is used by Router-Based Federation (RBF) to provide the actual client's
+    IP address to the NameNode, which is critical to preserve data locality when
+    using RBF. If you are using RBF, add the user that runs the routers.
+  </description>
+</property>
+
<property>
  <name>dfs.namenode.acls.enabled</name>
  <value>true</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
index ada93e84f0e..74d85bc637e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,14 +24,25 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestNameNodeRpcServer {
@@ -59,5 +70,88 @@ public class TestNameNodeRpcServer {
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
}
+
+ /**
+ * Get the preferred DataNode location for the first block of the
+ * given file.
+ * @param fs The file system to use
+ * @param p The path to use
+ * @return the preferred host to get the data
+ */
+ private static String getPreferredLocation(DistributedFileSystem fs,
+      Path p) throws IOException {
+ // Use getLocatedBlocks because it is the basis for HDFS open,
+ // but provides visibility into which host will be used.
+ LocatedBlocks blocks = fs.getClient()
+ .getLocatedBlocks(p.toUri().getPath(), 0);
+ return blocks.get(0).getLocations()[0].getHostName();
+ }
+
+ // Because of the randomness of the NN assigning DN, we run multiple
+ // trials. 1/3^20=3e-10, so that should be good enough.
+ static final int ITERATIONS_TO_USE = 20;
+
+ /**
+ * A test to make sure that if an authorized user adds "clientIp:" to their
+ * caller context, it will be used to make locality decisions on the NN.
+ */
+ @Test
+ public void testNamenodeRpcClientIpProxy() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+
+ conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
+ // Make 3 nodes & racks so that we have a decent shot of detecting when
+ // our change overrides the random choice of datanode.
+ final String[] racks = new String[]{"/rack1", "/rack2", "/rack3"};
+ final String[] hosts = new String[]{"node1", "node2", "node3"};
+ MiniDFSCluster cluster = null;
+ final CallerContext original = CallerContext.getCurrent();
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .racks(racks).hosts(hosts).numDataNodes(hosts.length)
+ .build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ // Write a sample file
+ final Path fooName = fs.makeQualified(new Path("/foo"));
+ FSDataOutputStream stream = fs.create(fooName);
+ stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
+ stream.close();
+ // Set the caller context to set the ip address
+ CallerContext.setCurrent(
+ new CallerContext.Builder("test", conf)
+ .append(CallerContext.CLIENT_IP_STR, hosts[0])
+ .build());
+ // Should get a random mix of DataNodes since we aren't joe.
+ for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
+ String host = getPreferredLocation(fs, fooName);
+ if (!hosts[0].equals(host)) {
+ // found some other host, so things are good
+ break;
+ } else if (trial == ITERATIONS_TO_USE - 1) {
+ assertNotEquals("Failed to get non-node1", hosts[0], host);
+ }
+ }
+ // Run as fake joe to authorize the test
+ UserGroupInformation joe =
+ UserGroupInformation.createUserForTesting("fake_joe",
+ new String[]{"fake_group"});
+ DistributedFileSystem joeFs =
+ (DistributedFileSystem) DFSTestUtil.getFileSystemAs(joe, conf);
+ // As joe, we should get all node1.
+ for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
+ String host = getPreferredLocation(joeFs, fooName);
+ assertEquals("Trial " + trial + " failed", hosts[0], host);
+ }
+ } finally {
+ CallerContext.setCurrent(original);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ // Reset the config
+ conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
+ }
+ }
}
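
If it helps during review, the new case can be run on its own from `hadoop-hdfs-project/hadoop-hdfs` with the usual surefire filter:

```
mvn test -Dtest=TestNameNodeRpcServer#testNamenodeRpcClientIpProxy
```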