HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to avoid an extra copy (Sanjay Radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1452581 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2013-03-04 22:55:22 +00:00
parent 5889f54ad1
commit ead90cc1a8
3 changed files with 48 additions and 39 deletions

View File

@ -153,6 +153,9 @@ Trunk (Unreleased)
HADOOP-9112. test-patch should -1 for @Tests without a timeout HADOOP-9112. test-patch should -1 for @Tests without a timeout
(Surenkumar Nihalani via bobby) (Surenkumar Nihalani via bobby)
HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
avoid an extra copy (Sanjay Radia)
BUG FIXES BUG FIXES
HADOOP-8419. Fixed GzipCode NPE reset for IBM JDK. (Yu Li via eyang) HADOOP-8419. Fixed GzipCode NPE reset for IBM JDK. (Yu Li via eyang)

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -39,7 +40,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto; import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -128,25 +129,12 @@ public class ProtobufRpcEngine implements RpcEngine {
.getProtocolVersion(protocol); .getProtocolVersion(protocol);
} }
private RequestProto constructRpcRequest(Method method, private RequestHeaderProto constructRpcRequestHeader(Method method) {
Object[] params) throws ServiceException { RequestHeaderProto.Builder builder = RequestHeaderProto
RequestProto rpcRequest;
RequestProto.Builder builder = RequestProto
.newBuilder(); .newBuilder();
builder.setMethodName(method.getName()); builder.setMethodName(method.getName());
if (params.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
if (params[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
Message param = (Message) params[1];
builder.setRequest(param.toByteString());
// For protobuf, {@code protocol} used when creating client side proxy is // For protobuf, {@code protocol} used when creating client side proxy is
// the interface extending BlockingInterface, which has the annotations // the interface extending BlockingInterface, which has the annotations
// such as ProtocolName etc. // such as ProtocolName etc.
@ -160,8 +148,7 @@ public class ProtobufRpcEngine implements RpcEngine {
// For PB this may limit the use of mixins on client side. // For PB this may limit the use of mixins on client side.
builder.setDeclaringClassProtocolName(protocolName); builder.setDeclaringClassProtocolName(protocolName);
builder.setClientProtocolVersion(clientProtocolVersion); builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build(); return builder.build();
return rpcRequest;
} }
/** /**
@ -190,7 +177,17 @@ public class ProtobufRpcEngine implements RpcEngine {
startTime = Time.now(); startTime = Time.now();
} }
RequestProto rpcRequest = constructRpcRequest(method, args); if (args.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
if (args[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
RpcResponseWrapper val = null; RpcResponseWrapper val = null;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -198,9 +195,12 @@ public class ProtobufRpcEngine implements RpcEngine {
remoteId + ": " + method.getName() + remoteId + ": " + method.getName() +
" {" + TextFormat.shortDebugString((Message) args[1]) + "}"); " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
} }
Message theRequest = (Message) args[1];
try { try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequest), remoteId); new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
} catch (Throwable e) { } catch (Throwable e) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -275,20 +275,25 @@ public class ProtobufRpcEngine implements RpcEngine {
* use type Writable as a wrapper to work across multiple RpcEngine kinds. * use type Writable as a wrapper to work across multiple RpcEngine kinds.
*/ */
private static class RpcRequestWrapper implements Writable { private static class RpcRequestWrapper implements Writable {
RequestProto message; RequestHeaderProto requestHeader;
Message theRequest; // for clientSide, the request is here
byte[] theRequestRead; // for server side, the request is here
@SuppressWarnings("unused") @SuppressWarnings("unused")
public RpcRequestWrapper() { public RpcRequestWrapper() {
} }
RpcRequestWrapper(RequestProto message) { RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) {
this.message = message; this.requestHeader = requestHeader;
this.theRequest = theRequest;
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo( OutputStream os = DataOutputOutputStream.constructOutputStream(out);
DataOutputOutputStream.constructOutputStream(out));
((Message)requestHeader).writeDelimitedTo(os);
theRequest.writeDelimitedTo(os);
} }
@Override @Override
@ -296,13 +301,16 @@ public class ProtobufRpcEngine implements RpcEngine {
int length = ProtoUtil.readRawVarint32(in); int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length]; byte[] bytes = new byte[length];
in.readFully(bytes); in.readFully(bytes);
message = RequestProto.parseFrom(bytes); requestHeader = RequestHeaderProto.parseFrom(bytes);
length = ProtoUtil.readRawVarint32(in);
theRequestRead = new byte[length];
in.readFully(theRequestRead);
} }
@Override @Override
public String toString() { public String toString() {
return message.getDeclaringClassProtocolName() + "." + return requestHeader.getDeclaringClassProtocolName() + "." +
message.getMethodName(); requestHeader.getMethodName();
} }
} }
@ -434,7 +442,7 @@ public class ProtobufRpcEngine implements RpcEngine {
public Writable call(RPC.Server server, String connectionProtocolName, public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception { Writable writableRequest, long receiveTime) throws Exception {
RpcRequestWrapper request = (RpcRequestWrapper) writableRequest; RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
RequestProto rpcRequest = request.message; RequestHeaderProto rpcRequest = request.requestHeader;
String methodName = rpcRequest.getMethodName(); String methodName = rpcRequest.getMethodName();
@ -474,7 +482,8 @@ public class ProtobufRpcEngine implements RpcEngine {
} }
Message prototype = service.getRequestPrototype(methodDescriptor); Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType() Message param = prototype.newBuilderForType()
.mergeFrom(rpcRequest.getRequest()).build(); .mergeFrom(request.theRequestRead).build();
Message result; Message result;
try { try {
long startTime = Time.now(); long startTime = Time.now();

View File

@ -1,4 +1,4 @@
/** /**DER
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -28,20 +28,17 @@ option java_generate_equals_and_hash = true;
package hadoop.common; package hadoop.common;
/** /**
* This message is used for Protobuf Rpc Engine. * This message is the header for the Protobuf Rpc Engine
* The message is used to marshal a Rpc-request * when sending a RPC request from RPC client to the RPC server.
* from RPC client to the RPC server. * The actual request (serialized as protobuf) follows this request.
* *
* No special header is needed for the Rpc Response for Protobuf Rpc Engine. * No special header is needed for the Rpc Response for Protobuf Rpc Engine.
* The normal RPC response header (see RpcHeader.proto) are sufficient. * The normal RPC response header (see RpcHeader.proto) are sufficient.
*/ */
message RequestProto { message RequestHeaderProto {
/** Name of the RPC method */ /** Name of the RPC method */
required string methodName = 1; required string methodName = 1;
/** Bytes corresponding to the client protobuf request */
optional bytes request = 2;
/** /**
* RPCs for a particular interface (ie protocol) are done using a * RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy. * IPC connection that is setup using rpcProxy.