Merged in HADOOP-9163

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483143 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2013-05-16 01:25:30 +00:00
parent f41b189498
commit 3c929c9d03
3 changed files with 48 additions and 39 deletions

View File

@ -4,6 +4,9 @@ Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
avoid an extra copy (Sanjay Radia)
NEW FEATURES
HADOOP-9194. RPC support for QoS. (Junping Du via llu)

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
@ -39,7 +40,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -128,25 +129,12 @@ public class ProtobufRpcEngine implements RpcEngine {
.getProtocolVersion(protocol);
}
private RequestProto constructRpcRequest(Method method,
Object[] params) throws ServiceException {
RequestProto rpcRequest;
RequestProto.Builder builder = RequestProto
private RequestHeaderProto constructRpcRequestHeader(Method method) {
RequestHeaderProto.Builder builder = RequestHeaderProto
.newBuilder();
builder.setMethodName(method.getName());
if (params.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
if (params[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
Message param = (Message) params[1];
builder.setRequest(param.toByteString());
// For protobuf, {@code protocol} used when creating client side proxy is
// the interface extending BlockingInterface, which has the annotations
// such as ProtocolName etc.
@ -160,8 +148,7 @@ public class ProtobufRpcEngine implements RpcEngine {
// For PB this may limit the use of mixins on client side.
builder.setDeclaringClassProtocolName(protocolName);
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
return builder.build();
}
/**
@ -189,8 +176,18 @@ public class ProtobufRpcEngine implements RpcEngine {
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
if (args.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
if (args[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
RequestProto rpcRequest = constructRpcRequest(method, args);
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
RpcResponseWrapper val = null;
if (LOG.isTraceEnabled()) {
@ -198,9 +195,12 @@ public class ProtobufRpcEngine implements RpcEngine {
remoteId + ": " + method.getName() +
" {" + TextFormat.shortDebugString((Message) args[1]) + "}");
}
Message theRequest = (Message) args[1];
try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequest), remoteId);
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) {
@ -275,20 +275,25 @@ public class ProtobufRpcEngine implements RpcEngine {
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
*/
private static class RpcRequestWrapper implements Writable {
RequestProto message;
RequestHeaderProto requestHeader;
Message theRequest; // for clientSide, the request is here
byte[] theRequestRead; // for server side, the request is here
@SuppressWarnings("unused")
public RpcRequestWrapper() {
}
RpcRequestWrapper(RequestProto message) {
this.message = message;
RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) {
this.requestHeader = requestHeader;
this.theRequest = theRequest;
}
@Override
public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
OutputStream os = DataOutputOutputStream.constructOutputStream(out);
((Message)requestHeader).writeDelimitedTo(os);
theRequest.writeDelimitedTo(os);
}
@Override
@ -296,13 +301,16 @@ public class ProtobufRpcEngine implements RpcEngine {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = RequestProto.parseFrom(bytes);
requestHeader = RequestHeaderProto.parseFrom(bytes);
length = ProtoUtil.readRawVarint32(in);
theRequestRead = new byte[length];
in.readFully(theRequestRead);
}
@Override
public String toString() {
return message.getDeclaringClassProtocolName() + "." +
message.getMethodName();
return requestHeader.getDeclaringClassProtocolName() + "." +
requestHeader.getMethodName();
}
}
@ -434,7 +442,7 @@ public class ProtobufRpcEngine implements RpcEngine {
public Writable call(RPC.Server server, String protocol,
Writable writableRequest, long receiveTime) throws Exception {
RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
RequestProto rpcRequest = request.message;
RequestHeaderProto rpcRequest = request.requestHeader;
String methodName = rpcRequest.getMethodName();
String protoName = rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
@ -454,7 +462,8 @@ public class ProtobufRpcEngine implements RpcEngine {
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType()
.mergeFrom(rpcRequest.getRequest()).build();
.mergeFrom(request.theRequestRead).build();
Message result;
try {
long startTime = Time.now();

View File

@ -1,4 +1,4 @@
/**
/**DER
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -28,20 +28,17 @@ option java_generate_equals_and_hash = true;
package hadoop.common;
/**
* This message is used for Protobuf Rpc Engine.
* The message is used to marshal a Rpc-request
* from RPC client to the RPC server.
* This message is the header for the Protobuf Rpc Engine
* when sending a RPC request from RPC client to the RPC server.
* The actual request (serialized as protobuf) follows this request.
*
* No special header is needed for the Rpc Response for Protobuf Rpc Engine.
* The normal RPC response header (see RpcHeader.proto) are sufficient.
*/
message RequestProto {
message RequestHeaderProto {
/** Name of the RPC method */
required string methodName = 1;
/** Bytes corresponding to the client protobuf request */
optional bytes request = 2;
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.