svn merge -c 1337283 from trunk for HADOOP-8366 Use ProtoBuf for RpcResponseHeader.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1337430 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-05-12 01:49:52 +00:00
parent b2edecfdde
commit e1013c3d29
5 changed files with 95 additions and 65 deletions

View File

@ -184,6 +184,8 @@ Release 2.0.0 - UNRELEASED
HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia) HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
HADOOP-8366 Use ProtoBuf for RpcResponseHeader (sanjay radia)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -53,6 +53,8 @@
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
@ -845,24 +847,24 @@ private void receiveResponse() {
touch(); touch();
try { try {
int id = in.readInt(); // try to read an id RpcResponseHeaderProto response =
RpcResponseHeaderProto.parseDelimitedFrom(in);
int callId = response.getCallId();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id); LOG.debug(getName() + " got value #" + callId);
Call call = calls.get(id); Call call = calls.get(callId);
RpcStatusProto status = response.getStatus();
int state = in.readInt(); // read call status if (status == RpcStatusProto.SUCCESS) {
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf); Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value value.readFields(in); // read value
call.setRpcResponse(value); call.setRpcResponse(value);
calls.remove(id); calls.remove(callId);
} else if (state == Status.ERROR.state) { } else if (status == RpcStatusProto.ERROR) {
call.setException(new RemoteException(WritableUtils.readString(in), call.setException(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in))); WritableUtils.readString(in)));
calls.remove(id); calls.remove(callId);
} else if (state == Status.FATAL.state) { } else if (status == RpcStatusProto.FATAL) {
// Close the connection // Close the connection
markClosed(new RemoteException(WritableUtils.readString(in), markClosed(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in))); WritableUtils.readString(in)));

View File

