Merged in HADOOP-9218 Document the Rpc-wrappers used internally (sanjay Radia)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483142 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4276dddd0e
commit
f41b189498
|
@ -65,6 +65,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
|
|
||||||
HADOOP-9140 Cleanup rpc PB protos (sanjay Radia)
|
HADOOP-9140 Cleanup rpc PB protos (sanjay Radia)
|
||||||
|
|
||||||
|
HADOOP-9218 Document the Rpc-wrappers used internally (sanjay Radia)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
|
|
||||||
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
||||||
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
||||||
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
|
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
|
||||||
new Server.ProtoBufRpcInvoker());
|
new Server.ProtoBufRpcInvoker());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
public Invoker(Class<?> protocol, Client.ConnectionId connId,
|
public Invoker(Class<?> protocol, Client.ConnectionId connId,
|
||||||
Configuration conf, SocketFactory factory) {
|
Configuration conf, SocketFactory factory) {
|
||||||
this.remoteId = connId;
|
this.remoteId = connId;
|
||||||
this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
|
this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
|
||||||
this.protocolName = RPC.getProtocolName(protocol);
|
this.protocolName = RPC.getProtocolName(protocol);
|
||||||
this.clientProtocolVersion = RPC
|
this.clientProtocolVersion = RPC
|
||||||
.getProtocolVersion(protocol);
|
.getProtocolVersion(protocol);
|
||||||
|
@ -191,7 +191,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
RequestProto rpcRequest = constructRpcRequest(method, args);
|
RequestProto rpcRequest = constructRpcRequest(method, args);
|
||||||
RpcResponseWritable val = null;
|
RpcResponseWrapper val = null;
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
|
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
|
||||||
|
@ -199,8 +199,8 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
" {" + TextFormat.shortDebugString((Message) args[1]) + "}");
|
" {" + TextFormat.shortDebugString((Message) args[1]) + "}");
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
||||||
new RpcRequestWritable(rpcRequest), remoteId);
|
new RpcRequestWrapper(rpcRequest), remoteId);
|
||||||
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
@ -268,16 +268,20 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writable Wrapper for Protocol Buffer Requests
|
* Wrapper for Protocol Buffer Requests
|
||||||
|
*
|
||||||
|
* Note while this wrapper is writable, the request on the wire is in
|
||||||
|
* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
|
||||||
|
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
|
||||||
*/
|
*/
|
||||||
private static class RpcRequestWritable implements Writable {
|
private static class RpcRequestWrapper implements Writable {
|
||||||
RequestProto message;
|
RequestProto message;
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public RpcRequestWritable() {
|
public RpcRequestWrapper() {
|
||||||
}
|
}
|
||||||
|
|
||||||
RpcRequestWritable(RequestProto message) {
|
RpcRequestWrapper(RequestProto message) {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,16 +307,20 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writable Wrapper for Protocol Buffer Responses
|
* Wrapper for Protocol Buffer Responses
|
||||||
|
*
|
||||||
|
* Note while this wrapper is writable, the request on the wire is in
|
||||||
|
* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
|
||||||
|
* use type Writable as a wrapper to work across multiple RpcEngine kinds.
|
||||||
*/
|
*/
|
||||||
private static class RpcResponseWritable implements Writable {
|
private static class RpcResponseWrapper implements Writable {
|
||||||
byte[] responseMessage;
|
byte[] responseMessage;
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public RpcResponseWritable() {
|
public RpcResponseWrapper() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public RpcResponseWritable(Message message) {
|
public RpcResponseWrapper(Message message) {
|
||||||
this.responseMessage = message.toByteArray();
|
this.responseMessage = message.toByteArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,7 +344,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
static Client getClient(Configuration conf) {
|
static Client getClient(Configuration conf) {
|
||||||
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
|
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
|
||||||
RpcResponseWritable.class);
|
RpcResponseWrapper.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -425,7 +433,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
*/
|
*/
|
||||||
public Writable call(RPC.Server server, String protocol,
|
public Writable call(RPC.Server server, String protocol,
|
||||||
Writable writableRequest, long receiveTime) throws Exception {
|
Writable writableRequest, long receiveTime) throws Exception {
|
||||||
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
|
RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
|
||||||
RequestProto rpcRequest = request.message;
|
RequestProto rpcRequest = request.message;
|
||||||
String methodName = rpcRequest.getMethodName();
|
String methodName = rpcRequest.getMethodName();
|
||||||
String protoName = rpcRequest.getDeclaringClassProtocolName();
|
String protoName = rpcRequest.getDeclaringClassProtocolName();
|
||||||
|
@ -467,7 +475,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
return new RpcResponseWritable(result);
|
return new RpcResponseWrapper(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue