From 0326b7e935c839c5e0aecdf496a2b08719250c34 Mon Sep 17 00:00:00 2001 From: Simba Dzinamarira Date: Wed, 17 Aug 2022 09:33:33 -0400 Subject: [PATCH] HADOOP-18406: Adds alignment context to call path for creating RPC proxy with multiple connections per user. Fixes #4748 Signed-off-by: Owen O'Malley --- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 15 ++++++------ .../apache/hadoop/ipc/ProtobufRpcEngine2.java | 15 ++++++------ .../main/java/org/apache/hadoop/ipc/RPC.java | 23 ++++++++++++++++++- .../java/org/apache/hadoop/ipc/RpcEngine.java | 4 +++- .../apache/hadoop/ipc/WritableRpcEngine.java | 6 +++-- .../java/org/apache/hadoop/ipc/TestRPC.java | 3 ++- .../org/apache/hadoop/ipc/TestRpcBase.java | 3 ++- 7 files changed, 49 insertions(+), 20 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index dce6631bb1d..01fceeb954e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -80,9 +80,9 @@ public class ProtobufRpcEngine implements RpcEngine { @Override @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, - ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - final Invoker invoker = new Invoker(protocol, connId, conf, factory); + ConnectionId connId, Configuration conf, SocketFactory factory, + AlignmentContext alignmentContext) throws IOException { + final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext); return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] {protocol}, invoker), false); } @@ -126,7 +126,7 @@ public class ProtobufRpcEngine implements RpcEngine { return new ProtocolProxy(protocol, (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, connId, conf, - factory)), false); + factory, null)), false); } protected static class Invoker implements RpcInvocationHandler { @@ -147,9 +147,8 @@ public class ProtobufRpcEngine implements RpcEngine { throws IOException { this(protocol, Client.ConnectionId.getConnectionId( addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), - conf, factory); + conf, factory, alignmentContext); this.fallbackToSimpleAuth = fallbackToSimpleAuth; - this.alignmentContext = alignmentContext; } /** @@ -158,14 +157,16 @@ public class ProtobufRpcEngine implements RpcEngine { * @param connId input connId. * @param conf input Configuration. * @param factory input factory. + * @param alignmentContext Alignment context */ protected Invoker(Class protocol, Client.ConnectionId connId, - Configuration conf, SocketFactory factory) { + Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) { this.remoteId = connId; this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class); this.protocolName = RPC.getProtocolName(protocol); this.clientProtocolVersion = RPC .getProtocolVersion(protocol); + this.alignmentContext = alignmentContext; } private RequestHeaderProto constructRpcRequestHeader(Method method) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java index ea2dbba467e..3594320ce06 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java @@ -103,9 +103,9 @@ public class ProtobufRpcEngine2 implements RpcEngine { @Override @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, - ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - final Invoker invoker = new Invoker(protocol, connId, conf, factory); + ConnectionId connId, Configuration conf, SocketFactory factory, + AlignmentContext alignmentContext) throws IOException { + final Invoker invoker = new Invoker(protocol, connId, conf, factory, alignmentContext); return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] {protocol}, invoker), false); } @@ -133,7 +133,7 @@ public class ProtobufRpcEngine2 implements RpcEngine { return new ProtocolProxy(protocol, (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, new Invoker(protocol, connId, conf, - factory)), false); + factory, null)), false); } protected static class Invoker implements RpcInvocationHandler { @@ -154,9 +154,8 @@ public class ProtobufRpcEngine2 implements RpcEngine { throws IOException { this(protocol, Client.ConnectionId.getConnectionId( addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), - conf, factory); + conf, factory, alignmentContext); this.fallbackToSimpleAuth = fallbackToSimpleAuth; - this.alignmentContext = alignmentContext; } /** @@ -166,14 +165,16 @@ public class ProtobufRpcEngine2 implements RpcEngine { * @param connId input connId. * @param conf input Configuration. * @param factory input factory. + * @param alignmentContext Alignment context */ protected Invoker(Class protocol, Client.ConnectionId connId, - Configuration conf, SocketFactory factory) { + Configuration conf, SocketFactory factory, AlignmentContext alignmentContext) { this.remoteId = connId; this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class); this.protocolName = RPC.getProtocolName(protocol); this.clientProtocolVersion = RPC .getProtocolVersion(protocol); + this.alignmentContext = alignmentContext; } private RequestHeaderProto constructRpcRequestHeader(Method method) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 7f35b13aec9..fc562b525ad 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -558,11 +558,32 @@ public class RPC { public static ProtocolProxy getProtocolProxy(Class protocol, long clientVersion, ConnectionId connId, Configuration conf, SocketFactory factory) throws IOException { + return getProtocolProxy(protocol, clientVersion, connId, conf, + factory, null); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server. + * + * @param Generics Type T + * @param protocol protocol class + * @param clientVersion client's version + * @param connId client connection identifier + * @param conf configuration + * @param factory socket factory + * @param alignmentContext StateID alignment context + * @return the protocol proxy + * @throws IOException if the far end through a RemoteException + */ + public static ProtocolProxy getProtocolProxy(Class protocol, + long clientVersion, ConnectionId connId, Configuration conf, + SocketFactory factory, AlignmentContext alignmentContext) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } return getProtocolEngine(protocol, conf).getProxy( - protocol, clientVersion, connId, conf, factory); + protocol, clientVersion, connId, conf, factory, alignmentContext); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 1f0ff2d99d3..f322f6eb98a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -66,11 +66,13 @@ public interface RpcEngine { * @param connId input ConnectionId. * @param conf input Configuration. * @param factory input factory. + * @param alignmentContext Alignment context * @throws IOException raised on errors performing I/O. * @return ProtocolProxy. */ ProtocolProxy getProxy(Class protocol, long clientVersion, - Client.ConnectionId connId, Configuration conf, SocketFactory factory) + Client.ConnectionId connId, Configuration conf, SocketFactory factory, + AlignmentContext alignmentContext) throws IOException; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 3e4ee707d46..d92bcea5d2e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -315,16 +315,18 @@ public class WritableRpcEngine implements RpcEngine { * @param connId input ConnectionId. * @param conf input Configuration. * @param factory input factory. + * @param alignmentContext Alignment context * @throws IOException raised on errors performing I/O. * @return ProtocolProxy. */ @Override public ProtocolProxy getProxy(Class protocol, long clientVersion, - Client.ConnectionId connId, Configuration conf, SocketFactory factory) + Client.ConnectionId connId, Configuration conf, SocketFactory factory, + AlignmentContext alignmentContext) throws IOException { return getProxy(protocol, clientVersion, connId.getAddress(), connId.getTicket(), conf, factory, connId.getRpcTimeout(), - connId.getRetryPolicy(), null, null); + connId.getRetryPolicy(), null, alignmentContext); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index a184ea173e5..85b0a7b6c84 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -294,7 +294,8 @@ public class TestRPC extends TestRpcBase { @Override public ProtocolProxy getProxy(Class protocol, long clientVersion, - ConnectionId connId, Configuration conf, SocketFactory factory) + ConnectionId connId, Configuration conf, SocketFactory factory, + AlignmentContext alignmentContext) throws IOException { throw new UnsupportedOperationException("This proxy is not supported"); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index 7635b16dac0..5b5c8bbaa9b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -189,7 +189,8 @@ public class TestRpcBase { 0, connId, clientConf, - NetUtils.getDefaultSocketFactory(clientConf)).getProxy(); + NetUtils.getDefaultSocketFactory(clientConf), + null).getProxy(); } catch (IOException e) { throw new ServiceException(e); }