Merged HADOOP-9380

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483145 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2013-05-16 01:35:16 +00:00
parent 02c99f3748
commit 47b052810b
4 changed files with 117 additions and 29 deletions

View File

@ -10,6 +10,8 @@ Release 2.0.5-beta - UNRELEASED
HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
it separately (sanjay Radia)
HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
NEW FEATURES
HADOOP-9194. RPC support for QoS. (Junping Du via llu)

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedOutputStream;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@ -240,7 +241,7 @@ public class Client {
callComplete();
}
public synchronized Writable getRpcResult() {
public synchronized Writable getRpcResponse() {
return rpcResponse;
}
}
@ -941,11 +942,14 @@ public class Client {
touch();
try {
int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
if (header == null) {
throw new IOException("Response is null.");
}
int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
int callId = header.getCallId();
if (LOG.isDebugEnabled())
@ -958,11 +962,28 @@ public class Client {
value.readFields(in); // read value
call.setRpcResponse(value);
calls.remove(callId);
// verify that length was correct
// only for ProtobufEngine where len can be verified easily
if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
if (totalLen != headerLen + resWrapper.getLength()) {
throw new RpcClientException(
"RPC response length mismatch on rpc success");
}
}
} else { // Rpc Request failed
final String exceptionClassName = header.hasExceptionClassName() ?
// Verify that length was correct
if (totalLen != headerLen) {
throw new RpcClientException(
"RPC response length mismatch on rpc error");
}
final String exceptionClassName = header.hasExceptionClassName() ?
header.getExceptionClassName() :
"ServerDidNotSetExceptionClassName";
final String errorMsg = header.hasErrorMsg() ?
final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
RemoteException re =
new RemoteException(exceptionClassName, errorMsg);
@ -1280,7 +1301,7 @@ public class Client {
call.error);
}
} else {
return call.getRpcResult();
return call.getRpcResponse();
}
}
}

View File

@ -48,7 +48,9 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.AbstractMessageLite;
import com.google.protobuf.BlockingService;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
@ -226,7 +228,7 @@ public class ProtobufRpcEngine implements RpcEngine {
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.responseMessage).build();
.mergeFrom(val.theResponseRead).build();
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
@ -267,6 +269,9 @@ public class ProtobufRpcEngine implements RpcEngine {
}
}
interface RpcWrapper extends Writable {
int getLength();
}
/**
* Wrapper for Protocol Buffer Requests
*
@ -274,7 +279,7 @@ public class ProtobufRpcEngine implements RpcEngine {
* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
*/
private static class RpcRequestWrapper implements Writable {
private static class RpcRequestWrapper implements RpcWrapper {
RequestHeaderProto requestHeader;
Message theRequest; // for clientSide, the request is here
byte[] theRequestRead; // for server side, the request is here
@ -312,6 +317,22 @@ public class ProtobufRpcEngine implements RpcEngine {
return requestHeader.getDeclaringClassProtocolName() + "." +
requestHeader.getMethodName();
}
@Override
public int getLength() {
int headerLen = requestHeader.getSerializedSize();
int reqLen;
if (theRequest != null) {
reqLen = theRequest.getSerializedSize();
} else if (theRequestRead != null ) {
reqLen = theRequestRead.length;
} else {
throw new IllegalArgumentException(
"getLenght on uninilialized RpcWrapper");
}
return CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen
+ CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
}
}
/**
@ -321,29 +342,43 @@ public class ProtobufRpcEngine implements RpcEngine {
* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
*/
private static class RpcResponseWrapper implements Writable {
byte[] responseMessage;
private static class RpcResponseWrapper implements RpcWrapper {
Message theResponse; // for senderSide, the response is here
byte[] theResponseRead; // for receiver side, the response is here
@SuppressWarnings("unused")
public RpcResponseWrapper() {
}
public RpcResponseWrapper(Message message) {
this.responseMessage = message.toByteArray();
this.theResponse = message;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(responseMessage.length);
out.write(responseMessage);
OutputStream os = DataOutputOutputStream.constructOutputStream(out);
theResponse.writeDelimitedTo(os);
}
@Override
public void readFields(DataInput in) throws IOException {
int length = in.readInt();
byte[] bytes = new byte[length];
in.readFully(bytes);
responseMessage = bytes;
int length = ProtoUtil.readRawVarint32(in);
theResponseRead = new byte[length];
in.readFully(theResponseRead);
}
@Override
public int getLength() {
int resLen;
if (theResponse != null) {
resLen = theResponse.getSerializedSize();
} else if (theResponseRead != null ) {
resLen = theResponseRead.length;
} else {
throw new IllegalArgumentException(
"getLenght on uninilialized RpcWrapper");
}
return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
}
}

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@ -106,6 +107,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.CodedOutputStream;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@ -199,7 +201,8 @@ public abstract class Server {
// 6 : Made RPC Request header explicit
// 7 : Changed Ipc Connection Header to use Protocol buffers
// 8 : SASL server always sends a final response
public static final byte CURRENT_VERSION = 8;
// 9 : Changes to protocol for HADOOP-8990
public static final byte CURRENT_VERSION = 9;
/**
* Initial and max size of response buffer
@ -1519,10 +1522,15 @@ public abstract class Server {
" cannot communicate with client version " + clientVersion;
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
if (clientVersion >= 3) {
if (clientVersion >= 9) {
// Versions >>9 understand the normal response
Call fakeCall = new Call(-1, null, this);
// Versions 3 and greater can interpret this exception
// response in the same manner
setupResponse(buffer, fakeCall, RpcStatusProto.FATAL,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion >= 3) {
Call fakeCall = new Call(-1, null, this);
// Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
@ -2020,17 +2028,34 @@ public abstract class Server {
throws IOException {
responseBuf.reset();
DataOutputStream out = new DataOutputStream(responseBuf);
RpcResponseHeaderProto.Builder response =
RpcResponseHeaderProto.Builder headerBuilder =
RpcResponseHeaderProto.newBuilder();
response.setCallId(call.callId);
response.setStatus(status);
response.setServerIpcVersionNum(Server.CURRENT_VERSION);
headerBuilder.setCallId(call.callId);
headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build();
final int headerLen = header.getSerializedSize();
int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) +
headerLen;
try {
response.build().writeDelimitedTo(out);
rv.write(out);
if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) rv;
fullLength += resWrapper.getLength();
out.writeInt(fullLength);
header.writeDelimitedTo(out);
rv.write(out);
} else { // Have to serialize to buffer to get len
final DataOutputBuffer buf = new DataOutputBuffer();
rv.write(buf);
byte[] data = buf.getData();
fullLength += buf.getLength();
out.writeInt(fullLength);
header.writeDelimitedTo(out);
out.write(data, 0, buf.getLength());
}
} catch (Throwable t) {
LOG.warn("Error serializing call response for call " + call, t);
// Call back to same function - this is OK since the
@ -2042,9 +2067,14 @@ public abstract class Server {
return;
}
} else { // Rpc Failure
response.setExceptionClassName(errorClass);
response.setErrorMsg(error);
response.build().writeDelimitedTo(out);
headerBuilder.setExceptionClassName(errorClass);
headerBuilder.setErrorMsg(error);
RpcResponseHeaderProto header = headerBuilder.build();
int headerLen = header.getSerializedSize();
final int fullLength =
CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
out.writeInt(fullLength);
header.writeDelimitedTo(out);
}
if (call.connection.useWrap) {
wrapWithSasl(responseBuf, call);