HBASE-18009 Move RpcServer.Call to a separated file

This commit is contained in:
zhangduo 2017-05-08 20:36:33 +08:00
parent 959deb0e5c
commit 51d4c68b7c
11 changed files with 775 additions and 691 deletions

View File

@ -75,8 +75,8 @@ public class CallRunner {
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
public RpcServer.Call getCall() {
return (RpcServer.Call) call;
public ServerCall getCall() {
return (ServerCall) call;
}
public void setStatus(MonitoredRPCHandler status) {

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
@ -203,13 +202,14 @@ public class NettyRpcServer extends RpcServer {
this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
}
this.remotePort = inetSocketAddress.getPort();
this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
0, null, null, 0, null);
this.setConnectionHeaderResponseCall = new Call(
CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
this, 0, null, null, 0, null);
this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
null, null, null, this, 0, null, null, 0, null);
this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
this.setConnectionHeaderResponseCall =
new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this,
0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
this.authFailedCall =
new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0,
null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
}
void readPreamble(ByteBuf buffer) throws IOException {
@ -243,7 +243,7 @@ public class NettyRpcServer extends RpcServer {
AccessDeniedException ae = new AccessDeniedException(
"Authentication is required");
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
((Call) authFailedCall)
((NettyServerCall) authFailedCall)
.sendResponseIfReady(ChannelFutureListener.CLOSE);
return;
}
@ -269,8 +269,8 @@ public class NettyRpcServer extends RpcServer {
private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
null, null, 0, null);
NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1,
null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
setupResponse(null, fakeCall, e, msg);
// closes out the connection.
fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
@ -336,59 +336,17 @@ public class NettyRpcServer extends RpcServer {
}
@Override
public RpcServer.Call createCall(int id, final BlockingService service,
public ServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, RpcServer.Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
return new Call(id, service, md, header, param, cellScanner, connection,
size, tinfo, remoteAddress, timeout, reqCleanup);
return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size,
tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
reqCleanup);
}
}
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries the
* result.
*/
@InterfaceStability.Evolving
public class Call extends RpcServer.Call {
Call(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
RpcServer.Connection connection, long size, TraceInfo tinfo,
final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner,
connection, size, tinfo, remoteAddress, timeout, reqCleanup);
}
@Override
public long disconnectSince() {
if (!getConnection().isConnectionOpen()) {
return System.currentTimeMillis() - timestamp;
} else {
return -1L;
}
}
NettyConnection getConnection() {
return (NettyConnection) this.connection;
}
/**
* If we have a response, and delay is not set, then respond immediately. Otherwise, do not
* respond to client. This is called by the RPC code in the context of the Handler thread.
*/
@Override
public synchronized void sendResponseIfReady() throws IOException {
getConnection().channel.writeAndFlush(this);
}
public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
getConnection().channel.writeAndFlush(this).addListener(listener);
}
}
private class Initializer extends ChannelInitializer<SocketChannel> {
final int maxRequestSize;
@ -483,7 +441,7 @@ public class NettyRpcServer extends RpcServer {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
final Call call = (Call) msg;
final NettyServerCall call = (NettyServerCall) msg;
ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers());
ctx.write(response, promise).addListener(new CallWriteListener(call));
}
@ -492,9 +450,9 @@ public class NettyRpcServer extends RpcServer {
private class CallWriteListener implements ChannelFutureListener {
private Call call;
private NettyServerCall call;
CallWriteListener(Call call) {
CallWriteListener(NettyServerCall call) {
this.call = call;
}
@ -527,14 +485,11 @@ public class NettyRpcServer extends RpcServer {
}
@Override
public Pair<Message, CellScanner> call(BlockingService service,
MethodDescriptor md, Message param, CellScanner cellScanner,
long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
throws IOException {
Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
-1, null, null, timeout, null);
fakeCall.setReceiveTime(receiveTime);
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
-1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
return call(fakeCall, status);
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.channel.ChannelFutureListener;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.htrace.TraceInfo;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries the
* result.
*/
@InterfaceAudience.Private
class NettyServerCall extends ServerCall {
NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, RpcServer.Connection connection, long size,
TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout,
ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
}
NettyConnection getConnection() {
return (NettyConnection) this.connection;
}
/**
* If we have a response, and delay is not set, then respond immediately. Otherwise, do not
* respond to client. This is called by the RPC code in the context of the Handler thread.
*/
@Override
public synchronized void sendResponseIfReady() throws IOException {
getConnection().channel.writeAndFlush(this);
}
public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
getConnection().channel.writeAndFlush(this).addListener(listener);
}
}

View File

@ -64,11 +64,6 @@ public interface RpcCall extends RpcCallContext {
*/
long getReceiveTime();
/**
* Set the timestamp when the call is constructed.
*/
void setReceiveTime(long receiveTime);
/**
* @return The time when the call starts to be executed.
*/

View File

@ -62,9 +62,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
@ -88,7 +86,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@ -98,13 +95,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
@ -120,7 +113,6 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
import org.codehaus.jackson.map.ObjectMapper;
@ -265,475 +257,6 @@ public abstract class RpcServer implements RpcServerInterface,
*/
private RSRpcServices rsRpcServices;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
*/
@InterfaceStability.Evolving
@InterfaceAudience.Private
public abstract class Call implements RpcCall {
protected int id; // the client's call id
protected BlockingService service;
protected MethodDescriptor md;
protected RequestHeader header;
protected Message param; // the parameter passed
// Optional cell data passed outside of protobufs.
protected CellScanner cellScanner;
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected int timeout;
protected long startTime;
protected long deadline;// the deadline to handle this call, if exceed we can drop it.
/**
* Chain of buffers to send as response.
*/
protected BufferChain response;
protected long size; // size of current call
protected boolean isError;
protected TraceInfo tinfo;
protected ByteBufferListOutputStream cellBlockStream = null;
protected CallCleanup reqCleanup = null;
protected User user;
protected InetAddress remoteAddress;
protected RpcCallback rpcCallback;
private long responseCellSize = 0;
private long responseBlockSize = 0;
// cumulative size of serialized exceptions
private long exceptionSize = 0;
private boolean retryImmediatelySupported;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
Connection connection, long size, TraceInfo tinfo,
final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
this.id = id;
this.service = service;
this.md = md;
this.header = header;
this.param = param;
this.cellScanner = cellScanner;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
this.isError = false;
this.size = size;
this.tinfo = tinfo;
this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
this.remoteAddress = remoteAddress;
this.retryImmediatelySupported =
connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
this.reqCleanup = reqCleanup;
}
/**
* Call is done. Execution happened and we returned results to client. It is
* now safe to cleanup.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "Presume the lock on processing request held by caller is protection enough")
void done() {
if (this.cellBlockStream != null) {
// This will return back the BBs which we got from pool.
this.cellBlockStream.releaseResources();
this.cellBlockStream = null;
}
// If the call was run successfuly, we might have already returned the BB
// back to pool. No worries..Then inputCellBlock will be null
cleanup();
}
@Override
public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
}
}
@Override
public String toString() {
return toShortString() + " param: " +
(this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
" connection: " + connection.toString();
}
@Override
public RequestHeader getHeader() {
return this.header;
}
@Override
public int getPriority() {
return this.header.getPriority();
}
/*
* Short string representation without param info because param itself could be huge depends on
* the payload of a command
*/
@Override
public String toShortString() {
String serviceName = this.connection.service != null ?
this.connection.service.getDescriptorForType().getName() : "null";
return "callId: " + this.id + " service: " + serviceName +
" methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
" size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
" connection: " + connection.toString() +
" deadline: " + deadline;
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
@Override
public synchronized void setResponse(Message m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
if (t != null) this.isError = true;
BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
setExceptionResponse(t, errorMsg, headerBuilder);
}
// Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
// reservoir when finished. This is hacky and the hack is not contained but benefits are
// high when we can avoid a big buffer allocation on each rpc.
List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0;
if (reservoir != null) {
this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
this.connection.compressionCodec, cells, reservoir);
if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size();
}
} else {
ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
this.connection.compressionCodec, cells);
if (b != null) {
cellBlockSize = b.remaining();
cellBlock = new ArrayList<>(1);
cellBlock.add(b);
}
}
if (cellBlockSize > 0) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
cellBlockBuilder.setLength(cellBlockSize);
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
ByteBuffer headerBuf =
createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
cellBlockBufferSize = cellBlock.size();
responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
} else {
responseBufs = new ByteBuffer[1];
}
responseBufs[0] = headerBuf;
if (cellBlock != null) {
for (int i = 0; i < cellBlockBufferSize; i++) {
responseBufs[i + 1] = cellBlock.get(i);
}
}
bc = new BufferChain(responseBufs);
if (connection.useWrap) {
bc = wrapWithSasl(bc);
}
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
this.response = bc;
// Once a response message is created and set to this.response, this Call can be treated as
// done. The Responder thread will do the n/w write of this message back to client.
if (this.rpcCallback != null) {
try {
this.rpcCallback.run();
} catch (Exception e) {
// Don't allow any exception here to kill this handler thread.
LOG.warn("Exception while running the Rpc Callback.", e);
}
}
}
protected void setExceptionResponse(Throwable t, String errorMsg,
ResponseHeader.Builder headerBuilder) {
ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
exceptionBuilder.setExceptionClassName(t.getClass().getName());
exceptionBuilder.setStackTrace(errorMsg);
exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
if (t instanceof RegionMovedException) {
// Special casing for this exception. This is only one carrying a payload.
// Do this instead of build a generic system for allowing exceptions carry
// any kind of payload.
RegionMovedException rme = (RegionMovedException)t;
exceptionBuilder.setHostname(rme.getHostname());
exceptionBuilder.setPort(rme.getPort());
}
// Set the exception as the result of the method invocation.
headerBuilder.setException(exceptionBuilder.build());
}
protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
// for writing the header, we check if there is available space in the buffers
// created for the cellblock itself. If there is space for the header, we reuse
// the last buffer in the cellblock. This applies to the cellblock created from the
// pool or even the onheap cellblock buffer in case there is no pool enabled.
// Possible reuse would avoid creating a temporary array for storing the header every time.
ByteBuffer possiblePBBuf =
(cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
resultVintSize = 0;
if (header != null) {
headerSerializedSize = header.getSerializedSize();
headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
}
if (result != null) {
resultSerializedSize = result.getSerializedSize();
resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
}
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
+ cellBlockSize;
int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT;
// Only if the last buffer has enough space for header use it. Else allocate
// a new buffer. Assume they are all flipped
if (possiblePBBuf != null
&& possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
// duplicate the buffer. This is where the header is going to be written
ByteBuffer pbBuf = possiblePBBuf.duplicate();
// get the current limit
int limit = pbBuf.limit();
// Position such that we write the header to the end of the buffer
pbBuf.position(limit);
// limit to the header size
pbBuf.limit(totalPBSize + limit);
// mark the current position
pbBuf.mark();
writeToCOS(result, header, totalSize, pbBuf);
// reset the buffer back to old position
pbBuf.reset();
return pbBuf;
} else {
return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
}
}
private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
throws IOException {
ByteBufferUtils.putInt(pbBuf, totalSize);
// create COS that works on BB
CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
if (header != null) {
cos.writeMessageNoTag(header);
}
if (result != null) {
cos.writeMessageNoTag(result);
}
cos.flush();
cos.checkNoSpaceLeft();
}
private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int totalSize, int totalPBSize) throws IOException {
ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
writeToCOS(result, header, totalSize, pbBuf);
pbBuf.flip();
return pbBuf;
}
protected BufferChain wrapWithSasl(BufferChain bc)
throws IOException {
if (!this.connection.useSasl) return bc;
// Looks like no way around this; saslserver wants a byte array. I have to make it one.
// THIS IS A BIG UGLY COPY.
byte [] responseBytes = bc.getBytes();
byte [] token;
// synchronization may be needed since there can be multiple Handler
// threads using saslServer or Crypto AES to wrap responses.
if (connection.useCryptoAesWrap) {
// wrap with Crypto AES
synchronized (connection.cryptoAES) {
token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
}
} else {
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
}
ByteBuffer[] responseBufs = new ByteBuffer[2];
responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
responseBufs[1] = ByteBuffer.wrap(token);
return new BufferChain(responseBufs);
}
@Override
public boolean isClientCellBlockSupported() {
return this.connection != null && this.connection.codec != null;
}
@Override
public long getResponseCellSize() {
return responseCellSize;
}
@Override
public void incrementResponseCellSize(long cellSize) {
responseCellSize += cellSize;
}
@Override
public long getResponseBlockSize() {
return responseBlockSize;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
responseBlockSize += blockSize;
}
@Override
public long getResponseExceptionSize() {
return exceptionSize;
}
@Override
public void incrementResponseExceptionSize(long exSize) {
exceptionSize += exSize;
}
@Override
public long getSize() {
return this.size;
}
@Override
public long getDeadline() {
return deadline;
}
@Override
public User getRequestUser() {
return user;
}
@Override
public String getRequestUserName() {
User user = getRequestUser();
return user == null? null: user.getShortName();
}
@Override
public InetAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public VersionInfo getClientVersionInfo() {
return connection.getVersionInfo();
}
@Override
public synchronized void setCallBack(RpcCallback callback) {
this.rpcCallback = callback;
}
@Override
public boolean isRetryImmediatelySupported() {
return retryImmediatelySupported;
}
@Override
public BlockingService getService() {
return service;
}
@Override
public MethodDescriptor getMethod() {
return md;
}
@Override
public Message getParam() {
return param;
}
@Override
public CellScanner getCellScanner() {
return cellScanner;
}
@Override
public long getReceiveTime() {
return timestamp;
}
@Override
public void setReceiveTime(long t) {
this.timestamp = t;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public void setStartTime(long t) {
this.startTime = t;
}
@Override
public int getTimeout() {
return timeout;
}
@Override
public int getRemotePort() {
return connection.getRemotePort();
}
@Override
public TraceInfo getTraceInfo() {
return tinfo;
}
}
@FunctionalInterface
protected static interface CallCleanup {
void run();
@ -781,15 +304,15 @@ public abstract class RpcServer implements RpcServerInterface,
protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
protected static final int AUTHORIZATION_FAILED_CALLID = -1;
protected Call authFailedCall;
protected ServerCall authFailedCall;
protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
protected static final int SASL_CALLID = -33;
protected Call saslCall;
protected ServerCall saslCall;
// Fake 'call' for connection header response
protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
protected Call setConnectionHeaderResponseCall;
protected ServerCall setConnectionHeaderResponseCall;
// was authentication allowed with a fallback to simple auth
protected boolean authenticatedWithFallback;
@ -1366,7 +889,7 @@ public abstract class RpcServer implements RpcServerInterface,
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
final RpcServer.Call callTooBig = createCall(id, this.service, null,
final ServerCall callTooBig = createCall(id, this.service, null,
null, null, null, this, totalRequestSize, null, null, 0,
this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@ -1430,7 +953,7 @@ public abstract class RpcServer implements RpcServerInterface,
t = new DoNotRetryIOException(t);
}
final RpcServer.Call readParamsFailedCall = createCall(id,
final ServerCall readParamsFailedCall = createCall(id,
this.service, null, null, null, null, this, totalRequestSize, null,
null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@ -1447,7 +970,7 @@ public abstract class RpcServer implements RpcServerInterface,
if (header.hasTimeout() && header.getTimeout() > 0) {
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
RpcServer.Call call = createCall(id, this.service, md, header, param,
ServerCall call = createCall(id, this.service, md, header, param,
cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
this.callCleanup);
@ -1465,7 +988,7 @@ public abstract class RpcServer implements RpcServerInterface,
public abstract boolean isConnectionOpen();
public abstract Call createCall(int id, final BlockingService service,
public abstract ServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
@ -1594,14 +1117,13 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Setup response for the RPC Call.
*
* @param response buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param call {@link ServerCall} to which we are setting up the response
* @param error error message, if the call failed
* @throws IOException
*/
protected void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
throws IOException {
protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t,
String error) throws IOException {
if (response != null) response.reset();
call.setResponse(null, null, t, error);
}

View File

@ -0,0 +1,527 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
*/
@InterfaceAudience.Private
abstract class ServerCall implements RpcCall {
protected final int id; // the client's call id
protected final BlockingService service;
protected final MethodDescriptor md;
protected final RequestHeader header;
protected Message param; // the parameter passed
// Optional cell data passed outside of protobufs.
protected final CellScanner cellScanner;
protected final Connection connection; // connection to client
protected final long receiveTime; // the time received when response is null
// the time served when response is not null
protected final int timeout;
protected long startTime;
protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
protected final ByteBufferPool reservoir;
protected final CellBlockBuilder cellBlockBuilder;
/**
* Chain of buffers to send as response.
*/
protected BufferChain response;
protected final long size; // size of current call
protected boolean isError;
protected final TraceInfo tinfo;
protected ByteBufferListOutputStream cellBlockStream = null;
protected CallCleanup reqCleanup = null;
protected User user;
protected final InetAddress remoteAddress;
protected RpcCallback rpcCallback;
private long responseCellSize = 0;
private long responseBlockSize = 0;
// cumulative size of serialized exceptions
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo,
InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
this.id = id;
this.service = service;
this.md = md;
this.header = header;
this.param = param;
this.cellScanner = cellScanner;
this.connection = connection;
this.receiveTime = receiveTime;
this.response = null;
this.isError = false;
this.size = size;
this.tinfo = tinfo;
this.user = connection == null ? null : connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
this.remoteAddress = remoteAddress;
this.retryImmediatelySupported =
connection == null ? false : connection.retryImmediatelySupported;
this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
this.reservoir = reservoir;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
}
/**
* Call is done. Execution happened and we returned results to client. It is
* now safe to cleanup.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "Presume the lock on processing request held by caller is protection enough")
void done() {
if (this.cellBlockStream != null) {
// This will return back the BBs which we got from pool.
this.cellBlockStream.releaseResources();
this.cellBlockStream = null;
}
// If the call was run successfuly, we might have already returned the BB
// back to pool. No worries..Then inputCellBlock will be null
cleanup();
}
@Override
public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
}
}
@Override
public String toString() {
return toShortString() + " param: " +
(this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
" connection: " + connection.toString();
}
@Override
public RequestHeader getHeader() {
return this.header;
}
@Override
public int getPriority() {
return this.header.getPriority();
}
/*
* Short string representation without param info because param itself could be huge depends on
* the payload of a command
*/
@Override
public String toShortString() {
String serviceName = this.connection.service != null ?
this.connection.service.getDescriptorForType().getName() : "null";
return "callId: " + this.id + " service: " + serviceName +
" methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
" size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
" connection: " + connection.toString() +
" deadline: " + deadline;
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
@Override
public synchronized void setResponse(Message m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
if (t != null) this.isError = true;
BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
setExceptionResponse(t, errorMsg, headerBuilder);
}
// Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
// reservoir when finished. This is hacky and the hack is not contained but benefits are
// high when we can avoid a big buffer allocation on each rpc.
List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0;
if (this.reservoir != null) {
this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
this.connection.compressionCodec, cells, this.reservoir);
if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size();
}
} else {
ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
this.connection.compressionCodec, cells);
if (b != null) {
cellBlockSize = b.remaining();
cellBlock = new ArrayList<>(1);
cellBlock.add(b);
}
}
if (cellBlockSize > 0) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
cellBlockBuilder.setLength(cellBlockSize);
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
ByteBuffer headerBuf =
createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
cellBlockBufferSize = cellBlock.size();
responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
} else {
responseBufs = new ByteBuffer[1];
}
responseBufs[0] = headerBuf;
if (cellBlock != null) {
for (int i = 0; i < cellBlockBufferSize; i++) {
responseBufs[i + 1] = cellBlock.get(i);
}
}
bc = new BufferChain(responseBufs);
if (connection.useWrap) {
bc = wrapWithSasl(bc);
}
} catch (IOException e) {
RpcServer.LOG.warn("Exception while creating response " + e);
}
this.response = bc;
// Once a response message is created and set to this.response, this Call can be treated as
// done. The Responder thread will do the n/w write of this message back to client.
if (this.rpcCallback != null) {
try {
this.rpcCallback.run();
} catch (Exception e) {
// Don't allow any exception here to kill this handler thread.
RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
}
}
}
protected void setExceptionResponse(Throwable t, String errorMsg,
ResponseHeader.Builder headerBuilder) {
ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
exceptionBuilder.setExceptionClassName(t.getClass().getName());
exceptionBuilder.setStackTrace(errorMsg);
exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
if (t instanceof RegionMovedException) {
// Special casing for this exception. This is only one carrying a payload.
// Do this instead of build a generic system for allowing exceptions carry
// any kind of payload.
RegionMovedException rme = (RegionMovedException)t;
exceptionBuilder.setHostname(rme.getHostname());
exceptionBuilder.setPort(rme.getPort());
}
// Set the exception as the result of the method invocation.
headerBuilder.setException(exceptionBuilder.build());
}
protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
// for writing the header, we check if there is available space in the buffers
// created for the cellblock itself. If there is space for the header, we reuse
// the last buffer in the cellblock. This applies to the cellblock created from the
// pool or even the onheap cellblock buffer in case there is no pool enabled.
// Possible reuse would avoid creating a temporary array for storing the header every time.
ByteBuffer possiblePBBuf =
(cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
resultVintSize = 0;
if (header != null) {
headerSerializedSize = header.getSerializedSize();
headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
}
if (result != null) {
resultSerializedSize = result.getSerializedSize();
resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
}
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
+ cellBlockSize;
int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT;
// Only if the last buffer has enough space for header use it. Else allocate
// a new buffer. Assume they are all flipped
if (possiblePBBuf != null
&& possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
// duplicate the buffer. This is where the header is going to be written
ByteBuffer pbBuf = possiblePBBuf.duplicate();
// get the current limit
int limit = pbBuf.limit();
// Position such that we write the header to the end of the buffer
pbBuf.position(limit);
// limit to the header size
pbBuf.limit(totalPBSize + limit);
// mark the current position
pbBuf.mark();
writeToCOS(result, header, totalSize, pbBuf);
// reset the buffer back to old position
pbBuf.reset();
return pbBuf;
} else {
return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
}
}
private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
throws IOException {
ByteBufferUtils.putInt(pbBuf, totalSize);
// create COS that works on BB
CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
if (header != null) {
cos.writeMessageNoTag(header);
}
if (result != null) {
cos.writeMessageNoTag(result);
}
cos.flush();
cos.checkNoSpaceLeft();
}
private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int totalSize, int totalPBSize) throws IOException {
ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
writeToCOS(result, header, totalSize, pbBuf);
pbBuf.flip();
return pbBuf;
}
protected BufferChain wrapWithSasl(BufferChain bc)
throws IOException {
if (!this.connection.useSasl) return bc;
// Looks like no way around this; saslserver wants a byte array. I have to make it one.
// THIS IS A BIG UGLY COPY.
byte [] responseBytes = bc.getBytes();
byte [] token;
// synchronization may be needed since there can be multiple Handler
// threads using saslServer or Crypto AES to wrap responses.
if (connection.useCryptoAesWrap) {
// wrap with Crypto AES
synchronized (connection.cryptoAES) {
token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
}
} else {
synchronized (connection.saslServer) {
token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
}
}
if (RpcServer.LOG.isTraceEnabled()) {
RpcServer.LOG.trace("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
}
ByteBuffer[] responseBufs = new ByteBuffer[2];
responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
responseBufs[1] = ByteBuffer.wrap(token);
return new BufferChain(responseBufs);
}
@Override
public long disconnectSince() {
if (!this.connection.isConnectionOpen()) {
return System.currentTimeMillis() - receiveTime;
} else {
return -1L;
}
}
@Override
public boolean isClientCellBlockSupported() {
return this.connection != null && this.connection.codec != null;
}
@Override
public long getResponseCellSize() {
return responseCellSize;
}
@Override
public void incrementResponseCellSize(long cellSize) {
responseCellSize += cellSize;
}
@Override
public long getResponseBlockSize() {
return responseBlockSize;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
responseBlockSize += blockSize;
}
@Override
public long getResponseExceptionSize() {
return exceptionSize;
}
@Override
public void incrementResponseExceptionSize(long exSize) {
exceptionSize += exSize;
}
@Override
public long getSize() {
return this.size;
}
@Override
public long getDeadline() {
return deadline;
}
@Override
public User getRequestUser() {
return user;
}
@Override
public String getRequestUserName() {
User user = getRequestUser();
return user == null? null: user.getShortName();
}
@Override
public InetAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public VersionInfo getClientVersionInfo() {
return connection.getVersionInfo();
}
@Override
public synchronized void setCallBack(RpcCallback callback) {
this.rpcCallback = callback;
}
@Override
public boolean isRetryImmediatelySupported() {
return retryImmediatelySupported;
}
@Override
public BlockingService getService() {
return service;
}
@Override
public MethodDescriptor getMethod() {
return md;
}
@Override
public Message getParam() {
return param;
}
@Override
public CellScanner getCellScanner() {
return cellScanner;
}
@Override
public long getReceiveTime() {
return receiveTime;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public void setStartTime(long t) {
this.startTime = t;
}
@Override
public int getTimeout() {
return timeout;
}
@Override
public int getRemotePort() {
return connection.getRemotePort();
}
@Override
public TraceInfo getTraceInfo() {
return tinfo;
}
}

View File

@ -127,61 +127,6 @@ public class SimpleRpcServer extends RpcServer {
private Listener listener = null;
protected Responder responder = null;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
*/
@InterfaceStability.Evolving
public class Call extends RpcServer.Call {
protected Responder responder;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
RpcServer.Connection connection, long size, TraceInfo tinfo,
final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup,
Responder responder) {
super(id, service, md, header, param, cellScanner, connection, size,
tinfo, remoteAddress, timeout, reqCleanup);
this.responder = responder;
}
/**
* Call is done. Execution happened and we returned results to client. It is now safe to
* cleanup.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Presume the lock on processing request held by caller is protection enough")
@Override
void done() {
super.done();
this.getConnection().decRpcCount(); // Say that we're done with this call.
}
@Override
public long disconnectSince() {
if (!getConnection().isConnectionOpen()) {
return System.currentTimeMillis() - timestamp;
} else {
return -1L;
}
}
@Override
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
this.responder.doRespond(this);
}
Connection getConnection() {
return (Connection) this.connection;
}
}
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
@ -589,8 +534,8 @@ public class SimpleRpcServer extends RpcServer {
if (connection == null) {
throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
}
Call call = connection.responseQueue.peekFirst();
if (call != null && now > call.timestamp + purgeTimeout) {
SimpleServerCall call = connection.responseQueue.peekFirst();
if (call != null && now > call.lastSentTime + purgeTimeout) {
conWithOldCalls.add(call.getConnection());
}
}
@ -637,7 +582,7 @@ public class SimpleRpcServer extends RpcServer {
* @return true if we proceed the call fully, false otherwise.
* @throws IOException
*/
private boolean processResponse(final Call call) throws IOException {
private boolean processResponse(final SimpleServerCall call) throws IOException {
boolean error = true;
try {
// Send as much data as we can in the non-blocking fashion
@ -680,7 +625,7 @@ public class SimpleRpcServer extends RpcServer {
try {
for (int i = 0; i < 20; i++) {
// protection if some handlers manage to need all the responder
Call call = connection.responseQueue.pollFirst();
SimpleServerCall call = connection.responseQueue.pollFirst();
if (call == null) {
return true;
}
@ -699,7 +644,7 @@ public class SimpleRpcServer extends RpcServer {
//
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
void doRespond(SimpleServerCall call) throws IOException {
boolean added = false;
// If there is already a write in progress, we don't wait. This allows to free the handlers
@ -728,7 +673,7 @@ public class SimpleRpcServer extends RpcServer {
call.responder.registerForWrite(call.getConnection());
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
call.lastSentTime = System.currentTimeMillis();
}
}
@ -741,7 +686,7 @@ public class SimpleRpcServer extends RpcServer {
protected SocketChannel channel;
private ByteBuff data;
private ByteBuffer dataLengthBuffer;
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<>();
protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue = new ConcurrentLinkedDeque<>();
private final Lock responseWriteLock = new ReentrantLock();
private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
private long lastContact;
@ -769,13 +714,14 @@ public class SimpleRpcServer extends RpcServer {
socketSendBufferSize);
}
}
this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
0, null, null, 0, null, responder);
this.setConnectionHeaderResponseCall = new Call(
CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
this, 0, null, null, 0, null, responder);
this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
null, null, null, this, 0, null, null, 0, null, responder);
this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
reservoir, cellBlockBuilder, null, responder);
this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null,
null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir,
cellBlockBuilder, null, responder);
}
public void setLastContact(long lastContact) {
@ -941,8 +887,9 @@ public class SimpleRpcServer extends RpcServer {
RequestHeader header = (RequestHeader) builder.build();
// Notify the client about the offending request
Call reqTooBig = new Call(header.getCallId(), this.service, null,
null, null, null, this, 0, null, this.addr, 0, null, responder);
SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service,
null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0,
reservoir, cellBlockBuilder, null, responder);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@ -1043,8 +990,8 @@ public class SimpleRpcServer extends RpcServer {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
null, null, 0, null, responder);
SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@ -1081,13 +1028,13 @@ public class SimpleRpcServer extends RpcServer {
}
@Override
public RpcServer.Call createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, RpcServer.Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
return new Call(id, service, md, header, param, cellScanner, connection,
size, tinfo, remoteAddress, timeout, reqCleanup, responder);
public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
RpcServer.Connection connection, long size, TraceInfo tinfo,
final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size,
tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
reqCleanup, responder);
}
}
@ -1206,17 +1153,16 @@ public class SimpleRpcServer extends RpcServer {
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),
0);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service,
MethodDescriptor md, Message param, CellScanner cellScanner,
long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
throws IOException {
Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
-1, null, null, timeout, null, null);
fakeCall.setReceiveTime(receiveTime);
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
null, -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
return call(fakeCall, status);
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.htrace.TraceInfo;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries the
* result.
*/
@InterfaceAudience.Private
class SimpleServerCall extends ServerCall {
long lastSentTime;
final Responder responder;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection,
long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout,
ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
Responder responder) {
super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
this.responder = responder;
}
/**
* Call is done. Execution happened and we returned results to client. It is now safe to cleanup.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "Presume the lock on processing request held by caller is protection enough")
@Override
void done() {
super.done();
this.getConnection().decRpcCount(); // Say that we're done with this call.
}
@Override
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
this.responder.doRespond(this);
}
Connection getConnection() {
return (Connection) this.connection;
}
}

View File

@ -33,8 +33,7 @@ public class TestCallRunner {
public void testSimpleCall() {
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
mockCall.connection = Mockito.mock(RpcServer.Connection.class);
ServerCall mockCall = Mockito.mock(ServerCall.class);
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.run();

View File

@ -30,6 +30,11 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -44,18 +49,16 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -63,18 +66,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@Category({RPCTests.class, SmallTests.class})
public class TestSimpleRpcScheduler {/*
@Rule
@ -167,7 +163,7 @@ public class TestSimpleRpcScheduler {/*
}
private CallRunner createMockTask() {
Call call = mock(Call.class);
ServerCall call = mock(ServerCall.class);
CallRunner task = mock(CallRunner.class);
when(task.getRpcCall()).thenReturn(call);
return task;
@ -195,19 +191,19 @@ public class TestSimpleRpcScheduler {/*
scheduler.start();
CallRunner smallCallTask = mock(CallRunner.class);
RpcServer.Call smallCall = mock(RpcServer.Call.class);
ServerCall smallCall = mock(ServerCall.class);
RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
when(smallCallTask.getRpcCall()).thenReturn(smallCall);
when(smallCall.getHeader()).thenReturn(smallHead);
CallRunner largeCallTask = mock(CallRunner.class);
RpcServer.Call largeCall = mock(RpcServer.Call.class);
ServerCall largeCall = mock(ServerCall.class);
RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
when(largeCallTask.getRpcCall()).thenReturn(largeCall);
when(largeCall.getHeader()).thenReturn(largeHead);
CallRunner hugeCallTask = mock(CallRunner.class);
RpcServer.Call hugeCall = mock(RpcServer.Call.class);
ServerCall hugeCall = mock(ServerCall.class);
RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
when(hugeCall.getHeader()).thenReturn(hugeHead);
@ -290,7 +286,7 @@ public class TestSimpleRpcScheduler {/*
scheduler.start();
CallRunner putCallTask = mock(CallRunner.class);
RpcServer.Call putCall = mock(RpcServer.Call.class);
ServerCall putCall = mock(ServerCall.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
@ -299,13 +295,13 @@ public class TestSimpleRpcScheduler {/*
when(putCall.getParam()).thenReturn(putCall.param);
CallRunner getCallTask = mock(CallRunner.class);
RpcServer.Call getCall = mock(RpcServer.Call.class);
ServerCall getCall = mock(ServerCall.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);
CallRunner scanCallTask = mock(CallRunner.class);
RpcServer.Call scanCall = mock(RpcServer.Call.class);
ServerCall scanCall = mock(ServerCall.class);
scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
@ -382,7 +378,7 @@ public class TestSimpleRpcScheduler {/*
scheduler.start();
CallRunner putCallTask = mock(CallRunner.class);
RpcServer.Call putCall = mock(RpcServer.Call.class);
ServerCall putCall = mock(ServerCall.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
@ -506,19 +502,15 @@ public class TestSimpleRpcScheduler {/*
// Get mocked call that has the CallRunner sleep for a while so that the fast
// path isn't hit.
private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
final RpcServer.Call putCall = mock(RpcServer.Call.class);
ServerCall putCall = new ServerCall(1, null, null,
RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
null, null, 9, null, null, timestamp, 0, null, null, null) {
putCall.timestamp = timestamp;
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
.setMethodName("mutate")
.build();
when(putCall.getSize()).thenReturn(9L);
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getReceiveTime()).thenReturn(putCall.timestamp);
when(putCall.getParam()).thenReturn(putCall.param);
@Override
public void sendResponseIfReady() throws IOException {
}
};
CallRunner cr = new CallRunner(null, putCall) {
public void run() {
@ -530,11 +522,13 @@ public class TestSimpleRpcScheduler {/*
} catch (InterruptedException e) {
}
}
public RpcCall getRpcCall() {
return putCall;
}
public void drop() {}
public void drop() {
}
};
return cr;

View File

@ -28,6 +28,8 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -57,8 +59,8 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -75,10 +77,8 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, SmallTests.class })
@Category({ SecurityTests.class, MediumTests.class })
public class TestSecureIPC {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();