HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet. Contributed by Sanjay Radia
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1304542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b795c65f49
commit
081eda94fe
|
@ -211,6 +211,9 @@ Release 0.23.3 - UNRELEASED
|
|||
|
||||
HADOOP-8200. Remove HADOOP_[JOBTRACKER|TASKTRACKER]_OPTS. (eli)
|
||||
|
||||
HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet.
|
||||
(Sanjay Radia via szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -39,15 +39,12 @@ import org.apache.hadoop.io.Writable;
|
|||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
|
||||
|
||||
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
|
||||
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
|
||||
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.ProtoUtil;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
@ -191,21 +188,11 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
|
||||
HadoopRpcResponseProto response = val.message;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
long callTime = System.currentTimeMillis() - startTime;
|
||||
LOG.debug("Call: " + method.getName() + " " + callTime);
|
||||
}
|
||||
|
||||
// Wrap the received message
|
||||
ResponseStatus status = response.getStatus();
|
||||
if (status != ResponseStatus.SUCCESS) {
|
||||
RemoteException re = new RemoteException(response.getException()
|
||||
.getExceptionName(), response.getException().getStackTrace());
|
||||
re.fillInStackTrace();
|
||||
throw new ServiceException(re);
|
||||
}
|
||||
|
||||
Message prototype = null;
|
||||
try {
|
||||
prototype = getReturnProtoType(method);
|
||||
|
@ -215,7 +202,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
Message returnMessage;
|
||||
try {
|
||||
returnMessage = prototype.newBuilderForType()
|
||||
.mergeFrom(response.getResponse()).build();
|
||||
.mergeFrom(val.responseMessage).build();
|
||||
} catch (Throwable e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
@ -287,28 +274,28 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
* Writable Wrapper for Protocol Buffer Responses
|
||||
*/
|
||||
private static class RpcResponseWritable implements Writable {
|
||||
HadoopRpcResponseProto message;
|
||||
byte[] responseMessage;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public RpcResponseWritable() {
|
||||
}
|
||||
|
||||
public RpcResponseWritable(HadoopRpcResponseProto message) {
|
||||
this.message = message;
|
||||
public RpcResponseWritable(Message message) {
|
||||
this.responseMessage = message.toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
((Message)message).writeDelimitedTo(
|
||||
DataOutputOutputStream.constructOutputStream(out));
|
||||
out.writeInt(responseMessage.length);
|
||||
out.write(responseMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
int length = ProtoUtil.readRawVarint32(in);
|
||||
int length = in.readInt();
|
||||
byte[] bytes = new byte[length];
|
||||
in.readFully(bytes);
|
||||
message = HadoopRpcResponseProto.parseFrom(bytes);
|
||||
responseMessage = bytes;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,24 +344,6 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
protocolImpl);
|
||||
}
|
||||
|
||||
private static RpcResponseWritable handleException(Throwable e) {
|
||||
HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
|
||||
.setExceptionName(e.getClass().getName())
|
||||
.setStackTrace(StringUtils.stringifyException(e)).build();
|
||||
HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
|
||||
.setStatus(ResponseStatus.ERRROR).setException(exception).build();
|
||||
return new RpcResponseWritable(response);
|
||||
}
|
||||
|
||||
private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
|
||||
Message message) {
|
||||
HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
|
||||
.setResponse(message.toByteString())
|
||||
.setStatus(ResponseStatus.SUCCESS)
|
||||
.build();
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protobuf invoker for {@link RpcInvoker}
|
||||
*/
|
||||
|
@ -418,7 +387,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
* </ol>
|
||||
*/
|
||||
public Writable call(RPC.Server server, String protocol,
|
||||
Writable writableRequest, long receiveTime) throws IOException {
|
||||
Writable writableRequest, long receiveTime) throws Exception {
|
||||
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
|
||||
HadoopRpcRequestProto rpcRequest = request.message;
|
||||
String methodName = rpcRequest.getMethodName();
|
||||
|
@ -436,7 +405,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
String msg = "Unknown method " + methodName + " called on " + protocol
|
||||
+ " protocol.";
|
||||
LOG.warn(msg);
|
||||
return handleException(new RpcServerException(msg));
|
||||
throw new RpcServerException(msg);
|
||||
}
|
||||
Message prototype = service.getRequestPrototype(methodDescriptor);
|
||||
Message param = prototype.newBuilderForType()
|
||||
|
@ -457,14 +426,11 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
server.rpcDetailedMetrics.addProcessingTime(methodName,
|
||||
processingTime);
|
||||
} catch (ServiceException e) {
|
||||
Throwable cause = e.getCause();
|
||||
return handleException(cause != null ? cause : e);
|
||||
throw (Exception) e.getCause();
|
||||
} catch (Exception e) {
|
||||
return handleException(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
|
||||
return new RpcResponseWritable(response);
|
||||
return new RpcResponseWritable(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ public class RPC {
|
|||
* @throws IOException
|
||||
**/
|
||||
public Writable call(Server server, String protocol,
|
||||
Writable rpcRequest, long receiveTime) throws IOException ;
|
||||
Writable rpcRequest, long receiveTime) throws Exception ;
|
||||
}
|
||||
|
||||
static final Log LOG = LogFactory.getLog(RPC.class);
|
||||
|
@ -880,7 +880,7 @@ public class RPC {
|
|||
|
||||
@Override
|
||||
public Writable call(RpcKind rpcKind, String protocol,
|
||||
Writable rpcRequest, long receiveTime) throws IOException {
|
||||
Writable rpcRequest, long receiveTime) throws Exception {
|
||||
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
|
||||
receiveTime);
|
||||
}
|
||||
|
|
|
@ -1952,13 +1952,13 @@ public abstract class Server {
|
|||
* Writable, long)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public Writable call(Writable param, long receiveTime) throws IOException {
|
||||
public Writable call(Writable param, long receiveTime) throws Exception {
|
||||
return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
|
||||
}
|
||||
|
||||
/** Called for each call. */
|
||||
public abstract Writable call(RpcKind rpcKind, String protocol,
|
||||
Writable param, long receiveTime) throws IOException;
|
||||
Writable param, long receiveTime) throws Exception;
|
||||
|
||||
/**
|
||||
* Authorize the incoming client connection.
|
||||
|
|
|
@ -25,8 +25,11 @@ option java_outer_classname = "HadoopRpcProtos";
|
|||
option java_generate_equals_and_hash = true;
|
||||
|
||||
/**
|
||||
* Message used to marshal the client request
|
||||
* This message is used for Protobuf Rpc Engine.
|
||||
* The message is used to marshal a Rpc-request
|
||||
* from RPC client to the RPC server.
|
||||
* The Response to the Rpc call (including errors) are handled
|
||||
* as part of the standard Rpc response.
|
||||
*/
|
||||
message HadoopRpcRequestProto {
|
||||
/** Name of the RPC method */
|
||||
|
@ -41,39 +44,3 @@ message HadoopRpcRequestProto {
|
|||
/** protocol version of class declaring the called method */
|
||||
required uint64 clientProtocolVersion = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* At the RPC layer, this message is used to indicate
|
||||
* the server side exception the the RPC client.
|
||||
*
|
||||
* Hadoop RPC client throws an exception indicated
|
||||
* by exceptionName with the stackTrace.
|
||||
*/
|
||||
message HadoopRpcExceptionProto {
|
||||
/** Class name of the exception thrown from the server */
|
||||
|
||||
optional string exceptionName = 1;
|
||||
/** Exception stack trace from the server side */
|
||||
optional string stackTrace = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* This message is used to marshal the response from
|
||||
* RPC server to the client.
|
||||
*/
|
||||
message HadoopRpcResponseProto {
|
||||
/** Status of IPC call */
|
||||
enum ResponseStatus {
|
||||
SUCCESS = 1;
|
||||
ERRROR = 2;
|
||||
}
|
||||
|
||||
required ResponseStatus status = 1;
|
||||
|
||||
// Protobuf response payload from the server, when status is SUCCESS.
|
||||
optional bytes response = 2;
|
||||
|
||||
// Exception when status is ERROR or FATAL
|
||||
optional HadoopRpcExceptionProto exception = 3;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue