HDFS-16310. RBF: Add client port to CallerContext for Router (#3635)

Cherry-picked from 5b05068f by Owen O'Malley
This commit is contained in:
litao 2021-11-18 12:46:35 +08:00 committed by Owen O'Malley
parent 0029f22d7d
commit 496657c63f
3 changed files with 66 additions and 23 deletions

View File

@ -128,6 +128,7 @@ public class RouterRpcClient {
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
private static final String CLIENT_IP_STR = "clientIp"; private static final String CLIENT_IP_STR = "clientIp";
private static final String CLIENT_PORT_STR = "clientPort";
/** /**
* Create a router RPC client to manage remote procedure calls to NNs. * Create a router RPC client to manage remote procedure calls to NNs.
@ -416,7 +417,7 @@ public class RouterRpcClient {
+ router.getRouterId()); + router.getRouterId());
} }
appendClientIpToCallerContextIfAbsent(); appendClientIpPortToCallerContextIfAbsent();
Object ret = null; Object ret = null;
if (rpcMonitor != null) { if (rpcMonitor != null) {
@ -534,25 +535,20 @@ public class RouterRpcClient {
/** /**
* For tracking which is the actual client address. * For tracking which is the actual client address.
* It adds trace info "clientIp:ip" to caller context if it's absent. * It adds trace info "clientIp:ip" and "clientPort:port"
* to caller context if they are absent.
*/ */
private void appendClientIpToCallerContextIfAbsent() { private void appendClientIpPortToCallerContextIfAbsent() {
String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress(); CallerContext ctx = CallerContext.getCurrent();
final CallerContext ctx = CallerContext.getCurrent(); String origContext = ctx == null ? null : ctx.getContext();
if (isClientIpInfoAbsent(clientIpInfo, ctx)) { byte[] origSignature = ctx == null ? null : ctx.getSignature();
String origContext = ctx == null ? null : ctx.getContext(); CallerContext.setCurrent(
byte[] origSignature = ctx == null ? null : ctx.getSignature(); new CallerContext.Builder(origContext, contextFieldSeparator)
CallerContext.setCurrent( .appendIfAbsent(CLIENT_IP_STR, Server.getRemoteAddress())
new CallerContext.Builder(origContext, contextFieldSeparator) .appendIfAbsent(CLIENT_PORT_STR,
.append(clientIpInfo) Integer.toString(Server.getRemotePort()))
.setSignature(origSignature) .setSignature(origSignature)
.build()); .build());
}
}
private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientIpInfo);
} }
/** /**

View File

@ -97,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
@ -1867,4 +1868,51 @@ public class TestRouterRpc {
.contains("callerContext=clientContext,clientIp:")); .contains("callerContext=clientContext,clientIp:"));
assertTrue(verifyFileExists(routerFS, dirPath)); assertTrue(verifyFileExists(routerFS, dirPath));
} }
@Test
public void testSetBalancerBandwidth() throws Exception {
long defaultBandwidth =
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT;
long newBandwidth = defaultBandwidth * 2;
routerProtocol.setBalancerBandwidth(newBandwidth);
ArrayList<DataNode> datanodes = cluster.getCluster().getDataNodes();
GenericTestUtils.waitFor(() -> {
return datanodes.get(0).getBalancerBandwidth() == newBandwidth;
}, 100, 60 * 1000);
}
@Test
public void testAddClientIpPortToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
// 1. ClientIp and ClientPort are not set on the client.
// Set client context.
CallerContext.setCurrent(
new CallerContext.Builder("clientContext").build());
// Create a directory via the router.
String dirPath = "/test";
routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
// The audit log should contains "clientIp:" and "clientPort:".
assertTrue(auditLog.getOutput().contains("clientIp:"));
assertTrue(auditLog.getOutput().contains("clientPort:"));
assertTrue(verifyFileExists(routerFS, dirPath));
auditLog.clearOutput();
// 2. ClientIp and ClientPort are set on the client.
// Reset client context.
CallerContext.setCurrent(
new CallerContext.Builder(
"clientContext,clientIp:1.1.1.1,clientPort:1234").build());
// Create a directory via the router.
routerProtocol.getFileInfo(dirPath);
// The audit log should contains the original clientIp and clientPort
// set by client.
assertTrue(auditLog.getOutput().contains("clientIp:1.1.1.1"));
assertTrue(auditLog.getOutput().contains("clientPort:1234"));
}
} }

View File

@ -448,8 +448,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private void appendClientPortToCallerContextIfAbsent() { private void appendClientPortToCallerContextIfAbsent() {
final CallerContext ctx = CallerContext.getCurrent(); final CallerContext ctx = CallerContext.getCurrent();
if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(), if (isClientPortInfoAbsent(ctx)) {
ctx)) {
String origContext = ctx == null ? null : ctx.getContext(); String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature(); byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent( CallerContext.setCurrent(
@ -460,9 +459,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
} }
private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){ private boolean isClientPortInfoAbsent(CallerContext ctx){
return ctx == null || ctx.getContext() == null return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(clientPortInfo); || !ctx.getContext().contains(CLIENT_PORT_STR);
} }
/** /**