HADOOP-9425 Add error codes to rpc-response (sanjay Radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1479143 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2013-05-04 17:51:22 +00:00
parent c0fa9d2231
commit 725623534c
14 changed files with 329 additions and 92 deletions

View File

@ -19,6 +19,8 @@ Trunk (Unreleased)
HADOOP-9194. RPC Support for QoS. (Junping Du via llu) HADOOP-9194. RPC Support for QoS. (Junping Du via llu)
HADOOP-9425 Add error codes to rpc-response (sanjay Radia)
NEW FEATURES NEW FEATURES
HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -987,8 +988,15 @@ public class Client {
"ServerDidNotSetExceptionClassName"; "ServerDidNotSetExceptionClassName";
final String errorMsg = header.hasErrorMsg() ? final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
final RpcErrorCodeProto erCode =
(header.hasErrorDetail() ? header.getErrorDetail() : null);
if (erCode == null) {
LOG.warn("Detailed error code not set by server on rpc error");
}
RemoteException re = RemoteException re =
new RemoteException(exceptionClassName, errorMsg); ( (erCode == null) ?
new RemoteException(exceptionClassName, errorMsg) :
new RemoteException(exceptionClassName, errorMsg, erCode));
if (status == RpcStatusProto.ERROR) { if (status == RpcStatusProto.ERROR) {
call.setException(re); call.setException(re);
calls.remove(callId); calls.remove(callId);

View File

@ -437,8 +437,8 @@ public class ProtobufRpcEngine implements RpcEngine {
*/ */
static class ProtoBufRpcInvoker implements RpcInvoker { static class ProtoBufRpcInvoker implements RpcInvoker {
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long version) throws IOException { String protoName, long clientVersion) throws RpcServerException {
ProtoNameVer pv = new ProtoNameVer(protoName, version); ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
ProtoClassProtoImpl impl = ProtoClassProtoImpl impl =
server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv); server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version if (impl == null) { // no match for Protocol AND Version
@ -446,10 +446,11 @@ public class ProtobufRpcEngine implements RpcEngine {
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
protoName); protoName);
if (highest == null) { if (highest == null) {
throw new IOException("Unknown protocol: " + protoName); throw new RpcNoSuchProtocolException(
"Unknown protocol: " + protoName);
} }
// protocol supported but not the version that client wants // protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, version, throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version); highest.version);
} }
return impl; return impl;
@ -513,7 +514,7 @@ public class ProtobufRpcEngine implements RpcEngine {
String msg = "Unknown method " + methodName + " called on " String msg = "Unknown method " + methodName + " called on "
+ connectionProtocolName + " protocol."; + connectionProtocolName + " protocol.";
LOG.warn(msg); LOG.warn(msg);
throw new RpcServerException(msg); throw new RpcNoSuchMethodException(msg);
} }
Message prototype = service.getRequestPrototype(methodDescriptor); Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType() Message param = prototype.newBuilderForType()

View File

@ -43,6 +43,8 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -213,7 +215,7 @@ public class RPC {
/** /**
* A version mismatch for the RPC protocol. * A version mismatch for the RPC protocol.
*/ */
public static class VersionMismatch extends IOException { public static class VersionMismatch extends RpcServerException {
private static final long serialVersionUID = 0; private static final long serialVersionUID = 0;
private String interfaceName; private String interfaceName;
@ -257,6 +259,19 @@ public class RPC {
public long getServerVersion() { public long getServerVersion() {
return serverVersion; return serverVersion;
} }
/**
* get the rpc status corresponding to this exception
*/
public RpcStatusProto getRpcStatusProto() {
return RpcStatusProto.ERROR;
}
/**
* get the detailed rpc status corresponding to this exception
*/
public RpcErrorCodeProto getRpcErrorCodeProto() {
return RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH;
}
} }
/** /**

View File

@ -21,22 +21,38 @@ package org.apache.hadoop.ipc;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.xml.sax.Attributes; import org.xml.sax.Attributes;
public class RemoteException extends IOException { public class RemoteException extends IOException {
/** For java.io.Serializable */ /** For java.io.Serializable */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final int errorCode;
private String className; private String className;
public RemoteException(String className, String msg) { public RemoteException(String className, String msg) {
super(msg); super(msg);
this.className = className; this.className = className;
errorCode = -1;
}
public RemoteException(String className, String msg, RpcErrorCodeProto erCode) {
super(msg);
this.className = className;
if (erCode != null)
errorCode = erCode.getNumber();
else
errorCode = -1;
} }
public String getClassName() { public String getClassName() {
return className; return className;
} }
public RpcErrorCodeProto getErrorCode() {
return RpcErrorCodeProto.valueOf(errorCode);
}
/** /**
* If this remote exception wraps up one of the lookupTypes * If this remote exception wraps up one of the lookupTypes

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
/**
* No such Method for an Rpc Call
*
*/
public class RpcNoSuchMethodException extends RpcServerException {
private static final long serialVersionUID = 1L;
public RpcNoSuchMethodException(final String message) {
super(message);
}
/**
* get the rpc status corresponding to this exception
*/
public RpcStatusProto getRpcStatusProto() {
return RpcStatusProto.ERROR;
}
/**
* get the detailed rpc status corresponding to this exception
*/
public RpcErrorCodeProto getRpcErrorCodeProto() {
return RpcErrorCodeProto.ERROR_NO_SUCH_METHOD;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
/**
* No such protocol (i.e. interface) for and Rpc Call
*
*/
public class RpcNoSuchProtocolException extends RpcServerException {
private static final long serialVersionUID = 1L;
public RpcNoSuchProtocolException(final String message) {
super(message);
}
/**
* get the rpc status corresponding to this exception
*/
public RpcStatusProto getRpcStatusProto() {
return RpcStatusProto.ERROR;
}
/**
* get the detailed rpc status corresponding to this exception
*/
public RpcErrorCodeProto getRpcErrorCodeProto() {
return RpcErrorCodeProto.ERROR_NO_SUCH_PROTOCOL;
}
}

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
/** /**
* Indicates an exception on the RPC server * Indicates an exception on the RPC server
*/ */
@ -42,4 +45,18 @@ public class RpcServerException extends RpcException {
public RpcServerException(final String message, final Throwable cause) { public RpcServerException(final String message, final Throwable cause) {
super(message, cause); super(message, cause);
} }
/**
* get the rpc status corresponding to this exception
*/
public RpcStatusProto getRpcStatusProto() {
return RpcStatusProto.ERROR;
}
/**
* get the detailed rpc status corresponding to this exception
*/
public RpcErrorCodeProto getRpcErrorCodeProto() {
return RpcErrorCodeProto.ERROR_RPC_SERVER;
}
} }

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
@ -1438,7 +1439,8 @@ public abstract class Server {
final String ioeMessage = ioe.getLocalizedMessage(); final String ioeMessage = ioe.getLocalizedMessage();
if (authMethod == AuthMethod.SIMPLE) { if (authMethod == AuthMethod.SIMPLE) {
setupResponse(authFailedResponse, authFailedCall, setupResponse(authFailedResponse, authFailedCall,
RpcStatusProto.FATAL, null, ioeClass, ioeMessage); RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
null, ioeClass, ioeMessage);
responder.doRespond(authFailedCall); responder.doRespond(authFailedCall);
} else { } else {
doSaslReply(SaslStatus.ERROR, null, ioeClass, ioeMessage); doSaslReply(SaslStatus.ERROR, null, ioeClass, ioeMessage);
@ -1528,7 +1530,8 @@ public abstract class Server {
if (clientVersion >= 9) { if (clientVersion >= 9) {
// Versions >>9 understand the normal response // Versions >>9 understand the normal response
Call fakeCall = new Call(-1, null, this); Call fakeCall = new Call(-1, null, this);
setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, setupResponse(buffer, fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
} else if (clientVersion >= 3) { } else if (clientVersion >= 3) {
@ -1557,8 +1560,9 @@ public abstract class Server {
ByteArrayOutputStream buffer = new ByteArrayOutputStream(); ByteArrayOutputStream buffer = new ByteArrayOutputStream();
Call fakeCall = new Call(-1, null, this); Call fakeCall = new Call(-1, null, this);
setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null, setupResponse(buffer, fakeCall,
IpcException.class.getName(), errMsg); RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNSUPPORTED_SERIALIZATION,
null, IpcException.class.getName(), errMsg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
} }
@ -1647,7 +1651,7 @@ public abstract class Server {
private void processOneRpc(byte[] buf) throws IOException, private void processOneRpc(byte[] buf) throws IOException,
InterruptedException { InterruptedException {
if (connectionContextRead) { if (connectionContextRead) {
processData(buf); processRpcRequest(buf);
} else { } else {
processConnectionContext(buf); processConnectionContext(buf);
connectionContextRead = true; connectionContextRead = true;
@ -1659,7 +1663,17 @@ public abstract class Server {
} }
} }
private void processData(byte[] buf) throws IOException, InterruptedException { /**
* Process an RPC Request - the connection headers and context have been
* read
* @param buf - contains the RPC request header and the rpc request
* @throws RpcServerException due to fatal rpc layer issues such as
* invalid header. In this case a RPC fatal status response is sent back
* to client.
*/
private void processRpcRequest(byte[] buf)
throws RpcServerException, IOException, InterruptedException {
DataInputStream dis = DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf)); new DataInputStream(new ByteArrayInputStream(buf));
RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis); RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
@ -1667,51 +1681,58 @@ public abstract class Server {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(" got #" + header.getCallId()); LOG.debug(" got #" + header.getCallId());
if (!header.hasRpcOp()) { if (!header.hasRpcOp()) {
throw new IOException(" IPC Server: No rpc op in rpcRequestHeader"); String err = " IPC Server: No rpc op in rpcRequestHeader";
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
} }
if (header.getRpcOp() != if (header.getRpcOp() !=
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) { RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
throw new IOException("IPC Server does not implement operation" + String err = "IPC Server does not implement rpc header operation" +
header.getRpcOp()); header.getRpcOp();
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
} }
// If we know the rpc kind, get its class so that we can deserialize // If we know the rpc kind, get its class so that we can deserialize
// (Note it would make more sense to have the handler deserialize but // (Note it would make more sense to have the handler deserialize but
// we continue with this original design. // we continue with this original design.
if (!header.hasRpcKind()) { if (!header.hasRpcKind()) {
throw new IOException(" IPC Server: No rpc kind in rpcRequestHeader"); String err = " IPC Server: No rpc kind in rpcRequestHeader";
respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
throw new RpcServerException(err);
} }
Class<? extends Writable> rpcRequestClass = Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind()); getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) { if (rpcRequestClass == null) {
LOG.warn("Unknown rpc kind " + header.getRpcKind() + LOG.warn("Unknown rpc kind " + header.getRpcKind() +
" from client " + getHostAddress()); " from client " + getHostAddress());
final Call readParamsFailedCall = final String err = "Unknown rpc kind in rpc header" +
new Call(header.getCallId(), null, this); header.getRpcKind();
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); respondBadRpcHeader(new Call(header.getCallId(), null, this),
RpcServerException.class.getName(), err);
setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null, throw new RpcServerException(err);
IOException.class.getName(),
"Unknown rpc kind " + header.getRpcKind());
responder.doRespond(readParamsFailedCall);
return;
} }
Writable rpcRequest; Writable rpcRequest;
try { //Read the rpc request try { //Read the rpc request
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf); rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis); rpcRequest.readFields(dis);
} catch (Throwable t) { } catch (Throwable t) { // includes runtime exception from newInstance
LOG.warn("Unable to read call parameters for client " + LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " + getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t); this.protocolName + " for rpcKind " + header.getRpcKind(), t);
final Call readParamsFailedCall = final Call readParamsFailedCall =
new Call(header.getCallId(), null, this); new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
String err = "IPC server unable to read call parameters: "+ t.getMessage();
setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null, setupResponse(responseBuffer, readParamsFailedCall,
t.getClass().getName(), RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
"IPC server unable to read call parameters: " + t.getMessage()); null, t.getClass().getName(),
err);
responder.doRespond(readParamsFailedCall); responder.doRespond(readParamsFailedCall);
return; throw new RpcServerException(err, t);
} }
Call call = new Call(header.getCallId(), rpcRequest, this, Call call = new Call(header.getCallId(), rpcRequest, this,
@ -1737,7 +1758,8 @@ public abstract class Server {
rpcMetrics.incrAuthorizationSuccesses(); rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) { } catch (AuthorizationException ae) {
rpcMetrics.incrAuthorizationFailures(); rpcMetrics.incrAuthorizationFailures();
setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null, setupResponse(authFailedResponse, authFailedCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED, null,
ae.getClass().getName(), ae.getMessage()); ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall); responder.doRespond(authFailedCall);
return false; return false;
@ -1799,6 +1821,8 @@ public abstract class Server {
} }
String errorClass = null; String errorClass = null;
String error = null; String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null; Writable value = null;
CurCall.set(call); CurCall.set(call);
@ -1839,7 +1863,14 @@ public abstract class Server {
} else { } else {
LOG.info(logMsg, e); LOG.info(logMsg, e);
} }
if (e instanceof RpcServerException) {
RpcServerException rse = ((RpcServerException)e);
returnStatus = rse.getRpcStatusProto();
detailedErr = rse.getRpcErrorCodeProto();
} else {
returnStatus = RpcStatusProto.ERROR;
detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
}
errorClass = e.getClass().getName(); errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e); error = StringUtils.stringifyException(e);
// Remove redundant error class name from the beginning of the stack trace // Remove redundant error class name from the beginning of the stack trace
@ -1854,8 +1885,8 @@ public abstract class Server {
// responder.doResponse() since setupResponse may use // responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces // SASL to encrypt response data and SASL enforces
// its own message ordering. // its own message ordering.
setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS setupResponse(buf, call, returnStatus, detailedErr,
: RpcStatusProto.ERROR, value, errorClass, error); value, errorClass, error);
// Discard the large buf and reset it back to smaller size // Discard the large buf and reset it back to smaller size
// to free up heap // to free up heap
@ -2026,7 +2057,7 @@ public abstract class Server {
* @throws IOException * @throws IOException
*/ */
private void setupResponse(ByteArrayOutputStream responseBuf, private void setupResponse(ByteArrayOutputStream responseBuf,
Call call, RpcStatusProto status, Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
Writable rv, String errorClass, String error) Writable rv, String errorClass, String error)
throws IOException { throws IOException {
responseBuf.reset(); responseBuf.reset();
@ -2065,6 +2096,7 @@ public abstract class Server {
// buffer is reset at the top, and since status is changed // buffer is reset at the top, and since status is changed
// to ERROR it won't infinite loop. // to ERROR it won't infinite loop.
setupResponse(responseBuf, call, RpcStatusProto.ERROR, setupResponse(responseBuf, call, RpcStatusProto.ERROR,
RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE,
null, t.getClass().getName(), null, t.getClass().getName(),
StringUtils.stringifyException(t)); StringUtils.stringifyException(t));
return; return;
@ -2072,6 +2104,7 @@ public abstract class Server {
} else { // Rpc Failure } else { // Rpc Failure
headerBuilder.setExceptionClassName(errorClass); headerBuilder.setExceptionClassName(errorClass);
headerBuilder.setErrorMsg(error); headerBuilder.setErrorMsg(error);
headerBuilder.setErrorDetail(erCode);
RpcResponseHeaderProto header = headerBuilder.build(); RpcResponseHeaderProto header = headerBuilder.build();
int headerLen = header.getSerializedSize(); int headerLen = header.getSerializedSize();
final int fullLength = final int fullLength =
@ -2116,6 +2149,19 @@ public abstract class Server {
call.setResponse(ByteBuffer.wrap(response.toByteArray())); call.setResponse(ByteBuffer.wrap(response.toByteArray()));
} }
private void respondBadRpcHeader(Call call, String errorClass, String error)
throws IOException
{
ByteArrayOutputStream responseBuf = new ByteArrayOutputStream();
setupResponse(responseBuf, call,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
null, errorClass, error);
responder.doRespond(call);
return;
}
private void wrapWithSasl(ByteArrayOutputStream response, Call call) private void wrapWithSasl(ByteArrayOutputStream response, Call call)
throws IOException { throws IOException {
if (call.connection.saslServer != null) { if (call.connection.saslServer != null) {

View File

@ -416,62 +416,62 @@ public class WritableRpcEngine implements RpcEngine {
@Override @Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server, public Writable call(org.apache.hadoop.ipc.RPC.Server server,
String protocolName, Writable rpcRequest, long receivedTime) String protocolName, Writable rpcRequest, long receivedTime)
throws IOException { throws IOException, RPC.VersionMismatch {
try {
Invocation call = (Invocation)rpcRequest;
if (server.verbose) log("Call: " + call);
// Verify rpc version Invocation call = (Invocation)rpcRequest;
if (call.getRpcVersion() != writableRpcVersion) { if (server.verbose) log("Call: " + call);
// Client is using a different version of WritableRpc
throw new IOException( // Verify writable rpc version
"WritableRpc version mismatch, client side version=" if (call.getRpcVersion() != writableRpcVersion) {
+ call.getRpcVersion() + ", server side version=" // Client is using a different version of WritableRpc
+ writableRpcVersion); throw new RpcServerException(
"WritableRpc version mismatch, client side version="
+ call.getRpcVersion() + ", server side version="
+ writableRpcVersion);
}
long clientVersion = call.getProtocolVersion();
final String protoName;
ProtoClassProtoImpl protocolImpl;
if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
// VersionProtocol methods are often used by client to figure out
// which version of protocol to use.
//
// Versioned protocol methods should go the protocolName protocol
// rather than the declaring class of the method since the
// the declaring class is VersionedProtocol which is not
// registered directly.
// Send the call to the highest protocol version
VerProtocolImpl highest = server.getHighestSupportedProtocol(
RPC.RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protocolName);
} }
protocolImpl = highest.protocolTarget;
} else {
protoName = call.declaringClassProtocolName;
long clientVersion = call.getProtocolVersion(); // Find the right impl for the protocol based on client version.
final String protoName; ProtoNameVer pv =
ProtoClassProtoImpl protocolImpl; new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) { protocolImpl =
// VersionProtocol methods are often used by client to figure out server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
// which version of protocol to use. if (protocolImpl == null) { // no match for Protocol AND Version
// VerProtocolImpl highest =
// Versioned protocol methods should go the protocolName protocol server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
// rather than the declaring class of the method since the protoName);
// the declaring class is VersionedProtocol which is not
// registered directly.
// Send the call to the highest protocol version
VerProtocolImpl highest = server.getHighestSupportedProtocol(
RPC.RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) { if (highest == null) {
throw new IOException("Unknown protocol: " + protocolName); throw new RpcServerException("Unknown protocol: " + protoName);
} } else { // protocol supported but not the version that client wants
protocolImpl = highest.protocolTarget; throw new RPC.VersionMismatch(protoName, clientVersion,
} else { highest.version);
protoName = call.declaringClassProtocolName;
// Find the right impl for the protocol based on client version.
ProtoNameVer pv =
new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
protocolImpl =
server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
} else { // protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
} }
} }
}
// Invoke the protocol method // Invoke the protocol method
try {
long startTime = Time.now(); long startTime = Time.now();
Method method = Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(), protocolImpl.protocolClass.getMethod(call.getMethodName(),

View File

@ -1,4 +1,4 @@
/**DER /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information

View File

@ -62,27 +62,55 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
/** /**
* Rpc Response Header * Rpc Response Header
* ** If request is successfull response is returned as below ********
* +------------------------------------------------------------------+ * +------------------------------------------------------------------+
* | Rpc reponse length in bytes (4 bytes int) | * | Rpc total response length in bytes (4 bytes int) |
* | (sum of next two parts) | * | (sum of next two parts) |
* +------------------------------------------------------------------+ * +------------------------------------------------------------------+
* | RpcResponseHeaderProto - serialized delimited ie has len | * | RpcResponseHeaderProto - serialized delimited ie has len |
* +------------------------------------------------------------------+ * +------------------------------------------------------------------+
* | if request is successful: | * | if request is successful: |
* | - RpcResponse - The actual rpc response bytes follow | * | - RpcResponse - The actual rpc response bytes follow |
* the response header | * | the response header |
* | This response is serialized based on RpcKindProto | * | This response is serialized based on RpcKindProto |
* | if request fails : | * | if request fails : |
* | The rpc response header contains the necessary info | * | The rpc response header contains the necessary info |
* +------------------------------------------------------------------+ * +------------------------------------------------------------------+
* *
* Note that rpc response header is also used when connection setup fails.
* Ie the response looks like a rpc response with a fake callId.
*/ */
message RpcResponseHeaderProto { message RpcResponseHeaderProto {
/**
*
* RpcStastus - success or failure
* The reponseHeader's errDetail, exceptionClassName and errMsg contains
* further details on the error
**/
enum RpcStatusProto { enum RpcStatusProto {
SUCCESS = 0; // RPC succeeded SUCCESS = 0; // RPC succeeded
ERROR = 1; // RPC Failed ERROR = 1; // RPC or error - connection left open for future calls
FATAL = 2; // Fatal error - connection is closed FATAL = 2; // Fatal error - connection closed
}
enum RpcErrorCodeProto {
// Non-fatal Rpc error - connection left open for future rpc calls
ERROR_APPLICATION = 1; // RPC Failed - rpc app threw exception
ERROR_NO_SUCH_METHOD = 2; // Rpc error - no such method
ERROR_NO_SUCH_PROTOCOL = 3; // Rpc error - no such protocol
ERROR_RPC_SERVER = 4; // Rpc error on server side
ERROR_SERIALIZING_RESPONSE = 5; // error serializign response
ERROR_RPC_VERSION_MISMATCH = 6; // Rpc protocol version mismatch
// Fatal Server side Rpc error - connection closed
FATAL_UNKNOWN = 10; // unknown Fatal error
FATAL_UNSUPPORTED_SERIALIZATION = 11; // IPC layer serilization type invalid
FATAL_INVALID_RPC_HEADER = 12; // fields of RpcHeader are invalid
FATAL_DESERIALIZING_REQUEST = 13; // could not deserilize rpc request
FATAL_VERSION_MISMATCH = 14; // Ipc Layer version mismatch
FATAL_UNAUTHORIZED = 15; // Auth failed
} }
required uint32 callId = 1; // callId used in Request required uint32 callId = 1; // callId used in Request
@ -90,4 +118,5 @@ message RpcResponseHeaderProto {
optional uint32 serverIpcVersionNum = 3; // Sent if success or fail optional uint32 serverIpcVersionNum = 3; // Sent if success or fail
optional string exceptionClassName = 4; // if request fails optional string exceptionClassName = 4; // if request fails
optional string errorMsg = 5; // if request fails, often contains strack trace optional string errorMsg = 5; // if request fails, often contains strack trace
optional RpcErrorCodeProto errorDetail = 6; // in case of error
} }

View File

@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
@ -183,6 +184,8 @@ public class TestProtoBufRpc {
RemoteException re = (RemoteException)e.getCause(); RemoteException re = (RemoteException)e.getCause();
RpcServerException rse = (RpcServerException) re RpcServerException rse = (RpcServerException) re
.unwrapRemoteException(RpcServerException.class); .unwrapRemoteException(RpcServerException.class);
Assert.assertTrue(re.getErrorCode().equals(
RpcErrorCodeProto.ERROR_RPC_SERVER));
} }
} }
@ -223,6 +226,8 @@ public class TestProtoBufRpc {
Assert.assertTrue(re.getClassName().equals( Assert.assertTrue(re.getClassName().equals(
URISyntaxException.class.getName())); URISyntaxException.class.getName()));
Assert.assertTrue(re.getMessage().contains("testException")); Assert.assertTrue(re.getMessage().contains("testException"));
Assert.assertTrue(
re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION));
} }
} }
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -310,9 +311,13 @@ System.out.println("echo int is NOT supported");
try { try {
proxy.echo(21); proxy.echo(21);
fail("The call must throw VersionMismatch exception"); fail("The call must throw VersionMismatch exception");
} catch (IOException ex) { } catch (RemoteException ex) {
Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), Assert.assertEquals(RPC.VersionMismatch.class.getName(),
ex.getMessage().contains("VersionMismatch")); ex.getClassName());
Assert.assertTrue(ex.getErrorCode().equals(
RpcErrorCodeProto.ERROR_RPC_VERSION_MISMATCH));
} catch (IOException ex) {
fail("Expected version mismatch but got " + ex);
} }
} }