diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 97d3fba7bcf..351926bf3fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -128,6 +128,7 @@ public class RouterRpcClient { Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); private static final String CLIENT_IP_STR = "clientIp"; + private static final String CLIENT_PORT_STR = "clientPort"; /** * Create a router RPC client to manage remote procedure calls to NNs. @@ -416,7 +417,7 @@ public class RouterRpcClient { + router.getRouterId()); } - appendClientIpToCallerContextIfAbsent(); + appendClientIpPortToCallerContextIfAbsent(); Object ret = null; if (rpcMonitor != null) { @@ -534,25 +535,20 @@ public class RouterRpcClient { /** * For tracking which is the actual client address. - * It adds trace info "clientIp:ip" to caller context if it's absent. + * It adds trace info "clientIp:ip" and "clientPort:port" + * to caller context if they are absent. */ - private void appendClientIpToCallerContextIfAbsent() { - String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress(); - final CallerContext ctx = CallerContext.getCurrent(); - if (isClientIpInfoAbsent(clientIpInfo, ctx)) { - String origContext = ctx == null ? null : ctx.getContext(); - byte[] origSignature = ctx == null ? null : ctx.getSignature(); - CallerContext.setCurrent( - new CallerContext.Builder(origContext, contextFieldSeparator) - .append(clientIpInfo) - .setSignature(origSignature) - .build()); - } - } - - private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){ - return ctx == null || ctx.getContext() == null - || !ctx.getContext().contains(clientIpInfo); + private void appendClientIpPortToCallerContextIfAbsent() { + CallerContext ctx = CallerContext.getCurrent(); + String origContext = ctx == null ? null : ctx.getContext(); + byte[] origSignature = ctx == null ? null : ctx.getSignature(); + CallerContext.setCurrent( + new CallerContext.Builder(origContext, contextFieldSeparator) + .appendIfAbsent(CLIENT_IP_STR, Server.getRemoteAddress()) + .appendIfAbsent(CLIENT_PORT_STR, + Integer.toString(Server.getRemotePort())) + .setSignature(origSignature) + .build()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index ceb291dc167..f2b86dcc759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -97,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; @@ -1867,4 +1868,51 @@ public class TestRouterRpc { .contains("callerContext=clientContext,clientIp:")); assertTrue(verifyFileExists(routerFS, dirPath)); } + + @Test + public void testSetBalancerBandwidth() throws Exception { + long defaultBandwidth = + DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT; + long newBandwidth = defaultBandwidth * 2; + routerProtocol.setBalancerBandwidth(newBandwidth); + ArrayList datanodes = cluster.getCluster().getDataNodes(); + GenericTestUtils.waitFor(() -> { + return datanodes.get(0).getBalancerBandwidth() == newBandwidth; + }, 100, 60 * 1000); + } + + @Test + public void testAddClientIpPortToCallerContext() throws IOException { + GenericTestUtils.LogCapturer auditLog = + GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog); + + // 1. ClientIp and ClientPort are not set on the client. + // Set client context. + CallerContext.setCurrent( + new CallerContext.Builder("clientContext").build()); + + // Create a directory via the router. + String dirPath = "/test"; + routerProtocol.mkdirs(dirPath, new FsPermission("755"), false); + + // The audit log should contains "clientIp:" and "clientPort:". + assertTrue(auditLog.getOutput().contains("clientIp:")); + assertTrue(auditLog.getOutput().contains("clientPort:")); + assertTrue(verifyFileExists(routerFS, dirPath)); + auditLog.clearOutput(); + + // 2. ClientIp and ClientPort are set on the client. + // Reset client context. + CallerContext.setCurrent( + new CallerContext.Builder( + "clientContext,clientIp:1.1.1.1,clientPort:1234").build()); + + // Create a directory via the router. + routerProtocol.getFileInfo(dirPath); + + // The audit log should contains the original clientIp and clientPort + // set by client. + assertTrue(auditLog.getOutput().contains("clientIp:1.1.1.1")); + assertTrue(auditLog.getOutput().contains("clientPort:1234")); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e8903224e10..964879bb99e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -448,8 +448,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, private void appendClientPortToCallerContextIfAbsent() { final CallerContext ctx = CallerContext.getCurrent(); - if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(), - ctx)) { + if (isClientPortInfoAbsent(ctx)) { String origContext = ctx == null ? null : ctx.getContext(); byte[] origSignature = ctx == null ? null : ctx.getSignature(); CallerContext.setCurrent( @@ -460,9 +459,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } - private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){ + private boolean isClientPortInfoAbsent(CallerContext ctx){ return ctx == null || ctx.getContext() == null - || !ctx.getContext().contains(clientPortInfo); + || !ctx.getContext().contains(CLIENT_PORT_STR); } /**