@ -1339,7 +1339,7 @@ public int readAndProcess() throws IOException, InterruptedException {
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ ") is configured as simple. Please configure another method " + ") is configured as simple. Please configure another method "
+ "like kerberos or digest."); + "like kerberos or digest.");
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
null, ae.getClass().getName(), ae.getMessage()); null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall); responder.doRespond(authFailedCall);
throw ae; throw ae;
@ -1420,7 +1420,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
Call fakeCall = new Call(-1, null, this); Call fakeCall = new Call(-1, null, this);
// Versions 3 and greater can interpret this exception // Versions 3 and greater can interpret this exception
// response in the same manner // response in the same manner
setupResponse(buffer, fakeCall, Status.FATAL, setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
@ -1443,7 +1443,7 @@ private void respondUnsupportedSerialization(IpcSerializationType st) throws IOE
ByteArrayOutputStream buffer = new ByteArrayOutputStream(); ByteArrayOutputStream buffer = new ByteArrayOutputStream();
Call fakeCall = new Call(-1, null, this); Call fakeCall = new Call(-1, null, this);
setupResponse(buffer, fakeCall, Status.FATAL, null, setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
IpcException.class.getName(), errMsg); IpcException.class.getName(), errMsg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
} }
@ -1579,7 +1579,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
new Call(header.getCallId(), null, this); new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
IOException.class.getName(), IOException.class.getName(),
"Unknown rpc kind " + header.getRpcKind()); "Unknown rpc kind " + header.getRpcKind());
responder.doRespond(readParamsFailedCall); responder.doRespond(readParamsFailedCall);
@ -1597,7 +1597,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
new Call(header.getCallId(), null, this); new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
t.getClass().getName(), t.getClass().getName(),
"IPC server unable to read call parameters: " + t.getMessage()); "IPC server unable to read call parameters: " + t.getMessage());
responder.doRespond(readParamsFailedCall); responder.doRespond(readParamsFailedCall);
@ -1627,7 +1627,7 @@ private boolean authorizeConnection() throws IOException {
rpcMetrics.incrAuthorizationSuccesses(); rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) { } catch (AuthorizationException ae) {
rpcMetrics.incrAuthorizationFailures(); rpcMetrics.incrAuthorizationFailures();
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
ae.getClass().getName(), ae.getMessage()); ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall); responder.doRespond(authFailedCall);
return false; return false;
@ -1725,8 +1725,8 @@ public Writable run() throws Exception {
// responder.doResponse() since setupResponse may use // responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces // SASL to encrypt response data and SASL enforces
// its own message ordering. // its own message ordering.
setupResponse(buf, call, (error == null) ? Status.SUCCESS setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
: Status.ERROR, value, errorClass, error); : RpcStatusProto.ERROR, value, errorClass, error);
// Discard the large buf and reset it back to smaller size // Discard the large buf and reset it back to smaller size
// to free up heap // to free up heap
@ -1859,40 +1859,79 @@ private void closeConnection(Connection connection) {
/** /**
* Setup response for the IPC Call. * Setup response for the IPC Call.
* *
* @param response buffer to serialize the response into * @param responseBuf buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response * @param call {@link Call} to which we are setting up the response
* @param status {@link Status} of the IPC call * @param status of the IPC call
* @param rv return value for the IPC Call, if the call was successful * @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the the call failed * @param errorClass error class, if the the call failed
* @param error error message, if the call failed * @param error error message, if the call failed
* @throws IOException * @throws IOException
*/ */
private void setupResponse(ByteArrayOutputStream response, private void setupResponse(ByteArrayOutputStream responseBuf,
Call call, Status status, Call call, RpcStatusProto status,
Writable rv, String errorClass, String error) Writable rv, String errorClass, String error)
throws IOException { throws IOException {
response.reset(); responseBuf.reset();
DataOutputStream out = new DataOutputStream(response); DataOutputStream out = new DataOutputStream(responseBuf);
out.writeInt(call.callId); // write call id RpcResponseHeaderProto.Builder response =
out.writeInt(status.state); // write status RpcResponseHeaderProto.newBuilder();
response.setCallId(call.callId);
response.setStatus(status);
if (status == Status.SUCCESS) {
if (status == RpcStatusProto.SUCCESS) {
try { try {
response.build().writeDelimitedTo(out);
rv.write(out); rv.write(out);
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Error serializing call response for call " + call, t); LOG.warn("Error serializing call response for call " + call, t);
// Call back to same function - this is OK since the // Call back to same function - this is OK since the
// buffer is reset at the top, and since status is changed // buffer is reset at the top, and since status is changed
// to ERROR it won't infinite loop. // to ERROR it won't infinite loop.
setupResponse(response, call, Status.ERROR, setupResponse(responseBuf, call, RpcStatusProto.ERROR,
null, t.getClass().getName(), null, t.getClass().getName(),
StringUtils.stringifyException(t)); StringUtils.stringifyException(t));
return; return;
} }
} else { } else {
if (status == RpcStatusProto.FATAL) {
response.setServerIpcVersionNum(Server.CURRENT_VERSION);
}
response.build().writeDelimitedTo(out);
WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error); WritableUtils.writeString(out, error);
} }
if (call.connection.useWrap) {
wrapWithSasl(responseBuf, call);
}
call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
}
/**
* Setup response for the IPC Call on Fatal Error from a
* client that is using old version of Hadoop.
* The response is serialized using the previous protocol's response
* layout.
*
* @param response buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the the call failed
* @param error error message, if the call failed
* @throws IOException
*/
private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
Call call,
Writable rv, String errorClass, String error)
throws IOException {
final int OLD_VERSION_FATAL_STATUS = -1;
response.reset();
DataOutputStream out = new DataOutputStream(response);
out.writeInt(call.callId); // write call id
out.writeInt(OLD_VERSION_FATAL_STATUS); // write FATAL_STATUS
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
if (call.connection.useWrap) { if (call.connection.useWrap) {
wrapWithSasl(response, call); wrapWithSasl(response, call);
} }

View File

@ -1,32 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
/**
* Status of a Hadoop IPC call.
*/
enum Status {
SUCCESS (0),
ERROR (1),
FATAL (-1);
int state;
private Status(int state) {
this.state = state;
}
}

View File

@ -19,7 +19,6 @@ option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "RpcPayloadHeaderProtos"; option java_outer_classname = "RpcPayloadHeaderProtos";
option java_generate_equals_and_hash = true; option java_generate_equals_and_hash = true;
/** /**
* This is the rpc payload header. It is sent with every rpc call. * This is the rpc payload header. It is sent with every rpc call.
* *
@ -34,8 +33,6 @@ option java_generate_equals_and_hash = true;
* *
*/ */
/** /**
* RpcKind determine the rpcEngine and the serialization of the rpc payload * RpcKind determine the rpcEngine and the serialization of the rpc payload
*/ */
@ -54,5 +51,27 @@ enum RpcPayloadOperationProto {
message RpcPayloadHeaderProto { // the header for the RpcRequest message RpcPayloadHeaderProto { // the header for the RpcRequest
optional RpcKindProto rpcKind = 1; optional RpcKindProto rpcKind = 1;
optional RpcPayloadOperationProto rpcOp = 2; optional RpcPayloadOperationProto rpcOp = 2;
optional uint32 callId = 3; // each rpc has a callId that is also used in response required uint32 callId = 3; // each rpc has a callId that is also used in response
}
enum RpcStatusProto {
SUCCESS = 0; // RPC succeeded
ERROR = 1; // RPC Failed
FATAL = 2; // Fatal error - connection is closed
}
/**
* Rpc Response Header
* - If successfull then the Respose follows after this header
* - length (4 byte int), followed by the response
* - If error or fatal - the exception info follow
* - length (4 byte int) Class name of exception - UTF-8 string
* - length (4 byte int) Stacktrace - UTF-8 string
* - if the strings are null then the length is -1
* In case of Fatal error then the respose contains the Serverside's IPC version
*/
message RpcResponseHeaderProto {
required uint32 callId = 1; // callId used in Request
required RpcStatusProto status = 2;
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
} }