diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c466d4a7314..8ae3731d6a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -72,6 +72,7 @@ import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; @@ -414,7 +415,8 @@ public class RouterRpcClient { " with params " + Arrays.deepToString(params) + " from " + router.getRouterId()); } - appendClientIpToCallerContext(); + + appendClientIpToCallerContextIfAbsent(); Object ret = null; if (rpcMonitor != null) { @@ -531,17 +533,26 @@ public class RouterRpcClient { } /** - * For Tracking which is the actual client address. - * It adds key/value (clientIp/"ip") pair to the caller context. + * For tracking which is the actual client address. + * It adds trace info "clientIp:ip" to caller context if it's absent. */ - private void appendClientIpToCallerContext() { + private void appendClientIpToCallerContextIfAbsent() { + String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress(); final CallerContext ctx = CallerContext.getCurrent(); - String origContext = ctx == null ? null : ctx.getContext(); - byte[] origSignature = ctx == null ? null : ctx.getSignature(); - CallerContext.setCurrent( - new CallerContext.Builder(origContext, contextFieldSeparator) - .append(CLIENT_IP_STR, Server.getRemoteAddress()) - .setSignature(origSignature).build()); + if (isClientIpInfoAbsent(clientIpInfo, ctx)) { + String origContext = ctx == null ? null : ctx.getContext(); + byte[] origSignature = ctx == null ? null : ctx.getSignature(); + CallerContext.setCurrent( + new CallerContext.Builder(origContext, contextFieldSeparator) + .append(clientIpInfo) + .setSignature(origSignature) + .build()); + } + } + + private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){ + return ctx == null || ctx.getContext() == null + || !ctx.getContext().contains(clientIpInfo); } /** @@ -1303,6 +1314,9 @@ public class RouterRpcClient { List orderedLocations = new ArrayList<>(); List> callables = new ArrayList<>(); + // transfer originCall & callerContext to worker threads of executor. + final Call originCall = Server.getCurCall().get(); + final CallerContext originContext = CallerContext.getCurrent(); for (final T location : locations) { String nsId = location.getNameserviceId(); final List namenodes = @@ -1320,12 +1334,20 @@ public class RouterRpcClient { nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); } orderedLocations.add(nnLocation); - callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList)); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod(ugi, nnList, proto, m, paramList); + }); } } else { // Call the objectGetter in order of nameservices in the NS list orderedLocations.add(location); - callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList)); + callables.add( + () -> { + transferThreadLocalContext(originCall, originContext); + return invokeMethod(ugi, namenodes, proto, m, paramList); + }); } } @@ -1392,6 +1414,20 @@ public class RouterRpcClient { } } + /** + * Transfer origin thread local context which is necessary to current + * worker thread when invoking method concurrently by executor service. + * + * @param originCall origin Call required for getting remote client ip. + * @param originContext origin CallerContext which should be transferred + * to server side. + */ + private void transferThreadLocalContext( + final Call originCall, final CallerContext originContext) { + Server.getCurCall().set(originCall); + CallerContext.setCurrent(originContext); + } + /** * Get a prioritized list of NNs that share the same nameservice ID (in the * same namespace). NNs that are reported as ACTIVE will be first in the list. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java index 30a47a45620..4f112ba9b72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.test.Whitebox.setInternalState; import java.io.IOException; import java.lang.reflect.Method; +import java.net.InetAddress; import java.net.URISyntaxException; import java.util.Arrays; import java.util.EnumSet; @@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.test.GenericTestUtils; @@ -434,4 +436,45 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc { setInternalState(ns0, "haContext", nn0haCtx); setInternalState(router0ClientProtocol, "allowPartialList", true); } + + @Test + public void testCallerContextWithMultiDestinations() throws IOException { + GenericTestUtils.LogCapturer auditLog = + GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog); + + // set client context + CallerContext.setCurrent( + new CallerContext.Builder("clientContext").build()); + // assert the initial caller context as expected + assertEquals("clientContext", CallerContext.getCurrent().getContext()); + + DistributedFileSystem routerFs = + (DistributedFileSystem) getRouterFileSystem(); + // create a directory via the router + Path dirPath = new Path("/test_caller_context_with_multi_destinations"); + routerFs.mkdirs(dirPath); + // invoke concurrently in RouterRpcClient + routerFs.listStatus(dirPath); + // invoke sequentially in RouterRpcClient + routerFs.getFileStatus(dirPath); + + String auditFlag = "src=" + dirPath.toString(); + String clientIpInfo = "clientIp:" + + InetAddress.getLocalHost().getHostAddress(); + for (String line : auditLog.getOutput().split("\n")) { + if (line.contains(auditFlag)) { + // assert origin caller context exist in audit log + assertTrue(line.contains("callerContext=clientContext")); + String callerContext = line.substring( + line.indexOf("callerContext=clientContext")); + // assert client ip info exist in caller context + assertTrue(callerContext.contains(clientIpInfo)); + // assert client ip info appears only once in caller context + assertEquals(callerContext.indexOf(clientIpInfo), + callerContext.lastIndexOf(clientIpInfo)); + } + } + // clear client context + CallerContext.setCurrent(null); + } } \ No newline at end of file