HDFS-15912. Allow ProtobufRpcEngine to be extensible (#2999)
This commit is contained in:
parent
77d9c6d0f7
commit
7f6185b17c
|
@ -115,7 +115,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
factory)), false);
|
||||
}
|
||||
|
||||
private static class Invoker implements RpcInvocationHandler {
|
||||
protected static class Invoker implements RpcInvocationHandler {
|
||||
private final Map<String, Message> returnTypes =
|
||||
new ConcurrentHashMap<String, Message>();
|
||||
private boolean isClosed = false;
|
||||
|
@ -126,7 +126,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
private AtomicBoolean fallbackToSimpleAuth;
|
||||
private AlignmentContext alignmentContext;
|
||||
|
||||
private Invoker(Class<?> protocol, InetSocketAddress addr,
|
||||
protected Invoker(Class<?> protocol, InetSocketAddress addr,
|
||||
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
|
||||
int rpcTimeout, RetryPolicy connectionRetryPolicy,
|
||||
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
|
||||
|
@ -141,7 +141,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
/**
|
||||
* This constructor takes a connectionId, instead of creating a new one.
|
||||
*/
|
||||
private Invoker(Class<?> protocol, Client.ConnectionId connId,
|
||||
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
|
||||
Configuration conf, SocketFactory factory) {
|
||||
this.remoteId = connId;
|
||||
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
|
||||
|
@ -217,8 +217,6 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
|
||||
}
|
||||
|
||||
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
|
||||
remoteId + ": " + method.getName() +
|
||||
|
@ -230,7 +228,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
final RpcWritable.Buffer val;
|
||||
try {
|
||||
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
||||
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
|
||||
constructRpcRequest(method, theRequest), remoteId,
|
||||
fallbackToSimpleAuth, alignmentContext);
|
||||
|
||||
} catch (Throwable e) {
|
||||
|
@ -275,6 +273,11 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
}
|
||||
}
|
||||
|
||||
protected Writable constructRpcRequest(Method method, Message theRequest) {
|
||||
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
|
||||
return new RpcProtobufRequest(rpcRequestHeader, theRequest);
|
||||
}
|
||||
|
||||
private Message getReturnMessage(final Method method,
|
||||
final RpcWritable.Buffer buf) throws ServiceException {
|
||||
Message prototype = null;
|
||||
|
@ -324,6 +327,14 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
public ConnectionId getConnectionId() {
|
||||
return remoteId;
|
||||
}
|
||||
|
||||
protected long getClientProtocolVersion() {
|
||||
return clientProtocolVersion;
|
||||
}
|
||||
|
||||
protected String getProtocolName() {
|
||||
return protocolName;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -484,6 +495,13 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
String methodName = rpcRequest.getMethodName();
|
||||
String protoName = rpcRequest.getDeclaringClassProtocolName();
|
||||
long clientVersion = rpcRequest.getClientProtocolVersion();
|
||||
return call(server, protocol, request, receiveTime,
|
||||
methodName, protoName, clientVersion);
|
||||
}
|
||||
|
||||
protected Writable call(RPC.Server server, String protocol,
|
||||
RpcWritable.Buffer request, long receiveTime, String methodName,
|
||||
String protoName, long clientVersion) throws Exception {
|
||||
if (server.verbose)
|
||||
LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
|
||||
|
||||
|
|
Loading…
Reference in New Issue