HADOOP-13465. Design Server.Call to be extensible for unified call queue. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2016-08-25 11:43:39 -05:00
parent 4da5000dd3
commit d288a0ba83
1 changed files with 191 additions and 145 deletions

View File

@ -354,10 +354,9 @@ public abstract class Server {
*/ */
public static InetAddress getRemoteIp() { public static InetAddress getRemoteIp() {
Call call = CurCall.get(); Call call = CurCall.get();
return (call != null && call.connection != null) ? call.connection return (call != null ) ? call.getHostInetAddress() : null;
.getHostInetAddress() : null;
} }
/** /**
* Returns the clientId from the current RPC request * Returns the clientId from the current RPC request
*/ */
@ -380,10 +379,9 @@ public abstract class Server {
*/ */
public static UserGroupInformation getRemoteUser() { public static UserGroupInformation getRemoteUser() {
Call call = CurCall.get(); Call call = CurCall.get();
return (call != null && call.connection != null) ? call.connection.user return (call != null) ? call.getRemoteUser() : null;
: null;
} }
/** Return true if the invocation was through an RPC. /** Return true if the invocation was through an RPC.
*/ */
public static boolean isRpcInvocation() { public static boolean isRpcInvocation() {
@ -483,7 +481,7 @@ public abstract class Server {
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) && if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) { (processingTime > threeSigma)) {
if(LOG.isWarnEnabled()) { if(LOG.isWarnEnabled()) {
String client = CurCall.get().connection.toString(); String client = CurCall.get().toString();
LOG.warn( LOG.warn(
"Slow RPC : " + methodName + " took " + processingTime + "Slow RPC : " + methodName + " took " + processingTime +
" milliseconds to process from client " + client); " milliseconds to process from client " + client);
@ -657,62 +655,65 @@ public abstract class Server {
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
} }
/** A call queued for handling. */ /** A generic call queued for handling. */
public static class Call implements Schedulable { public static class Call implements Schedulable,
private final int callId; // the client's call id PrivilegedExceptionAction<Void> {
private final int retryCount; // the retry count of the call final int callId; // the client's call id
private final Writable rpcRequest; // Serialized Rpc request from client final int retryCount; // the retry count of the call
private final Connection connection; // connection to client long timestamp; // time received when response is null
private long timestamp; // time received when response is null // time served when response is not null
// time served when response is not null
private ByteBuffer rpcResponse; // the response for this call
private AtomicInteger responseWaitCount = new AtomicInteger(1); private AtomicInteger responseWaitCount = new AtomicInteger(1);
private final RPC.RpcKind rpcKind; final RPC.RpcKind rpcKind;
private final byte[] clientId; final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side private final TraceScope traceScope; // the HTrace scope on the server side
private final CallerContext callerContext; // the call context private final CallerContext callerContext; // the call context
private int priorityLevel; private int priorityLevel;
// the priority level assigned by scheduler, 0 by default // the priority level assigned by scheduler, 0 by default
private Call(Call call) { Call(Call call) {
this(call.callId, call.retryCount, call.rpcRequest, call.connection, this(call.callId, call.retryCount, call.rpcKind, call.clientId,
call.rpcKind, call.clientId, call.traceScope, call.callerContext); call.traceScope, call.callerContext);
} }
public Call(int id, int retryCount, Writable param, Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) {
Connection connection) { this(id, retryCount, kind, clientId, null, null);
this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID);
} }
public Call(int id, int retryCount, Writable param, Connection connection, @VisibleForTesting // primarily TestNamenodeRetryCache
public Call(int id, int retryCount, Void ignore1, Void ignore2,
RPC.RpcKind kind, byte[] clientId) { RPC.RpcKind kind, byte[] clientId) {
this(id, retryCount, param, connection, kind, clientId, null, null); this(id, retryCount, kind, clientId, null, null);
} }
public Call(int id, int retryCount, Writable param, Connection connection, Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
RPC.RpcKind kind, byte[] clientId, TraceScope traceScope, TraceScope traceScope, CallerContext callerContext) {
CallerContext callerContext) {
this.callId = id; this.callId = id;
this.retryCount = retryCount; this.retryCount = retryCount;
this.rpcRequest = param;
this.connection = connection;
this.timestamp = Time.now(); this.timestamp = Time.now();
this.rpcResponse = null;
this.rpcKind = kind; this.rpcKind = kind;
this.clientId = clientId; this.clientId = clientId;
this.traceScope = traceScope; this.traceScope = traceScope;
this.callerContext = callerContext; this.callerContext = callerContext;
} }
@Override @Override
public String toString() { public String toString() {
return rpcRequest + " from " + connection + " Call#" + callId + " Retry#" return "Call#" + callId + " Retry#" + retryCount;
+ retryCount;
} }
public void setResponse(ByteBuffer response) { public Void run() throws Exception {
this.rpcResponse = response; return null;
}
// should eventually be abstract but need to avoid breaking tests
public UserGroupInformation getRemoteUser() {
return null;
}
public InetAddress getHostInetAddress() {
return null;
}
public String getHostAddress() {
InetAddress addr = getHostInetAddress();
return (addr != null) ? addr.getHostAddress() : null;
} }
/** /**
@ -724,34 +725,36 @@ public abstract class Server {
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public void postponeResponse() { public final void postponeResponse() {
int count = responseWaitCount.incrementAndGet(); int count = responseWaitCount.incrementAndGet();
assert count > 0 : "response has already been sent"; assert count > 0 : "response has already been sent";
} }
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public void sendResponse() throws IOException { public final void sendResponse() throws IOException {
int count = responseWaitCount.decrementAndGet(); int count = responseWaitCount.decrementAndGet();
assert count >= 0 : "response has already been sent"; assert count >= 0 : "response has already been sent";
if (count == 0) { if (count == 0) {
connection.sendResponse(this); doResponse(null);
} }
} }
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public void abortResponse(Throwable t) throws IOException { public final void abortResponse(Throwable t) throws IOException {
// don't send response if the call was already sent or aborted. // don't send response if the call was already sent or aborted.
if (responseWaitCount.getAndSet(-1) > 0) { if (responseWaitCount.getAndSet(-1) > 0) {
connection.abortResponse(this, t); doResponse(t);
} }
} }
void doResponse(Throwable t) throws IOException {}
// For Schedulable // For Schedulable
@Override @Override
public UserGroupInformation getUserGroupInformation() { public UserGroupInformation getUserGroupInformation() {
return connection.user; return getRemoteUser();
} }
@Override @Override
@ -764,6 +767,114 @@ public abstract class Server {
} }
} }
/** A RPC extended call queued for handling. */
private class RpcCall extends Call {
final Connection connection; // connection to client
final Writable rpcRequest; // Serialized Rpc request from client
ByteBuffer rpcResponse; // the response for this call
RpcCall(RpcCall call) {
super(call);
this.connection = call.connection;
this.rpcRequest = call.rpcRequest;
}
RpcCall(Connection connection, int id) {
this(connection, id, RpcConstants.INVALID_RETRY_COUNT);
}
RpcCall(Connection connection, int id, int retryCount) {
this(connection, id, retryCount, null,
RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID,
null, null);
}
RpcCall(Connection connection, int id, int retryCount,
Writable param, RPC.RpcKind kind, byte[] clientId,
TraceScope traceScope, CallerContext context) {
super(id, retryCount, kind, clientId, traceScope, context);
this.connection = connection;
this.rpcRequest = param;
}
@Override
public UserGroupInformation getRemoteUser() {
return connection.user;
}
@Override
public InetAddress getHostInetAddress() {
return connection.getHostInetAddress();
}
@Override
public Void run() throws Exception {
if (!connection.channel.isOpen()) {
Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
return null;
}
String errorClass = null;
String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null;
try {
value = call(
rpcKind, connection.protocolName, rpcRequest, timestamp);
} catch (Throwable e) {
if (e instanceof UndeclaredThrowableException) {
e = e.getCause();
}
logException(Server.LOG, e, this);
if (e instanceof RpcServerException) {
RpcServerException rse = ((RpcServerException)e);
returnStatus = rse.getRpcStatusProto();
detailedErr = rse.getRpcErrorCodeProto();
} else {
returnStatus = RpcStatusProto.ERROR;
detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
}
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
// Remove redundant error class name from the beginning of the
// stack trace
String exceptionHdr = errorClass + ": ";
if (error.startsWith(exceptionHdr)) {
error = error.substring(exceptionHdr.length());
}
}
setupResponse(this, returnStatus, detailedErr,
value, errorClass, error);
sendResponse();
return null;
}
void setResponse(ByteBuffer response) throws IOException {
this.rpcResponse = response;
}
@Override
void doResponse(Throwable t) throws IOException {
RpcCall call = this;
if (t != null) {
// clone the call to prevent a race with another thread stomping
// on the response while being sent. the original call is
// effectively discarded since the wait count won't hit zero
call = new RpcCall(this);
setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t));
}
connection.sendResponse(this);
}
@Override
public String toString() {
return super.toString() + " " + rpcRequest + " from " + connection;
}
}
/** Listens on the socket. Creates jobs for the handler threads*/ /** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread { private class Listener extends Thread {
@ -1094,22 +1205,22 @@ public abstract class Server {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses."); LOG.debug("Checking for old call responses.");
} }
ArrayList<Call> calls; ArrayList<RpcCall> calls;
// get the list of channels from list of keys. // get the list of channels from list of keys.
synchronized (writeSelector.keys()) { synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size()); calls = new ArrayList<RpcCall>(writeSelector.keys().size());
iter = writeSelector.keys().iterator(); iter = writeSelector.keys().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
SelectionKey key = iter.next(); SelectionKey key = iter.next();
Call call = (Call)key.attachment(); RpcCall call = (RpcCall)key.attachment();
if (call != null && key.channel() == call.connection.channel) { if (call != null && key.channel() == call.connection.channel) {
calls.add(call); calls.add(call);
} }
} }
} }
for(Call call : calls) { for (RpcCall call : calls) {
doPurge(call, now); doPurge(call, now);
} }
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
@ -1127,7 +1238,7 @@ public abstract class Server {
} }
private void doAsyncWrite(SelectionKey key) throws IOException { private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call)key.attachment(); RpcCall call = (RpcCall)key.attachment();
if (call == null) { if (call == null) {
return; return;
} }
@ -1155,10 +1266,10 @@ public abstract class Server {
// Remove calls that have been pending in the responseQueue // Remove calls that have been pending in the responseQueue
// for a long time. // for a long time.
// //
private void doPurge(Call call, long now) { private void doPurge(RpcCall call, long now) {
LinkedList<Call> responseQueue = call.connection.responseQueue; LinkedList<RpcCall> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) { synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0); Iterator<RpcCall> iter = responseQueue.listIterator(0);
while (iter.hasNext()) { while (iter.hasNext()) {
call = iter.next(); call = iter.next();
if (now > call.timestamp + PURGE_INTERVAL) { if (now > call.timestamp + PURGE_INTERVAL) {
@ -1172,12 +1283,12 @@ public abstract class Server {
// Processes one response. Returns true if there are no more pending // Processes one response. Returns true if there are no more pending
// data for this channel. // data for this channel.
// //
private boolean processResponse(LinkedList<Call> responseQueue, private boolean processResponse(LinkedList<RpcCall> responseQueue,
boolean inHandler) throws IOException { boolean inHandler) throws IOException {
boolean error = true; boolean error = true;
boolean done = false; // there is more data for this channel. boolean done = false; // there is more data for this channel.
int numElements = 0; int numElements = 0;
Call call = null; RpcCall call = null;
try { try {
synchronized (responseQueue) { synchronized (responseQueue) {
// //
@ -1260,7 +1371,7 @@ public abstract class Server {
// //
// Enqueue a response from the application. // Enqueue a response from the application.
// //
void doRespond(Call call) throws IOException { void doRespond(RpcCall call) throws IOException {
synchronized (call.connection.responseQueue) { synchronized (call.connection.responseQueue) {
// must only wrap before adding to the responseQueue to prevent // must only wrap before adding to the responseQueue to prevent
// postponed responses from being encrypted and sent out of order. // postponed responses from being encrypted and sent out of order.
@ -1358,7 +1469,7 @@ public abstract class Server {
private SocketChannel channel; private SocketChannel channel;
private ByteBuffer data; private ByteBuffer data;
private ByteBuffer dataLengthBuffer; private ByteBuffer dataLengthBuffer;
private LinkedList<Call> responseQueue; private LinkedList<RpcCall> responseQueue;
// number of outstanding rpcs // number of outstanding rpcs
private AtomicInteger rpcCount = new AtomicInteger(); private AtomicInteger rpcCount = new AtomicInteger();
private long lastContact; private long lastContact;
@ -1385,8 +1496,8 @@ public abstract class Server {
public UserGroupInformation attemptingUser = null; // user name before auth public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response // Fake 'call' for failed authorization response
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID, private final RpcCall authFailedCall =
RpcConstants.INVALID_RETRY_COUNT, null, this); new RpcCall(this, AUTHORIZATION_FAILED_CALL_ID);
private boolean sentNegotiate = false; private boolean sentNegotiate = false;
private boolean useWrap = false; private boolean useWrap = false;
@ -1409,7 +1520,7 @@ public abstract class Server {
this.hostAddress = addr.getHostAddress(); this.hostAddress = addr.getHostAddress();
} }
this.remotePort = socket.getPort(); this.remotePort = socket.getPort();
this.responseQueue = new LinkedList<Call>(); this.responseQueue = new LinkedList<RpcCall>();
if (socketSendBufferSize != 0) { if (socketSendBufferSize != 0) {
try { try {
socket.setSendBufferSize(socketSendBufferSize); socket.setSendBufferSize(socketSendBufferSize);
@ -1704,8 +1815,7 @@ public abstract class Server {
} }
private void doSaslReply(Message message) throws IOException { private void doSaslReply(Message message) throws IOException {
final Call saslCall = new Call(AuthProtocol.SASL.callId, final RpcCall saslCall = new RpcCall(this, AuthProtocol.SASL.callId);
RpcConstants.INVALID_RETRY_COUNT, null, this);
setupResponse(saslCall, setupResponse(saslCall,
RpcStatusProto.SUCCESS, null, RpcStatusProto.SUCCESS, null,
RpcWritable.wrap(message), null, null); RpcWritable.wrap(message), null, null);
@ -1922,23 +2032,20 @@ public abstract class Server {
if (clientVersion >= 9) { if (clientVersion >= 9) {
// Versions >>9 understand the normal response // Versions >>9 understand the normal response
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, RpcCall fakeCall = new RpcCall(this, -1);
this);
setupResponse(fakeCall, setupResponse(fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
fakeCall.sendResponse(); fakeCall.sendResponse();
} else if (clientVersion >= 3) { } else if (clientVersion >= 3) {
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, RpcCall fakeCall = new RpcCall(this, -1);
this);
// Versions 3 to 8 use older response // Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall, setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg); null, VersionMismatch.class.getName(), errMsg);
fakeCall.sendResponse(); fakeCall.sendResponse();
} else if (clientVersion == 2) { // Hadoop 0.18.3 } else if (clientVersion == 2) { // Hadoop 0.18.3
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, RpcCall fakeCall = new RpcCall(this, 0);
this);
DataOutputStream out = new DataOutputStream(buffer); DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID out.writeInt(0); // call ID
out.writeBoolean(true); // error out.writeBoolean(true); // error
@ -1950,7 +2057,7 @@ public abstract class Server {
} }
private void setupHttpRequestOnIpcPortResponse() throws IOException { private void setupHttpRequestOnIpcPortResponse() throws IOException {
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this); RpcCall fakeCall = new RpcCall(this, 0);
fakeCall.setResponse(ByteBuffer.wrap( fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8))); RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8)));
fakeCall.sendResponse(); fakeCall.sendResponse();
@ -2098,7 +2205,7 @@ public abstract class Server {
} }
} catch (WrappedRpcServerException wrse) { // inform client of error } catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause(); Throwable ioe = wrse.getCause();
final Call call = new Call(callId, retry, null, this); final RpcCall call = new RpcCall(this, callId, retry);
setupResponse(call, setupResponse(call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage()); ioe.getClass().getName(), ioe.getMessage());
@ -2198,8 +2305,9 @@ public abstract class Server {
.build(); .build();
} }
Call call = new Call(header.getCallId(), header.getRetryCount(), RpcCall call = new RpcCall(this, header.getCallId(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header.getRetryCount(), rpcRequest,
ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceScope, callerContext); header.getClientId().toByteArray(), traceScope, callerContext);
// Save the priority level assignment by the scheduler // Save the priority level assignment by the scheduler
@ -2323,21 +2431,10 @@ public abstract class Server {
} }
} }
private void sendResponse(Call call) throws IOException { private void sendResponse(RpcCall call) throws IOException {
responder.doRespond(call); responder.doRespond(call);
} }
private void abortResponse(Call call, Throwable t) throws IOException {
// clone the call to prevent a race with the other thread stomping
// on the response while being sent. the original call is
// effectively discarded since the wait count won't hit zero
call = new Call(call);
setupResponse(call,
RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
null, t.getClass().getName(), StringUtils.stringifyException(t));
call.sendResponse();
}
/** /**
* Get service class for connection * Get service class for connection
* @return the serviceClass * @return the serviceClass
@ -2388,16 +2485,6 @@ public abstract class Server {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind); LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
} }
if (!call.connection.channel.isOpen()) {
LOG.info(Thread.currentThread().getName() + ": skipped " + call);
continue;
}
String errorClass = null;
String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null;
CurCall.set(call); CurCall.set(call);
if (call.traceScope != null) { if (call.traceScope != null) {
call.traceScope.reattach(); call.traceScope.reattach();
@ -2406,53 +2493,11 @@ public abstract class Server {
} }
// always update the current call context // always update the current call context
CallerContext.setCurrent(call.callerContext); CallerContext.setCurrent(call.callerContext);
UserGroupInformation remoteUser = call.getRemoteUser();
try { if (remoteUser != null) {
// Make the call as the user via Subject.doAs, thus associating remoteUser.doAs(call);
// the call with the Subject } else {
if (call.connection.user == null) { call.run();
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
}
);
}
} catch (Throwable e) {
if (e instanceof UndeclaredThrowableException) {
e = e.getCause();
}
logException(LOG, e, call);
if (e instanceof RpcServerException) {
RpcServerException rse = ((RpcServerException)e);
returnStatus = rse.getRpcStatusProto();
detailedErr = rse.getRpcErrorCodeProto();
} else {
returnStatus = RpcStatusProto.ERROR;
detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
}
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
// Remove redundant error class name from the beginning of the stack trace
String exceptionHdr = errorClass + ": ";
if (error.startsWith(exceptionHdr)) {
error = error.substring(exceptionHdr.length());
}
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
setupResponse(call, returnStatus, detailedErr,
value, errorClass, error);
call.sendResponse();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (running) { // unexpected -- log it if (running) { // unexpected -- log it
@ -2469,6 +2514,7 @@ public abstract class Server {
StringUtils.stringifyException(e)); StringUtils.stringifyException(e));
} }
} finally { } finally {
CurCall.set(null);
IOUtils.cleanup(LOG, traceScope); IOUtils.cleanup(LOG, traceScope);
} }
} }
@ -2670,7 +2716,7 @@ public abstract class Server {
* @throws IOException * @throws IOException
*/ */
private void setupResponse( private void setupResponse(
Call call, RpcStatusProto status, RpcErrorCodeProto erCode, RpcCall call, RpcStatusProto status, RpcErrorCodeProto erCode,
Writable rv, String errorClass, String error) Writable rv, String errorClass, String error)
throws IOException { throws IOException {
RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.Builder headerBuilder =
@ -2704,7 +2750,7 @@ public abstract class Server {
} }
} }
private void setupResponse(Call call, private void setupResponse(RpcCall call,
RpcResponseHeaderProto header, Writable rv) throws IOException { RpcResponseHeaderProto header, Writable rv) throws IOException {
ResponseBuffer buf = responseBuffer.get().reset(); ResponseBuffer buf = responseBuffer.get().reset();
try { try {
@ -2738,7 +2784,7 @@ public abstract class Server {
* @throws IOException * @throws IOException
*/ */
private void setupResponseOldVersionFatal(ByteArrayOutputStream response, private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
Call call, RpcCall call,
Writable rv, String errorClass, String error) Writable rv, String errorClass, String error)
throws IOException { throws IOException {
final int OLD_VERSION_FATAL_STATUS = -1; final int OLD_VERSION_FATAL_STATUS = -1;
@ -2751,7 +2797,7 @@ public abstract class Server {
call.setResponse(ByteBuffer.wrap(response.toByteArray())); call.setResponse(ByteBuffer.wrap(response.toByteArray()));
} }
private void wrapWithSasl(Call call) throws IOException { private void wrapWithSasl(RpcCall call) throws IOException {
if (call.connection.saslServer != null) { if (call.connection.saslServer != null) {
byte[] token = call.rpcResponse.array(); byte[] token = call.rpcResponse.array();
// synchronization may be needed since there can be multiple Handler // synchronization may be needed since there can be multiple Handler