HADOOP-9140 Cleanup rpc PB protos (sanjay Radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1423189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2012-12-17 22:16:57 +00:00
parent 721095474d
commit aa4fe26a01
10 changed files with 149 additions and 128 deletions

View File

@ -144,6 +144,8 @@ Trunk (Unreleased)
HADOOP-9093. Move all the Exception in PathExceptions to o.a.h.fs package.
(suresh)
HADOOP-9140 Cleanup rpc PB protos (sanjay Radia)
BUG FIXES
HADOOP-9041. FsUrlStreamHandlerFactory could cause an infinite loop in

View File

@ -260,7 +260,7 @@
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.HadoopRpcProtos.*"/>
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
@ -272,7 +272,7 @@
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcPayloadHeaderProtos.*"/>
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->

View File

@ -378,9 +378,9 @@
<argument>src/main/proto/HAServiceProtocol.proto</argument>
<argument>src/main/proto/IpcConnectionContext.proto</argument>
<argument>src/main/proto/ProtocolInfo.proto</argument>
<argument>src/main/proto/RpcPayloadHeader.proto</argument>
<argument>src/main/proto/RpcHeader.proto</argument>
<argument>src/main/proto/ZKFCProtocol.proto</argument>
<argument>src/main/proto/hadoop_rpc.proto</argument>
<argument>src/main/proto/ProtobufRpcEngine.proto</argument>
</arguments>
</configuration>
</execution>

View File

@ -63,11 +63,10 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SaslRpcClient;
@ -191,7 +190,7 @@ public class Client {
*/
private class Call {
final int id; // call id
final Writable rpcRequest; // the serialized rpc request - RpcPayload
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
@ -266,7 +265,7 @@ public class Client {
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
private final Object sendParamsLock = new Object();
private final Object sendRpcRequestLock = new Object();
public Connection(ConnectionId remoteId) throws IOException {
this.remoteId = remoteId;
@ -768,7 +767,7 @@ public class Client {
remoteId.getTicket(),
authMethod).writeTo(buf);
// Write out the payload length
// Write out the packet length
int bufLen = buf.getLength();
out.writeInt(bufLen);
@ -832,7 +831,7 @@ public class Client {
try {
while (waitForWork()) {//wait here for work - read or close connection
receiveResponse();
receiveRpcResponse();
}
} catch (Throwable t) {
// This truly is unexpected, since we catch IOException in receiveResponse
@ -849,11 +848,12 @@ public class Client {
+ connections.size());
}
/** Initiates a call by sending the parameter to the remote server.
/** Initiates a rpc call by sending the rpc request to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
* @param call - the rpc request
*/
public void sendParam(final Call call)
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
if (shouldCloseConnection.get()) {
return;
@ -866,17 +866,17 @@ public class Client {
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) PayloadHeader - is serialized Delimited hence contains length
// 2) the Payload - the RpcRequest
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);
synchronized (sendParamsLock) {
synchronized (sendRpcRequestLock) {
Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
@Override
public void run() {
@ -892,7 +892,7 @@ public class Client {
byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
} catch (IOException e) {
@ -927,7 +927,7 @@ public class Client {
/* Receive a response.
* Because only one receiver, so no synchronization on in.
*/
private void receiveResponse() {
private void receiveRpcResponse() {
if (shouldCloseConnection.get()) {
return;
}
@ -1194,12 +1194,12 @@ public class Client {
Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call);
try {
connection.sendParam(call); // send the parameter
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send params to server", e);
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -128,10 +128,10 @@ public class ProtobufRpcEngine implements RpcEngine {
.getProtocolVersion(protocol);
}
private HadoopRpcRequestProto constructRpcRequest(Method method,
private RequestProto constructRpcRequest(Method method,
Object[] params) throws ServiceException {
HadoopRpcRequestProto rpcRequest;
HadoopRpcRequestProto.Builder builder = HadoopRpcRequestProto
RequestProto rpcRequest;
RequestProto.Builder builder = RequestProto
.newBuilder();
builder.setMethodName(method.getName());
@ -190,7 +190,7 @@ public class ProtobufRpcEngine implements RpcEngine {
startTime = Time.now();
}
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
RequestProto rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null;
if (LOG.isTraceEnabled()) {
@ -271,13 +271,13 @@ public class ProtobufRpcEngine implements RpcEngine {
* Writable Wrapper for Protocol Buffer Requests
*/
private static class RpcRequestWritable implements Writable {
HadoopRpcRequestProto message;
RequestProto message;
@SuppressWarnings("unused")
public RpcRequestWritable() {
}
RpcRequestWritable(HadoopRpcRequestProto message) {
RpcRequestWritable(RequestProto message) {
this.message = message;
}
@ -292,7 +292,7 @@ public class ProtobufRpcEngine implements RpcEngine {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = HadoopRpcRequestProto.parseFrom(bytes);
message = RequestProto.parseFrom(bytes);
}
@Override
@ -426,7 +426,7 @@ public class ProtobufRpcEngine implements RpcEngine {
public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception {
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
HadoopRpcRequestProto rpcRequest = request.message;
RequestProto rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();

View File

@ -80,7 +80,8 @@ import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer;
@ -160,7 +161,7 @@ public abstract class Server {
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
/**
* Serialization type for ConnectionContext and RpcPayloadHeader
* Serialization type for ConnectionContext and RpcRequestHeader
*/
public enum IpcSerializationType {
// Add new serialization type to the end without affecting the enum order
@ -197,7 +198,7 @@ public abstract class Server {
// 4 : Introduced SASL security layer
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
// in ObjectWritable to efficiently transmit arrays of primitives
// 6 : Made RPC payload header explicit
// 6 : Made RPC Request header explicit
// 7 : Changed Ipc Connection Header to use Protocol buffers
// 8 : SASL server always sends a final response
public static final byte CURRENT_VERSION = 8;
@ -1637,14 +1638,15 @@ public abstract class Server {
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
RpcPayloadHeaderProto header = RpcPayloadHeaderProto.parseDelimitedFrom(dis);
RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
if (LOG.isDebugEnabled())
LOG.debug(" got #" + header.getCallId());
if (!header.hasRpcOp()) {
throw new IOException(" IPC Server: No rpc op in rpcPayloadHeader");
throw new IOException(" IPC Server: No rpc op in rpcRequestHeader");
}
if (header.getRpcOp() != RpcPayloadOperationProto.RPC_FINAL_PAYLOAD) {
if (header.getRpcOp() !=
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
throw new IOException("IPC Server does not implement operation" +
header.getRpcOp());
}
@ -1652,7 +1654,7 @@ public abstract class Server {
// (Note it would make more sense to have the handler deserialize but
// we continue with this original design.
if (!header.hasRpcKind()) {
throw new IOException(" IPC Server: No rpc kind in rpcPayloadHeader");
throw new IOException(" IPC Server: No rpc kind in rpcRequestHeader");
}
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());

View File

@ -24,7 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
@ -157,9 +157,9 @@ public abstract class ProtoUtil {
return null;
}
public static RpcPayloadHeaderProto makeRpcPayloadHeader(RPC.RpcKind rpcKind,
RpcPayloadOperationProto operation, int callId) {
RpcPayloadHeaderProto.Builder result = RpcPayloadHeaderProto.newBuilder();
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.OperationProto operation, int callId) {
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
return result.build();
}

View File

@ -17,11 +17,13 @@
*/
/**
* These are the messages used by Hadoop RPC to marshal the
* request and response in the RPC layer.
* These are the messages used by Hadoop RPC for the Rpc Engine Protocol Buffer
* to marshal the request and response in the RPC layer.
* The messages are sent in addition to the normal RPC header as
* defined in RpcHeader.proto
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "HadoopRpcProtos";
option java_outer_classname = "ProtobufRpcEngineProtos";
option java_generate_equals_and_hash = true;
package hadoop.common;
@ -29,10 +31,11 @@ package hadoop.common;
* This message is used for Protobuf Rpc Engine.
* The message is used to marshal a Rpc-request
* from RPC client to the RPC server.
* The Response to the Rpc call (including errors) are handled
* as part of the standard Rpc response.
*
* No special header is needed for the Rpc Response for Protobuf Rpc Engine.
* The normal RPC response header (see RpcHeader.proto) are sufficient.
*/
message HadoopRpcRequestProto {
message RequestProto {
/** Name of the RPC method */
required string methodName = 1;

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "RpcHeaderProtos";
option java_generate_equals_and_hash = true;
package hadoop.common;
/**
* This is the rpc request header. It is sent with every rpc call.
*
* The format of RPC call is as follows:
* +--------------------------------------------------------------+
* | Rpc length in bytes (4 bytes int) sum of next two parts |
* +--------------------------------------------------------------+
* | RpcRequestHeaderProto - serialized delimited ie has len |
* +--------------------------------------------------------------+
* | RpcRequest The actual rpc request |
* | This request is serialized based on RpcKindProto |
* +--------------------------------------------------------------+
*
*/
/**
* RpcKind determine the rpcEngine and the serialization of the rpc request
*/
enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests
RPC_WRITABLE = 1; // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
}
message RpcRequestHeaderProto { // the header for the RpcRequest
enum OperationProto {
RPC_FINAL_PACKET = 0; // The final RPC Packet
RPC_CONTINUATION_PACKET = 1; // not implemented yet
RPC_CLOSE_CONNECTION = 2; // close the rpc connection
}
optional RpcKindProto rpcKind = 1;
optional OperationProto rpcOp = 2;
required uint32 callId = 3; // each rpc has a callId that is also used in response
}
/**
* Rpc Response Header
* ** If request is successfull response is returned as below ********
* +------------------------------------------------------------------+
* | Rpc reponse length in bytes (4 bytes int) |
* | (sum of next two parts) |
* +------------------------------------------------------------------+
* | RpcResponseHeaderProto - serialized delimited ie has len |
* +------------------------------------------------------------------+
* | if request is successful: |
* | - RpcResponse - The actual rpc response bytes |
* | This response is serialized based on RpcKindProto |
* | if request fails : |
* | - length (4 byte int) + Class name of exception - UTF-8 string |
* | - length (4 byte int) + Stacktrace - UTF-8 string |
* | if the strings are null then the length is -1 |
* +------------------------------------------------------------------+
*
*/
message RpcResponseHeaderProto {
enum RpcStatusProto {
SUCCESS = 0; // RPC succeeded
ERROR = 1; // RPC Failed
FATAL = 2; // Fatal error - connection is closed
}
required uint32 callId = 1; // callId used in Request
required RpcStatusProto status = 2;
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
}

View File

@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "RpcPayloadHeaderProtos";
option java_generate_equals_and_hash = true;
package hadoop.common;
/**
* This is the rpc payload header. It is sent with every rpc call.
*
* The format of RPC call is as follows:
* +-----------------------------------------------------+
* | Rpc length in bytes |
* +-----------------------------------------------------+
* | RpcPayloadHeader - serialized delimited ie has len |
* +-----------------------------------------------------+
* | RpcRequest Payload |
* +-----------------------------------------------------+
*
*/
/**
* RpcKind determine the rpcEngine and the serialization of the rpc payload
*/
enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests
RPC_WRITABLE = 1; // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
}
enum RpcPayloadOperationProto {
RPC_FINAL_PAYLOAD = 0; // The final payload
RPC_CONTINUATION_PAYLOAD = 1; // not implemented yet
RPC_CLOSE_CONNECTION = 2; // close the rpc connection
}
message RpcPayloadHeaderProto { // the header for the RpcRequest
optional RpcKindProto rpcKind = 1;
optional RpcPayloadOperationProto rpcOp = 2;
required uint32 callId = 3; // each rpc has a callId that is also used in response
}
enum RpcStatusProto {
SUCCESS = 0; // RPC succeeded
ERROR = 1; // RPC Failed
FATAL = 2; // Fatal error - connection is closed
}
/**
* Rpc Response Header
* - If successfull then the Respose follows after this header
* - length (4 byte int), followed by the response
* - If error or fatal - the exception info follow
* - length (4 byte int) Class name of exception - UTF-8 string
* - length (4 byte int) Stacktrace - UTF-8 string
* - if the strings are null then the length is -1
* In case of Fatal error then the respose contains the Serverside's IPC version
*/
message RpcResponseHeaderProto {
required uint32 callId = 1; // callId used in Request
required RpcStatusProto status = 2;
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
}