svn merge -c 1304542 from trunk for HADOOP-8184.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1304546 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-03-23 18:27:17 +00:00
parent 13e2ed8c65
commit bec76354fd
5 changed files with 26 additions and 90 deletions

View File

@ -100,6 +100,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8200. Remove HADOOP_[JOBTRACKER|TASKTRACKER]_OPTS. (eli) HADOOP-8200. Remove HADOOP_[JOBTRACKER|TASKTRACKER]_OPTS. (eli)
HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet.
(Sanjay Radia via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -39,15 +39,12 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
@ -191,21 +188,11 @@ public class ProtobufRpcEngine implements RpcEngine {
throw new ServiceException(e); throw new ServiceException(e);
} }
HadoopRpcResponseProto response = val.message;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime; long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime); LOG.debug("Call: " + method.getName() + " " + callTime);
} }
// Wrap the received message
ResponseStatus status = response.getStatus();
if (status != ResponseStatus.SUCCESS) {
RemoteException re = new RemoteException(response.getException()
.getExceptionName(), response.getException().getStackTrace());
re.fillInStackTrace();
throw new ServiceException(re);
}
Message prototype = null; Message prototype = null;
try { try {
prototype = getReturnProtoType(method); prototype = getReturnProtoType(method);
@ -215,7 +202,7 @@ public class ProtobufRpcEngine implements RpcEngine {
Message returnMessage; Message returnMessage;
try { try {
returnMessage = prototype.newBuilderForType() returnMessage = prototype.newBuilderForType()
.mergeFrom(response.getResponse()).build(); .mergeFrom(val.responseMessage).build();
} catch (Throwable e) { } catch (Throwable e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -287,28 +274,28 @@ public class ProtobufRpcEngine implements RpcEngine {
* Writable Wrapper for Protocol Buffer Responses * Writable Wrapper for Protocol Buffer Responses
*/ */
private static class RpcResponseWritable implements Writable { private static class RpcResponseWritable implements Writable {
HadoopRpcResponseProto message; byte[] responseMessage;
@SuppressWarnings("unused") @SuppressWarnings("unused")
public RpcResponseWritable() { public RpcResponseWritable() {
} }
public RpcResponseWritable(HadoopRpcResponseProto message) { public RpcResponseWritable(Message message) {
this.message = message; this.responseMessage = message.toByteArray();
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo( out.writeInt(responseMessage.length);
DataOutputOutputStream.constructOutputStream(out)); out.write(responseMessage);
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
int length = ProtoUtil.readRawVarint32(in); int length = in.readInt();
byte[] bytes = new byte[length]; byte[] bytes = new byte[length];
in.readFully(bytes); in.readFully(bytes);
message = HadoopRpcResponseProto.parseFrom(bytes); responseMessage = bytes;
} }
} }
@ -357,24 +344,6 @@ public class ProtobufRpcEngine implements RpcEngine {
protocolImpl); protocolImpl);
} }
private static RpcResponseWritable handleException(Throwable e) {
HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
.setExceptionName(e.getClass().getName())
.setStackTrace(StringUtils.stringifyException(e)).build();
HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
.setStatus(ResponseStatus.ERRROR).setException(exception).build();
return new RpcResponseWritable(response);
}
private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
Message message) {
HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
.setResponse(message.toByteString())
.setStatus(ResponseStatus.SUCCESS)
.build();
return res;
}
/** /**
* Protobuf invoker for {@link RpcInvoker} * Protobuf invoker for {@link RpcInvoker}
*/ */
@ -418,7 +387,7 @@ public class ProtobufRpcEngine implements RpcEngine {
* </ol> * </ol>
*/ */
public Writable call(RPC.Server server, String protocol, public Writable call(RPC.Server server, String protocol,
Writable writableRequest, long receiveTime) throws IOException { Writable writableRequest, long receiveTime) throws Exception {
RpcRequestWritable request = (RpcRequestWritable) writableRequest; RpcRequestWritable request = (RpcRequestWritable) writableRequest;
HadoopRpcRequestProto rpcRequest = request.message; HadoopRpcRequestProto rpcRequest = request.message;
String methodName = rpcRequest.getMethodName(); String methodName = rpcRequest.getMethodName();
@ -436,7 +405,7 @@ public class ProtobufRpcEngine implements RpcEngine {
String msg = "Unknown method " + methodName + " called on " + protocol String msg = "Unknown method " + methodName + " called on " + protocol
+ " protocol."; + " protocol.";
LOG.warn(msg); LOG.warn(msg);
return handleException(new RpcServerException(msg)); throw new RpcServerException(msg);
} }
Message prototype = service.getRequestPrototype(methodDescriptor); Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType() Message param = prototype.newBuilderForType()
@ -457,14 +426,11 @@ public class ProtobufRpcEngine implements RpcEngine {
server.rpcDetailedMetrics.addProcessingTime(methodName, server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime); processingTime);
} catch (ServiceException e) { } catch (ServiceException e) {
Throwable cause = e.getCause(); throw (Exception) e.getCause();
return handleException(cause != null ? cause : e);
} catch (Exception e) { } catch (Exception e) {
return handleException(e); throw e;
} }
return new RpcResponseWritable(result);
HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
return new RpcResponseWritable(response);
} }
} }
} }

