From e1013c3d29ebc101fae2656632598e0cadc1707a Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Sat, 12 May 2012 01:49:52 +0000
Subject: [PATCH] svn merge -c 1337283 from trunk for HADOOP-8366 Use ProtoBuf for RpcResponseHeader.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1337430 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-common/CHANGES.txt                 |  2 +
 .../java/org/apache/hadoop/ipc/Client.java    | 24 +++---
 .../java/org/apache/hadoop/ipc/Server.java    | 75 ++++++++++++++-----
 .../java/org/apache/hadoop/ipc/Status.java    | 32 --------
 .../src/main/proto/RpcPayloadHeader.proto     | 27 ++++++-
 5 files changed, 95 insertions(+), 65 deletions(-)
 delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Status.java

diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 652d9a28e04..953f4a76338 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -184,6 +184,8 @@ Release 2.0.0 - UNRELEASED
 
     HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
 
+    HADOOP-8366 Use ProtoBuf for RpcResponseHeader (sanjay radia)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 083141311b5..ef32cfde3a9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -845,24 +847,24 @@ public class Client {
       touch();
       
       try {
-        int id = in.readInt();                    // try to read an id
-
+        RpcResponseHeaderProto response = 
+            RpcResponseHeaderProto.parseDelimitedFrom(in);
+        int callId = response.getCallId();
         if (LOG.isDebugEnabled())
-          LOG.debug(getName() + " got value #" + id);
+          LOG.debug(getName() + " got value #" + callId);
 
-        Call call = calls.get(id);
-
-        int state = in.readInt();     // read call status
-        if (state == Status.SUCCESS.state) {
+        Call call = calls.get(callId);
+        RpcStatusProto status = response.getStatus();
+        if (status == RpcStatusProto.SUCCESS) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setRpcResponse(value);
-          calls.remove(id);
-        } else if (state == Status.ERROR.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.ERROR) {
           call.setException(new RemoteException(WritableUtils.readString(in),
                                                 WritableUtils.readString(in)));
-          calls.remove(id);
-        } else if (state == Status.FATAL.state) {
+          calls.remove(callId);
+        } else if (status == RpcStatusProto.FATAL) {
           // Close the connection
           markClosed(new RemoteException(WritableUtils.readString(in), 
                                          WritableUtils.readString(in)));
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 24c2f7beaa7..d227a416afe 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1339,7 +1339,7 @@ public abstract class Server {
             + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
             + ") is configured as simple. Please configure another method "
             + "like kerberos or digest.");
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+        setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
             null, ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         throw ae;
@@ -1420,7 +1420,7 @@ public abstract class Server {
         Call fakeCall =  new Call(-1, null, this);
         // Versions 3 and greater can interpret this exception
         // response in the same manner
-        setupResponse(buffer, fakeCall, Status.FATAL,
+        setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
@@ -1443,7 +1443,7 @@ public abstract class Server {
     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
     Call fakeCall = new Call(-1, null, this);
 
-    setupResponse(buffer, fakeCall, Status.FATAL, null,
+    setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
         IpcException.class.getName(), errMsg);
     responder.doRespond(fakeCall);
   }
@@ -1579,7 +1579,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
             IOException.class.getName(),
             "Unknown rpc kind " + header.getRpcKind());
         responder.doRespond(readParamsFailedCall);
@@ -1597,7 +1597,7 @@ public abstract class Server {
             new Call(header.getCallId(), null, this);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 
-        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+        setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
             t.getClass().getName(),
             "IPC server unable to read call parameters: " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
@@ -1627,7 +1627,7 @@ public abstract class Server {
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
         rpcMetrics.incrAuthorizationFailures();
-        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+        setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
             ae.getClass().getName(), ae.getMessage());
         responder.doRespond(authFailedCall);
         return false;
@@ -1725,8 +1725,8 @@ public abstract class Server {
           // responder.doResponse() since setupResponse may use
           // SASL to encrypt response data and SASL enforces
           // its own message ordering.
-          setupResponse(buf, call, (error == null) ? Status.SUCCESS
-              : Status.ERROR, value, errorClass, error);
+          setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
+              : RpcStatusProto.ERROR, value, errorClass, error);
 
           // Discard the large buf and reset it back to smaller size
           // to free up heap
@@ -1859,40 +1859,79 @@ public abstract class Server {
   /**
    * Setup response for the IPC Call.
    * 
-   * @param response buffer to serialize the response into
+   * @param responseBuf buffer to serialize the response into
    * @param call {@link Call} to which we are setting up the response
-   * @param status {@link Status} of the IPC call
+   * @param status status of the IPC call
    * @param rv return value for the IPC Call, if the call was successful
    * @param errorClass error class, if the the call failed
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream response,
-                             Call call, Status status,
+  private void setupResponse(ByteArrayOutputStream responseBuf,
+                             Call call, RpcStatusProto status,
                              Writable rv, String errorClass, String error)
   throws IOException {
-    response.reset();
-    DataOutputStream out = new DataOutputStream(response);
-    out.writeInt(call.callId);                // write call id
-    out.writeInt(status.state);           // write status
+    responseBuf.reset();
+    DataOutputStream out = new DataOutputStream(responseBuf);
+    RpcResponseHeaderProto.Builder response =
+        RpcResponseHeaderProto.newBuilder();
+    response.setCallId(call.callId);
+    response.setStatus(status);
 
-    if (status == Status.SUCCESS) {
+
+    if (status == RpcStatusProto.SUCCESS) {
       try {
+        response.build().writeDelimitedTo(out);
         rv.write(out);
       } catch (Throwable t) {
         LOG.warn("Error serializing call response for call " + call, t);
         // Call back to same function - this is OK since the
         // buffer is reset at the top, and since status is changed
         // to ERROR it won't infinite loop.
-        setupResponse(response, call, Status.ERROR,
+        setupResponse(responseBuf, call, RpcStatusProto.ERROR,
            null, t.getClass().getName(),
            StringUtils.stringifyException(t));
        return;
      }
    } else {
+      if (status == RpcStatusProto.FATAL) {
+        response.setServerIpcVersionNum(Server.CURRENT_VERSION);
+      }
+      response.build().writeDelimitedTo(out);
      WritableUtils.writeString(out, errorClass);
      WritableUtils.writeString(out, error);
    }
+    if (call.connection.useWrap) {
+      wrapWithSasl(responseBuf, call);
+    }
+    call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
+  }
+  
+  /**
+   * Setup response for the IPC Call on Fatal Error from a
+   * client that is using an old version of Hadoop.
+   * The response is serialized using the previous protocol's response
+   * layout.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the call failed
+   * @param error error message, if the call failed
+   * @throws IOException
+   */
+  private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
+                             Call call,
+                             Writable rv, String errorClass, String error)
+  throws IOException {
+    final int OLD_VERSION_FATAL_STATUS = -1;
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    out.writeInt(call.callId);                // write call id
+    out.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUS
+    WritableUtils.writeString(out, errorClass);
+    WritableUtils.writeString(out, error);
+
     if (call.connection.useWrap) {
       wrapWithSasl(response, call);
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Status.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Status.java
deleted file mode 100644
index 16fd871ffa6..00000000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Status.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ipc;
-
-/**
- * Status of a Hadoop IPC call.
- */
-enum Status {
-  SUCCESS (0),
-  ERROR (1),
-  FATAL (-1);
-  
-  int state;
-  private Status(int state) {
-    this.state = state;
-  }
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
index 42dea3bde3e..50657413012 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto
@@ -19,7 +19,6 @@
 option java_package = "org.apache.hadoop.ipc.protobuf";
 option java_outer_classname = "RpcPayloadHeaderProtos";
 option java_generate_equals_and_hash = true;
-
 /**
  * This is the rpc payload header. It is sent with every rpc call.
  * 
@@ -34,8 +33,6 @@ option java_generate_equals_and_hash = true;
  * 
  */
 
-
-
 /**
  * RpcKind determine the rpcEngine and the serialization of the rpc payload
  */
@@ -54,5 +51,27 @@ enum RpcPayloadOperationProto {
 message RpcPayloadHeaderProto { // the header for the RpcRequest
   optional RpcKindProto rpcKind = 1;
   optional RpcPayloadOperationProto rpcOp = 2;
-  optional uint32 callId = 3; // each rpc has a callId that is also used in response
+  required uint32 callId = 3; // each rpc has a callId that is also used in response
+}
+
+enum RpcStatusProto {
+ SUCCESS = 0;  // RPC succeeded
+ ERROR = 1;    // RPC failed
+ FATAL = 2;    // Fatal error - connection is closed
+}
+
+/**
+ * Rpc Response Header
+ *    - If successful then the Response follows after this header
+ *        - length (4 byte int), followed by the response
+ *    - If error or fatal - the exception info follows
+ *        - length (4 byte int) Class name of exception - UTF-8 string
+ *        - length (4 byte int) Stacktrace - UTF-8 string
+ *    - if the strings are null then the length is -1
+ * In case of a fatal error the response contains the server-side IPC version
+ */
+message RpcResponseHeaderProto {
+  required uint32 callId = 1; // callId used in Request
+  required RpcStatusProto status = 2;
+  optional uint32 serverIpcVersionNum = 3; // in case of a fatal IPC error
 }
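
Editorial note (not part of the patch): the sketch below mirrors the wire format this change introduces. The server now frames every response with a varint-length-delimited RpcResponseHeaderProto (setupResponse() writes it with writeDelimitedTo), followed either by the Writable response value on SUCCESS or by two WritableUtils strings (exception class name and error text) on ERROR/FATAL; the client reads the header back with parseDelimitedFrom before deciding how to consume the rest, as receiveResponse() now does. It assumes the RpcPayloadHeaderProtos classes generated from the .proto above plus hadoop-common are on the classpath; the ResponseFramingSketch class and its method names are invented for illustration only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;

public class ResponseFramingSketch {

  /** Server side: frame an ERROR response the way setupResponse() now does. */
  static byte[] writeErrorResponse(int callId, String errorClass, String error)
      throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    RpcResponseHeaderProto header = RpcResponseHeaderProto.newBuilder()
        .setCallId(callId)
        .setStatus(RpcStatusProto.ERROR)
        .build();
    header.writeDelimitedTo(out);               // varint length + header bytes
    WritableUtils.writeString(out, errorClass); // exception class name
    WritableUtils.writeString(out, error);      // error message / stack trace
    out.flush();
    return buf.toByteArray();
  }

  /** Client side: parse the header first, then the error strings, like receiveResponse(). */
  static void readResponse(byte[] wire) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
    RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in);
    System.out.println("callId=" + header.getCallId() + " status=" + header.getStatus());
    if (header.getStatus() != RpcStatusProto.SUCCESS) {
      String errorClass = WritableUtils.readString(in);
      String error = WritableUtils.readString(in);
      System.out.println(errorClass + ": " + error);
    }
  }

  public static void main(String[] args) throws IOException {
    readResponse(writeErrorResponse(42, "java.io.IOException", "something failed"));
  }
}

The delimited framing is the point of the change: unlike the old fixed pair of ints (call id, status), a protobuf message is not self-delimiting on a stream, so the receiver needs the explicit length prefix that writeDelimitedTo/parseDelimitedFrom provide before it can tell where the header ends and the payload begins.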