svn merge -c 1423189 from trunk for HADOOP-9140 Cleanup rpc PB protos.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483137 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
00cad64611
commit
4276dddd0e
|
@ -63,6 +63,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
HADOOP-9560. metrics2#JvmMetrics should have max memory size of JVM.
|
HADOOP-9560. metrics2#JvmMetrics should have max memory size of JVM.
|
||||||
(Tsuyoshi Ozawa via suresh)
|
(Tsuyoshi Ozawa via suresh)
|
||||||
|
|
||||||
|
HADOOP-9140 Cleanup rpc PB protos (sanjay Radia)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||||
|
|
|
@ -272,7 +272,7 @@
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<!-- protobuf generated code -->
|
<!-- protobuf generated code -->
|
||||||
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.HadoopRpcProtos.*"/>
|
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<!-- protobuf generated code -->
|
<!-- protobuf generated code -->
|
||||||
|
@ -284,7 +284,7 @@
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<!-- protobuf generated code -->
|
<!-- protobuf generated code -->
|
||||||
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcPayloadHeaderProtos.*"/>
|
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/>
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<!-- protobuf generated code -->
|
<!-- protobuf generated code -->
|
||||||
|
|
|
@ -317,9 +317,9 @@
|
||||||
<include>HAServiceProtocol.proto</include>
|
<include>HAServiceProtocol.proto</include>
|
||||||
<include>IpcConnectionContext.proto</include>
|
<include>IpcConnectionContext.proto</include>
|
||||||
<include>ProtocolInfo.proto</include>
|
<include>ProtocolInfo.proto</include>
|
||||||
<include>RpcPayloadHeader.proto</include>
|
<include>RpcHeader.proto</include>
|
||||||
<include>ZKFCProtocol.proto</include>
|
<include>ZKFCProtocol.proto</include>
|
||||||
<include>hadoop_rpc.proto</include>
|
<include>ProtobufRpcEngine.proto</include>
|
||||||
<include>Security.proto</include>
|
<include>Security.proto</include>
|
||||||
</includes>
|
</includes>
|
||||||
</source>
|
</source>
|
||||||
|
|
|
@ -63,11 +63,10 @@ import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
|
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.KerberosInfo;
|
import org.apache.hadoop.security.KerberosInfo;
|
||||||
|
@ -201,7 +200,7 @@ public class Client {
|
||||||
*/
|
*/
|
||||||
private class Call {
|
private class Call {
|
||||||
final int id; // call id
|
final int id; // call id
|
||||||
final Writable rpcRequest; // the serialized rpc request - RpcPayload
|
final Writable rpcRequest; // the serialized rpc request
|
||||||
Writable rpcResponse; // null if rpc has error
|
Writable rpcResponse; // null if rpc has error
|
||||||
IOException error; // exception, null if success
|
IOException error; // exception, null if success
|
||||||
final RPC.RpcKind rpcKind; // Rpc EngineKind
|
final RPC.RpcKind rpcKind; // Rpc EngineKind
|
||||||
|
@ -277,7 +276,7 @@ public class Client {
|
||||||
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
|
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
|
||||||
private IOException closeException; // close reason
|
private IOException closeException; // close reason
|
||||||
|
|
||||||
private final Object sendParamsLock = new Object();
|
private final Object sendRpcRequestLock = new Object();
|
||||||
|
|
||||||
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
||||||
this.remoteId = remoteId;
|
this.remoteId = remoteId;
|
||||||
|
@ -776,7 +775,7 @@ public class Client {
|
||||||
remoteId.getTicket(),
|
remoteId.getTicket(),
|
||||||
authMethod).writeTo(buf);
|
authMethod).writeTo(buf);
|
||||||
|
|
||||||
// Write out the payload length
|
// Write out the packet length
|
||||||
int bufLen = buf.getLength();
|
int bufLen = buf.getLength();
|
||||||
|
|
||||||
out.writeInt(bufLen);
|
out.writeInt(bufLen);
|
||||||
|
@ -840,7 +839,7 @@ public class Client {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (waitForWork()) {//wait here for work - read or close connection
|
while (waitForWork()) {//wait here for work - read or close connection
|
||||||
receiveResponse();
|
receiveRpcResponse();
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
// This truly is unexpected, since we catch IOException in receiveResponse
|
// This truly is unexpected, since we catch IOException in receiveResponse
|
||||||
|
@ -857,11 +856,12 @@ public class Client {
|
||||||
+ connections.size());
|
+ connections.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Initiates a call by sending the parameter to the remote server.
|
/** Initiates a rpc call by sending the rpc request to the remote server.
|
||||||
* Note: this is not called from the Connection thread, but by other
|
* Note: this is not called from the Connection thread, but by other
|
||||||
* threads.
|
* threads.
|
||||||
|
* @param call - the rpc request
|
||||||
*/
|
*/
|
||||||
public void sendParam(final Call call)
|
public void sendRpcRequest(final Call call)
|
||||||
throws InterruptedException, IOException {
|
throws InterruptedException, IOException {
|
||||||
if (shouldCloseConnection.get()) {
|
if (shouldCloseConnection.get()) {
|
||||||
return;
|
return;
|
||||||
|
@ -874,17 +874,17 @@ public class Client {
|
||||||
//
|
//
|
||||||
// Format of a call on the wire:
|
// Format of a call on the wire:
|
||||||
// 0) Length of rest below (1 + 2)
|
// 0) Length of rest below (1 + 2)
|
||||||
// 1) PayloadHeader - is serialized Delimited hence contains length
|
// 1) RpcRequestHeader - is serialized Delimited hence contains length
|
||||||
// 2) the Payload - the RpcRequest
|
// 2) RpcRequest
|
||||||
//
|
//
|
||||||
// Items '1' and '2' are prepared here.
|
// Items '1' and '2' are prepared here.
|
||||||
final DataOutputBuffer d = new DataOutputBuffer();
|
final DataOutputBuffer d = new DataOutputBuffer();
|
||||||
RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
|
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||||
call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
|
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
|
||||||
header.writeDelimitedTo(d);
|
header.writeDelimitedTo(d);
|
||||||
call.rpcRequest.write(d);
|
call.rpcRequest.write(d);
|
||||||
|
|
||||||
synchronized (sendParamsLock) {
|
synchronized (sendRpcRequestLock) {
|
||||||
Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
|
Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -900,7 +900,7 @@ public class Client {
|
||||||
byte[] data = d.getData();
|
byte[] data = d.getData();
|
||||||
int totalLength = d.getLength();
|
int totalLength = d.getLength();
|
||||||
out.writeInt(totalLength); // Total Length
|
out.writeInt(totalLength); // Total Length
|
||||||
out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
|
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
|
||||||
out.flush();
|
out.flush();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -935,7 +935,7 @@ public class Client {
|
||||||
/* Receive a response.
|
/* Receive a response.
|
||||||
* Because only one receiver, so no synchronization on in.
|
* Because only one receiver, so no synchronization on in.
|
||||||
*/
|
*/
|
||||||
private void receiveResponse() {
|
private void receiveRpcResponse() {
|
||||||
if (shouldCloseConnection.get()) {
|
if (shouldCloseConnection.get()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1236,12 +1236,12 @@ public class Client {
|
||||||
Call call = new Call(rpcKind, rpcRequest);
|
Call call = new Call(rpcKind, rpcRequest);
|
||||||
Connection connection = getConnection(remoteId, call, serviceClass);
|
Connection connection = getConnection(remoteId, call, serviceClass);
|
||||||
try {
|
try {
|
||||||
connection.sendParam(call); // send the parameter
|
connection.sendRpcRequest(call); // send the rpc request
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
throw new IOException("connection has been closed", e);
|
throw new IOException("connection has been closed", e);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
LOG.warn("interrupted waiting to send params to server", e);
|
LOG.warn("interrupted waiting to send rpc request to server", e);
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
||||||
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
|
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -128,10 +128,10 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
.getProtocolVersion(protocol);
|
.getProtocolVersion(protocol);
|
||||||
}
|
}
|
||||||
|
|
||||||
private HadoopRpcRequestProto constructRpcRequest(Method method,
|
private RequestProto constructRpcRequest(Method method,
|
||||||
Object[] params) throws ServiceException {
|
Object[] params) throws ServiceException {
|
||||||
HadoopRpcRequestProto rpcRequest;
|
RequestProto rpcRequest;
|
||||||
HadoopRpcRequestProto.Builder builder = HadoopRpcRequestProto
|
RequestProto.Builder builder = RequestProto
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
builder.setMethodName(method.getName());
|
builder.setMethodName(method.getName());
|
||||||
|
|
||||||
|
@ -190,7 +190,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
startTime = Time.now();
|
startTime = Time.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
|
RequestProto rpcRequest = constructRpcRequest(method, args);
|
||||||
RpcResponseWritable val = null;
|
RpcResponseWritable val = null;
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
@ -271,13 +271,13 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
* Writable Wrapper for Protocol Buffer Requests
|
* Writable Wrapper for Protocol Buffer Requests
|
||||||
*/
|
*/
|
||||||
private static class RpcRequestWritable implements Writable {
|
private static class RpcRequestWritable implements Writable {
|
||||||
HadoopRpcRequestProto message;
|
RequestProto message;
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public RpcRequestWritable() {
|
public RpcRequestWritable() {
|
||||||
}
|
}
|
||||||
|
|
||||||
RpcRequestWritable(HadoopRpcRequestProto message) {
|
RpcRequestWritable(RequestProto message) {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +292,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
int length = ProtoUtil.readRawVarint32(in);
|
int length = ProtoUtil.readRawVarint32(in);
|
||||||
byte[] bytes = new byte[length];
|
byte[] bytes = new byte[length];
|
||||||
in.readFully(bytes);
|
in.readFully(bytes);
|
||||||
message = HadoopRpcRequestProto.parseFrom(bytes);
|
message = RequestProto.parseFrom(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -426,7 +426,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
public Writable call(RPC.Server server, String protocol,
|
public Writable call(RPC.Server server, String protocol,
|
||||||
Writable writableRequest, long receiveTime) throws Exception {
|
Writable writableRequest, long receiveTime) throws Exception {
|
||||||
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
|
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
|
||||||
HadoopRpcRequestProto rpcRequest = request.message;
|
RequestProto rpcRequest = request.message;
|
||||||
String methodName = rpcRequest.getMethodName();
|
String methodName = rpcRequest.getMethodName();
|
||||||
String protoName = rpcRequest.getDeclaringClassProtocolName();
|
String protoName = rpcRequest.getDeclaringClassProtocolName();
|
||||||
long clientVersion = rpcRequest.getClientProtocolVersion();
|
long clientVersion = rpcRequest.getClientProtocolVersion();
|
||||||
|
|
|
@ -80,7 +80,8 @@ import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
||||||
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
||||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.SaslRpcServer;
|
import org.apache.hadoop.security.SaslRpcServer;
|
||||||
|
@ -158,7 +159,7 @@ public abstract class Server {
|
||||||
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serialization type for ConnectionContext and RpcPayloadHeader
|
* Serialization type for ConnectionContext and RpcRequestHeader
|
||||||
*/
|
*/
|
||||||
public enum IpcSerializationType {
|
public enum IpcSerializationType {
|
||||||
// Add new serialization type to the end without affecting the enum order
|
// Add new serialization type to the end without affecting the enum order
|
||||||
|
@ -195,7 +196,7 @@ public abstract class Server {
|
||||||
// 4 : Introduced SASL security layer
|
// 4 : Introduced SASL security layer
|
||||||
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
|
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
|
||||||
// in ObjectWritable to efficiently transmit arrays of primitives
|
// in ObjectWritable to efficiently transmit arrays of primitives
|
||||||
// 6 : Made RPC payload header explicit
|
// 6 : Made RPC Request header explicit
|
||||||
// 7 : Changed Ipc Connection Header to use Protocol buffers
|
// 7 : Changed Ipc Connection Header to use Protocol buffers
|
||||||
// 8 : SASL server always sends a final response
|
// 8 : SASL server always sends a final response
|
||||||
public static final byte CURRENT_VERSION = 8;
|
public static final byte CURRENT_VERSION = 8;
|
||||||
|
@ -1650,14 +1651,15 @@ public abstract class Server {
|
||||||
private void processData(byte[] buf) throws IOException, InterruptedException {
|
private void processData(byte[] buf) throws IOException, InterruptedException {
|
||||||
DataInputStream dis =
|
DataInputStream dis =
|
||||||
new DataInputStream(new ByteArrayInputStream(buf));
|
new DataInputStream(new ByteArrayInputStream(buf));
|
||||||
RpcPayloadHeaderProto header = RpcPayloadHeaderProto.parseDelimitedFrom(dis);
|
RpcRequestHeaderProto header = RpcRequestHeaderProto.parseDelimitedFrom(dis);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug(" got #" + header.getCallId());
|
LOG.debug(" got #" + header.getCallId());
|
||||||
if (!header.hasRpcOp()) {
|
if (!header.hasRpcOp()) {
|
||||||
throw new IOException(" IPC Server: No rpc op in rpcPayloadHeader");
|
throw new IOException(" IPC Server: No rpc op in rpcRequestHeader");
|
||||||
}
|
}
|
||||||
if (header.getRpcOp() != RpcPayloadOperationProto.RPC_FINAL_PAYLOAD) {
|
if (header.getRpcOp() !=
|
||||||
|
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
|
||||||
throw new IOException("IPC Server does not implement operation" +
|
throw new IOException("IPC Server does not implement operation" +
|
||||||
header.getRpcOp());
|
header.getRpcOp());
|
||||||
}
|
}
|
||||||
|
@ -1665,7 +1667,7 @@ public abstract class Server {
|
||||||
// (Note it would make more sense to have the handler deserialize but
|
// (Note it would make more sense to have the handler deserialize but
|
||||||
// we continue with this original design.
|
// we continue with this original design.
|
||||||
if (!header.hasRpcKind()) {
|
if (!header.hasRpcKind()) {
|
||||||
throw new IOException(" IPC Server: No rpc kind in rpcPayloadHeader");
|
throw new IOException(" IPC Server: No rpc kind in rpcRequestHeader");
|
||||||
}
|
}
|
||||||
Class<? extends Writable> rpcRequestClass =
|
Class<? extends Writable> rpcRequestClass =
|
||||||
getRpcRequestWrapper(header.getRpcKind());
|
getRpcRequestWrapper(header.getRpcKind());
|
||||||
|
|
|
@ -24,7 +24,7 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
|
||||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
@ -157,9 +157,9 @@ public abstract class ProtoUtil {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RpcPayloadHeaderProto makeRpcPayloadHeader(RPC.RpcKind rpcKind,
|
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
||||||
RpcPayloadOperationProto operation, int callId) {
|
RpcRequestHeaderProto.OperationProto operation, int callId) {
|
||||||
RpcPayloadHeaderProto.Builder result = RpcPayloadHeaderProto.newBuilder();
|
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
|
||||||
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
|
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
|
||||||
return result.build();
|
return result.build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* These are the messages used by Hadoop RPC to marshal the
|
* These are the messages used by Hadoop RPC for the Rpc Engine Protocol Buffer
|
||||||
* request and response in the RPC layer.
|
* to marshal the request and response in the RPC layer.
|
||||||
|
* The messages are sent in addition to the normal RPC header as
|
||||||
|
* defined in RpcHeader.proto
|
||||||
*/
|
*/
|
||||||
option java_package = "org.apache.hadoop.ipc.protobuf";
|
option java_package = "org.apache.hadoop.ipc.protobuf";
|
||||||
option java_outer_classname = "HadoopRpcProtos";
|
option java_outer_classname = "ProtobufRpcEngineProtos";
|
||||||
option java_generate_equals_and_hash = true;
|
option java_generate_equals_and_hash = true;
|
||||||
package hadoop.common;
|
package hadoop.common;
|
||||||
|
|
||||||
|
@ -29,17 +31,32 @@ package hadoop.common;
|
||||||
* This message is used for Protobuf Rpc Engine.
|
* This message is used for Protobuf Rpc Engine.
|
||||||
* The message is used to marshal a Rpc-request
|
* The message is used to marshal a Rpc-request
|
||||||
* from RPC client to the RPC server.
|
* from RPC client to the RPC server.
|
||||||
* The Response to the Rpc call (including errors) are handled
|
*
|
||||||
* as part of the standard Rpc response.
|
* No special header is needed for the Rpc Response for Protobuf Rpc Engine.
|
||||||
|
* The normal RPC response header (see RpcHeader.proto) are sufficient.
|
||||||
*/
|
*/
|
||||||
message HadoopRpcRequestProto {
|
message RequestProto {
|
||||||
/** Name of the RPC method */
|
/** Name of the RPC method */
|
||||||
required string methodName = 1;
|
required string methodName = 1;
|
||||||
|
|
||||||
/** Bytes corresponding to the client protobuf request */
|
/** Bytes corresponding to the client protobuf request */
|
||||||
optional bytes request = 2;
|
optional bytes request = 2;
|
||||||
|
|
||||||
/** protocol name of class declaring the called method */
|
/**
|
||||||
|
* RPCs for a particular interface (ie protocol) are done using a
|
||||||
|
* IPC connection that is setup using rpcProxy.
|
||||||
|
* The rpcProxy's has a declared protocol name that is
|
||||||
|
* sent form client to server at connection time.
|
||||||
|
*
|
||||||
|
* Each Rpc call also sends a protocol name
|
||||||
|
* (called declaringClassprotocolName). This name is usually the same
|
||||||
|
* as the connection protocol name except in some cases.
|
||||||
|
* For example metaProtocols such ProtocolInfoProto which get metainfo
|
||||||
|
* about the protocol reuse the connection but need to indicate that
|
||||||
|
* the actual protocol is different (i.e. the protocol is
|
||||||
|
* ProtocolInfoProto) since they reuse the connection; in this case
|
||||||
|
* the declaringClassProtocolName field is set to the ProtocolInfoProto
|
||||||
|
*/
|
||||||
required string declaringClassProtocolName = 3;
|
required string declaringClassProtocolName = 3;
|
||||||
|
|
||||||
/** protocol version of class declaring the called method */
|
/** protocol version of class declaring the called method */
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
option java_package = "org.apache.hadoop.ipc.protobuf";
|
||||||
|
option java_outer_classname = "RpcHeaderProtos";
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
package hadoop.common;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the rpc request header. It is sent with every rpc call.
|
||||||
|
*
|
||||||
|
* The format of RPC call is as follows:
|
||||||
|
* +--------------------------------------------------------------+
|
||||||
|
* | Rpc length in bytes (4 bytes int) sum of next two parts |
|
||||||
|
* +--------------------------------------------------------------+
|
||||||
|
* | RpcRequestHeaderProto - serialized delimited ie has len |
|
||||||
|
* +--------------------------------------------------------------+
|
||||||
|
* | RpcRequest The actual rpc request |
|
||||||
|
* | This request is serialized based on RpcKindProto |
|
||||||
|
* +--------------------------------------------------------------+
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RpcKind determine the rpcEngine and the serialization of the rpc request
|
||||||
|
*/
|
||||||
|
enum RpcKindProto {
|
||||||
|
RPC_BUILTIN = 0; // Used for built in calls by tests
|
||||||
|
RPC_WRITABLE = 1; // Use WritableRpcEngine
|
||||||
|
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
message RpcRequestHeaderProto { // the header for the RpcRequest
|
||||||
|
enum OperationProto {
|
||||||
|
RPC_FINAL_PACKET = 0; // The final RPC Packet
|
||||||
|
RPC_CONTINUATION_PACKET = 1; // not implemented yet
|
||||||
|
RPC_CLOSE_CONNECTION = 2; // close the rpc connection
|
||||||
|
}
|
||||||
|
|
||||||
|
optional RpcKindProto rpcKind = 1;
|
||||||
|
optional OperationProto rpcOp = 2;
|
||||||
|
required uint32 callId = 3; // each rpc has a callId that is also used in response
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rpc Response Header
|
||||||
|
* ** If request is successfull response is returned as below ********
|
||||||
|
* +------------------------------------------------------------------+
|
||||||
|
* | Rpc reponse length in bytes (4 bytes int) |
|
||||||
|
* | (sum of next two parts) |
|
||||||
|
* +------------------------------------------------------------------+
|
||||||
|
* | RpcResponseHeaderProto - serialized delimited ie has len |
|
||||||
|
* +------------------------------------------------------------------+
|
||||||
|
* | if request is successful: |
|
||||||
|
* | - RpcResponse - The actual rpc response bytes |
|
||||||
|
* | This response is serialized based on RpcKindProto |
|
||||||
|
* | if request fails : |
|
||||||
|
* | - length (4 byte int) + Class name of exception - UTF-8 string |
|
||||||
|
* | - length (4 byte int) + Stacktrace - UTF-8 string |
|
||||||
|
* | if the strings are null then the length is -1 |
|
||||||
|
* +------------------------------------------------------------------+
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
message RpcResponseHeaderProto {
|
||||||
|
enum RpcStatusProto {
|
||||||
|
SUCCESS = 0; // RPC succeeded
|
||||||
|
ERROR = 1; // RPC Failed
|
||||||
|
FATAL = 2; // Fatal error - connection is closed
|
||||||
|
}
|
||||||
|
|
||||||
|
required uint32 callId = 1; // callId used in Request
|
||||||
|
required RpcStatusProto status = 2;
|
||||||
|
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
|
||||||
|
}
|
|
@ -1,78 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
option java_package = "org.apache.hadoop.ipc.protobuf";
|
|
||||||
option java_outer_classname = "RpcPayloadHeaderProtos";
|
|
||||||
option java_generate_equals_and_hash = true;
|
|
||||||
package hadoop.common;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the rpc payload header. It is sent with every rpc call.
|
|
||||||
*
|
|
||||||
* The format of RPC call is as follows:
|
|
||||||
* +-----------------------------------------------------+
|
|
||||||
* | Rpc length in bytes |
|
|
||||||
* +-----------------------------------------------------+
|
|
||||||
* | RpcPayloadHeader - serialized delimited ie has len |
|
|
||||||
* +-----------------------------------------------------+
|
|
||||||
* | RpcRequest Payload |
|
|
||||||
* +-----------------------------------------------------+
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* RpcKind determine the rpcEngine and the serialization of the rpc payload
|
|
||||||
*/
|
|
||||||
enum RpcKindProto {
|
|
||||||
RPC_BUILTIN = 0; // Used for built in calls by tests
|
|
||||||
RPC_WRITABLE = 1; // Use WritableRpcEngine
|
|
||||||
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
|
|
||||||
}
|
|
||||||
|
|
||||||
enum RpcPayloadOperationProto {
|
|
||||||
RPC_FINAL_PAYLOAD = 0; // The final payload
|
|
||||||
RPC_CONTINUATION_PAYLOAD = 1; // not implemented yet
|
|
||||||
RPC_CLOSE_CONNECTION = 2; // close the rpc connection
|
|
||||||
}
|
|
||||||
|
|
||||||
message RpcPayloadHeaderProto { // the header for the RpcRequest
|
|
||||||
optional RpcKindProto rpcKind = 1;
|
|
||||||
optional RpcPayloadOperationProto rpcOp = 2;
|
|
||||||
required uint32 callId = 3; // each rpc has a callId that is also used in response
|
|
||||||
}
|
|
||||||
|
|
||||||
enum RpcStatusProto {
|
|
||||||
SUCCESS = 0; // RPC succeeded
|
|
||||||
ERROR = 1; // RPC Failed
|
|
||||||
FATAL = 2; // Fatal error - connection is closed
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Rpc Response Header
|
|
||||||
* - If successfull then the Respose follows after this header
|
|
||||||
* - length (4 byte int), followed by the response
|
|
||||||
* - If error or fatal - the exception info follow
|
|
||||||
* - length (4 byte int) Class name of exception - UTF-8 string
|
|
||||||
* - length (4 byte int) Stacktrace - UTF-8 string
|
|
||||||
* - if the strings are null then the length is -1
|
|
||||||
* In case of Fatal error then the respose contains the Serverside's IPC version
|
|
||||||
*/
|
|
||||||
message RpcResponseHeaderProto {
|
|
||||||
required uint32 callId = 1; // callId used in Request
|
|
||||||
required RpcStatusProto status = 2;
|
|
||||||
optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error
|
|
||||||
}
|
|
Loading…
Reference in New Issue