Merged HADOOP-9151

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483144 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2013-05-16 01:32:14 +00:00
parent 3c929c9d03
commit 02c99f3748
5 changed files with 40 additions and 25 deletions

View File

@ -7,6 +7,9 @@ Release 2.0.5-beta - UNRELEASED
HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
avoid an extra copy (Sanjay Radia) avoid an extra copy (Sanjay Radia)
HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending
it separately (sanjay Radia)
NEW FEATURES NEW FEATURES
HADOOP-9194. RPC support for QoS. (Junping Du via llu) HADOOP-9194. RPC support for QoS. (Junping Du via llu)

View File

@ -320,4 +320,11 @@
<Method name="removeRenewAction" /> <Method name="removeRenewAction" />
<Bug pattern="BC_UNCONFIRMED_CAST" /> <Bug pattern="BC_UNCONFIRMED_CAST" />
</Match> </Match>
<!-- Inconsistent synchronization flagged by findbugs is not valid. -->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="in" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@ -942,31 +941,38 @@ public class Client {
touch(); touch();
try { try {
RpcResponseHeaderProto response = RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in); RpcResponseHeaderProto.parseDelimitedFrom(in);
if (response == null) { if (header == null) {
throw new IOException("Response is null."); throw new IOException("Response is null.");
} }
int callId = response.getCallId(); int callId = header.getCallId();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId); LOG.debug(getName() + " got value #" + callId);
Call call = calls.get(callId); Call call = calls.get(callId);
RpcStatusProto status = response.getStatus(); RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) { if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf); Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value value.readFields(in); // read value
call.setRpcResponse(value); call.setRpcResponse(value);
calls.remove(callId); calls.remove(callId);
} else if (status == RpcStatusProto.ERROR) { } else { // Rpc Request failed
call.setException(new RemoteException(WritableUtils.readString(in), final String exceptionClassName = header.hasExceptionClassName() ?
WritableUtils.readString(in))); header.getExceptionClassName() :
"ServerDidNotSetExceptionClassName";
final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
RemoteException re =
new RemoteException(exceptionClassName, errorMsg);
if (status == RpcStatusProto.ERROR) {
call.setException(re);
calls.remove(callId); calls.remove(callId);
} else if (status == RpcStatusProto.FATAL) { } else if (status == RpcStatusProto.FATAL) {
// Close the connection // Close the connection
markClosed(new RemoteException(WritableUtils.readString(in), markClosed(re);
WritableUtils.readString(in))); }
} }
} catch (IOException e) { } catch (IOException e) {
markClosed(e); markClosed(e);

View File

@ -2024,6 +2024,7 @@ public abstract class Server {
RpcResponseHeaderProto.newBuilder(); RpcResponseHeaderProto.newBuilder();
response.setCallId(call.callId); response.setCallId(call.callId);
response.setStatus(status); response.setStatus(status);
response.setServerIpcVersionNum(Server.CURRENT_VERSION);
if (status == RpcStatusProto.SUCCESS) { if (status == RpcStatusProto.SUCCESS) {
@ -2040,13 +2041,10 @@ public abstract class Server {
StringUtils.stringifyException(t)); StringUtils.stringifyException(t));
return; return;
} }
} else { } else { // Rpc Failure
if (status == RpcStatusProto.FATAL) { response.setExceptionClassName(errorClass);
response.setServerIpcVersionNum(Server.CURRENT_VERSION); response.setErrorMsg(error);
}
response.build().writeDelimitedTo(out); response.build().writeDelimitedTo(out);
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
} }
if (call.connection.useWrap) { if (call.connection.useWrap) {
wrapWithSasl(responseBuf, call); wrapWithSasl(responseBuf, call);

View File

@ -70,12 +70,11 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
* | RpcResponseHeaderProto - serialized delimited ie has len | * | RpcResponseHeaderProto - serialized delimited ie has len |
* +------------------------------------------------------------------+ * +------------------------------------------------------------------+
* | if request is successful: | * | if request is successful: |
* | - RpcResponse - The actual rpc response bytes | * | - RpcResponse - The actual rpc response bytes follow |
* the response header |
* | This response is serialized based on RpcKindProto | * | This response is serialized based on RpcKindProto |
* | if request fails : | * | if request fails : |
* | - length (4 byte int) + Class name of exception - UTF-8 string | * | The rpc response header contains the necessary info |
* | - length (4 byte int) + Stacktrace - UTF-8 string |
* | if the strings are null then the length is -1 |
* +------------------------------------------------------------------+ * +------------------------------------------------------------------+
* *
*/ */
@ -88,5 +87,7 @@ message RpcResponseHeaderProto {
required uint32 callId = 1; // callId used in Request required uint32 callId = 1; // callId used in Request
required RpcStatusProto status = 2; required RpcStatusProto status = 2;
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error optional uint32 serverIpcVersionNum = 3; // Sent if success or fail
optional string exceptionClassName = 4; // if request fails
optional string errorMsg = 5; // if request fails, often contains strack trace
} }