HDFS-13665. [SBN read] Move RPC response serialization into Server.doResponse(). Contributed by Plamen Jeliazkov.
This commit is contained in:
parent
a97f00e1c6
commit
7e0a71dd7f
|
@ -835,15 +835,15 @@ public abstract class Server {
|
||||||
final Writable rpcRequest; // Serialized Rpc request from client
|
final Writable rpcRequest; // Serialized Rpc request from client
|
||||||
ByteBuffer rpcResponse; // the response for this call
|
ByteBuffer rpcResponse; // the response for this call
|
||||||
|
|
||||||
private RpcResponseHeaderProto bufferedHeader; // the response header
|
private ResponseParams responseParams; // the response params
|
||||||
private Writable bufferedRv; // the byte response
|
private Writable rv; // the byte response
|
||||||
|
|
||||||
RpcCall(RpcCall call) {
|
RpcCall(RpcCall call) {
|
||||||
super(call);
|
super(call);
|
||||||
this.connection = call.connection;
|
this.connection = call.connection;
|
||||||
this.rpcRequest = call.rpcRequest;
|
this.rpcRequest = call.rpcRequest;
|
||||||
this.bufferedRv = call.bufferedRv;
|
this.rv = call.rv;
|
||||||
this.bufferedHeader = call.bufferedHeader;
|
this.responseParams = call.responseParams;
|
||||||
}
|
}
|
||||||
|
|
||||||
RpcCall(Connection connection, int id) {
|
RpcCall(Connection connection, int id) {
|
||||||
|
@ -864,12 +864,10 @@ public abstract class Server {
|
||||||
this.rpcRequest = param;
|
this.rpcRequest = param;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setBufferedHeader(RpcResponseHeaderProto header) {
|
void setResponseFields(Writable returnValue,
|
||||||
this.bufferedHeader = header;
|
ResponseParams responseParams) {
|
||||||
}
|
this.rv = returnValue;
|
||||||
|
this.responseParams = responseParams;
|
||||||
public void setBufferedRv(Writable rv) {
|
|
||||||
this.bufferedRv = rv;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -903,9 +901,7 @@ public abstract class Server {
|
||||||
populateResponseParamsOnError(e, responseParams);
|
populateResponseParamsOnError(e, responseParams);
|
||||||
}
|
}
|
||||||
if (!isResponseDeferred()) {
|
if (!isResponseDeferred()) {
|
||||||
setupResponse(this, responseParams.returnStatus,
|
setResponseFields(value, responseParams);
|
||||||
responseParams.detailedErr,
|
|
||||||
value, responseParams.errorClass, responseParams.error);
|
|
||||||
sendResponse();
|
sendResponse();
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -960,13 +956,11 @@ public abstract class Server {
|
||||||
setupResponse(call,
|
setupResponse(call,
|
||||||
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
|
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
|
||||||
null, t.getClass().getName(), StringUtils.stringifyException(t));
|
null, t.getClass().getName(), StringUtils.stringifyException(t));
|
||||||
} else if (alignmentContext != null) {
|
} else {
|
||||||
// rebuild response with state context in header
|
setupResponse(call, call.responseParams.returnStatus,
|
||||||
RpcResponseHeaderProto.Builder responseHeader =
|
call.responseParams.detailedErr, call.rv,
|
||||||
call.bufferedHeader.toBuilder();
|
call.responseParams.errorClass,
|
||||||
alignmentContext.updateResponseState(responseHeader);
|
call.responseParams.error);
|
||||||
RpcResponseHeaderProto builtHeader = responseHeader.build();
|
|
||||||
setupResponse(call, builtHeader, call.bufferedRv);
|
|
||||||
}
|
}
|
||||||
connection.sendResponse(call);
|
connection.sendResponse(call);
|
||||||
}
|
}
|
||||||
|
@ -2956,6 +2950,9 @@ public abstract class Server {
|
||||||
headerBuilder.setRetryCount(call.retryCount);
|
headerBuilder.setRetryCount(call.retryCount);
|
||||||
headerBuilder.setStatus(status);
|
headerBuilder.setStatus(status);
|
||||||
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
|
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
|
||||||
|
if (alignmentContext != null) {
|
||||||
|
alignmentContext.updateResponseState(headerBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
if (status == RpcStatusProto.SUCCESS) {
|
if (status == RpcStatusProto.SUCCESS) {
|
||||||
RpcResponseHeaderProto header = headerBuilder.build();
|
RpcResponseHeaderProto header = headerBuilder.build();
|
||||||
|
@ -2982,12 +2979,6 @@ public abstract class Server {
|
||||||
|
|
||||||
private void setupResponse(RpcCall call,
|
private void setupResponse(RpcCall call,
|
||||||
RpcResponseHeaderProto header, Writable rv) throws IOException {
|
RpcResponseHeaderProto header, Writable rv) throws IOException {
|
||||||
if (alignmentContext != null && call.bufferedHeader == null
|
|
||||||
&& call.bufferedRv == null) {
|
|
||||||
call.setBufferedHeader(header);
|
|
||||||
call.setBufferedRv(rv);
|
|
||||||
}
|
|
||||||
|
|
||||||
final byte[] response;
|
final byte[] response;
|
||||||
if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
|
if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
|
||||||
response = setupResponseForProtobuf(header, rv);
|
response = setupResponseForProtobuf(header, rv);
|
||||||
|
|
Loading…
Reference in New Issue