View File

@ -85,7 +85,7 @@ public class RPC {
* @throws IOException * @throws IOException
**/ **/
public Writable call(Server server, String protocol, public Writable call(Server server, String protocol,
Writable rpcRequest, long receiveTime) throws IOException ; Writable rpcRequest, long receiveTime) throws Exception ;
} }
static final Log LOG = LogFactory.getLog(RPC.class); static final Log LOG = LogFactory.getLog(RPC.class);
@ -880,7 +880,7 @@ public class RPC {
@Override @Override
public Writable call(RpcKind rpcKind, String protocol, public Writable call(RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws IOException { Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime); receiveTime);
} }

View File

@ -1952,13 +1952,13 @@ public abstract class Server {
* Writable, long)} instead * Writable, long)} instead
*/ */
@Deprecated @Deprecated
public Writable call(Writable param, long receiveTime) throws IOException { public Writable call(Writable param, long receiveTime) throws Exception {
return call(RpcKind.RPC_BUILTIN, null, param, receiveTime); return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
} }
/** Called for each call. */ /** Called for each call. */
public abstract Writable call(RpcKind rpcKind, String protocol, public abstract Writable call(RpcKind rpcKind, String protocol,
Writable param, long receiveTime) throws IOException; Writable param, long receiveTime) throws Exception;
/** /**
* Authorize the incoming client connection. * Authorize the incoming client connection.

View File

@ -25,8 +25,11 @@ option java_outer_classname = "HadoopRpcProtos";
option java_generate_equals_and_hash = true; option java_generate_equals_and_hash = true;
/** /**
* Message used to marshal the client request * This message is used for Protobuf Rpc Engine.
* The message is used to marshal a Rpc-request
* from RPC client to the RPC server. * from RPC client to the RPC server.
* The Response to the Rpc call (including errors) are handled
* as part of the standard Rpc response.
*/ */
message HadoopRpcRequestProto { message HadoopRpcRequestProto {
/** Name of the RPC method */ /** Name of the RPC method */
@ -41,39 +44,3 @@ message HadoopRpcRequestProto {
/** protocol version of class declaring the called method */ /** protocol version of class declaring the called method */
required uint64 clientProtocolVersion = 4; required uint64 clientProtocolVersion = 4;
} }
/**
* At the RPC layer, this message is used to indicate
* the server side exception the the RPC client.
*
* Hadoop RPC client throws an exception indicated
* by exceptionName with the stackTrace.
*/
message HadoopRpcExceptionProto {
/** Class name of the exception thrown from the server */
optional string exceptionName = 1;
/** Exception stack trace from the server side */
optional string stackTrace = 2;
}
/**
* This message is used to marshal the response from
* RPC server to the client.
*/
message HadoopRpcResponseProto {
/** Status of IPC call */
enum ResponseStatus {
SUCCESS = 1;
ERRROR = 2;
}
required ResponseStatus status = 1;
// Protobuf response payload from the server, when status is SUCCESS.
optional bytes response = 2;
// Exception when status is ERROR or FATAL
optional HadoopRpcExceptionProto exception = 3;
}