HBASE-17221 Abstract out an interface for RpcServer.Call

This commit is contained in:
Jerry He 2016-12-05 10:21:55 -08:00
parent 1c8822ddff
commit 39653862a4
11 changed files with 323 additions and 86 deletions

View File

@ -161,7 +161,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
*/
private boolean needToDrop(CallRunner callRunner) {
long now = EnvironmentEdgeManager.currentTime();
long callDelay = now - callRunner.getCall().timestamp;
long callDelay = now - callRunner.getRpcCall().getReceiveTime();
long localMinDelay = this.minDelay;

View File

@ -25,16 +25,14 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
/**
* The request processing logic, which is usually executed in thread pools provided by an
* {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
@ -47,7 +45,7 @@ public class CallRunner {
private static final CallDroppedException CALL_DROPPED_EXCEPTION
= new CallDroppedException();
private Call call;
private RpcCall call;
private RpcServerInterface rpcServer;
private MonitoredRPCHandler status;
private volatile boolean sucessful;
@ -58,7 +56,7 @@ public class CallRunner {
* time we occupy heap.
*/
// The constructor is shutdown so only RpcServer in this class can make one of these.
CallRunner(final RpcServerInterface rpcServer, final Call call) {
CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
@ -67,10 +65,19 @@ public class CallRunner {
}
}
public Call getCall() {
public RpcCall getRpcCall() {
return call;
}
/**
* Keep for backward compatibility.
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
public RpcServer.Call getCall() {
return (RpcServer.Call) call;
}
public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
@ -85,23 +92,23 @@ public class CallRunner {
public void run() {
try {
if (!call.connection.channel.isOpen()) {
if (call.disconnectSince() >= 0) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}
return;
}
call.startTime = System.currentTimeMillis();
if (call.startTime > call.deadline) {
call.setStartTime(System.currentTimeMillis());
if (call.getStartTime() > call.getDeadline()) {
RpcServer.LOG.warn("Dropping timed out call: " + call);
return;
}
this.status.setStatus("Setting up call");
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
if (RpcServer.LOG.isTraceEnabled()) {
UserGroupInformation remoteUser = call.connection.ugi;
User remoteUser = call.getRequestUser();
RpcServer.LOG.trace(call.toShortString() + " executing as " +
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
((remoteUser == null) ? "NULL principal" : remoteUser.getName()));
}
Throwable errorThrowable = null;
String error = null;
@ -114,12 +121,15 @@ public class CallRunner {
throw new ServerNotRunningYetException("Server " +
(address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
if (call.getTraceInfo() != null) {
String serviceName =
call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
String traceString = serviceName + "." + methodName;
traceScope = Trace.startSpan(traceString, call.getTraceInfo());
}
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status, call.startTime, call.timeout);
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
return;
@ -181,7 +191,7 @@ public class CallRunner {
*/
public void drop() {
try {
if (!call.connection.channel.isOpen()) {
if (call.disconnectSince() >= 0) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
@ -129,11 +130,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
RpcCall call = callTask.getRpcCall();
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
if (isWriteRequest(call.getHeader(), call.getParam())) {
queueIndex = writeBalancer.getNextQueue();
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();

View File

@ -0,0 +1,146 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.htrace.TraceInfo;
/**
* Interface of all necessary to carry out a RPC method invocation on the server.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public interface RpcCall extends RpcCallContext {
/**
* @return The service of this call.
*/
BlockingService getService();
/**
* @return The service method.
*/
MethodDescriptor getMethod();
/**
* @return The call parameter message.
*/
Message getParam();
/**
* @return The CellScanner that can carry input and result payload.
*/
CellScanner getCellScanner();
/**
* @return The timestamp when the call is constructed.
*/
long getReceiveTime();
/**
* Set the timestamp when the call is constructed.
*/
void setReceiveTime(long receiveTime);
/**
* @return The time when the call starts to be executed.
*/
long getStartTime();
/**
* Set the time when the call starts to be executed.
*/
void setStartTime(long startTime);
/**
* @return The timeout of this call.
*/
int getTimeout();
/**
* @return The Priority of this call.
*/
int getPriority();
/**
* Return the deadline of this call. If we can not complete this call in time,
* we can throw a TimeoutIOException and RPCServer will drop it.
* @return The system timestamp of deadline.
*/
long getDeadline();
/**
* Used to calculate the request call queue size.
* If the total request call size exceeds a limit, the call will be rejected.
* @return The raw size of this call.
*/
long getSize();
/**
* @return The request header of this call.
*/
RequestHeader getHeader();
/**
* @return Port of remote address in this call
*/
int getRemotePort();
/**
* Set the response resulting from this RPC call.
* @param param The result message as response.
* @param cells The CellScanner that possibly carries the payload.
* @param errorThrowable The error Throwable resulting from the call.
* @param error Extra error message.
*/
void setResponse(Message param, CellScanner cells, Throwable errorThrowable, String error);
/**
* Send the response of this RPC call.
* Implementation provides the underlying facility (connection, etc) to send.
* @throws IOException
*/
void sendResponseIfReady() throws IOException;
/**
* Do the necessary cleanup after the call if needed.
*/
void cleanup();
/**
* @return A short string format of this call without possibly lengthy params
*/
String toShortString();
/**
* @return TraceInfo attached to this call.
*/
TraceInfo getTraceInfo();
}

View File

@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.security.User;
/**
* Interface of all necessary to carry out a RPC service invocation on the server. This interface
* focus on the information needed or obtained during the actual execution of the service method.
*/
@InterfaceAudience.Private
public interface RpcCallContext {
/**
@ -56,7 +60,7 @@ public interface RpcCallContext {
String getRequestUserName();
/**
* @return Address of remote client if a request is ongoing, else null
* @return Address of remote client in this call
*/
InetAddress getRemoteAddress();
@ -92,12 +96,6 @@ public interface RpcCallContext {
void incrementResponseCellSize(long cellSize);
long getResponseBlockSize();
void incrementResponseBlockSize(long blockSize);
/**
* Return the deadline of this call. If we can not complete this call in time, we can throw a
* TimeoutIOException and RPCServer will drop it.
* @return The system timestamp of deadline.
*/
long getDeadline();
void incrementResponseBlockSize(long blockSize);
}

View File

@ -361,12 +361,12 @@ public abstract class RpcExecutor {
@Override
public int compare(CallRunner a, CallRunner b) {
RpcServer.Call callA = a.getCall();
RpcServer.Call callB = b.getCall();
long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
RpcCall callA = a.getRpcCall();
RpcCall callB = b.getRpcCall();
long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam());
long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam());
deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay);
deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay);
return Long.compare(deadlineA, deadlineB);
}
}

View File

@ -214,7 +214,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
/** This is set to Call object before Handler invokes an RPC and ybdie
* after the call returns.
*/
protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
protected static final ThreadLocal<RpcCall> CurCall =
new ThreadLocal<RpcCall>();
/** Keeps MonitoredRPCHandler per handler thread. */
static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
@ -326,9 +327,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class Call implements RpcCallContext {
@InterfaceAudience.Private
public class Call implements RpcCall {
protected int id; // the client's call id
protected BlockingService service;
protected MethodDescriptor md;
@ -408,7 +408,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.connection.decRpcCount(); // Say that we're done with this call.
}
protected void cleanup() {
@Override
public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
@ -422,7 +423,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
" connection: " + connection.toString();
}
protected RequestHeader getHeader() {
@Override
public RequestHeader getHeader() {
return this.header;
}
@ -430,6 +432,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return this.header.hasPriority();
}
@Override
public int getPriority() {
return this.header.getPriority();
}
@ -438,7 +441,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* Short string representation without param info because param itself could be huge depends on
* the payload of a command
*/
String toShortString() {
@Override
public String toShortString() {
String serviceName = this.connection.service != null ?
this.connection.service.getDescriptorForType().getName() : "null";
return "callId: " + this.id + " service: " + serviceName +
@ -448,13 +452,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
" deadline: " + deadline;
}
String toTraceString() {
String serviceName = this.connection.service != null ?
this.connection.service.getDescriptorForType().getName() : "";
String methodName = (this.md != null) ? this.md.getName() : "";
return serviceName + "." + methodName;
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
@ -467,15 +464,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.response = new BufferChain(responseBufs);
}
protected synchronized void setResponse(Object m, final CellScanner cells,
@Override
public synchronized void setResponse(Message m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
if (t != null) this.isError = true;
BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
// Presume it a pb Message. Could be null.
Message result = (Message)m;
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
@ -511,7 +507,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
Message header = headerBuilder.build();
ByteBuffer headerBuf =
createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
@ -681,10 +677,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
public long getSize() {
return this.size;
}
@Override
public long getResponseCellSize() {
return responseCellSize;
@ -705,21 +697,23 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
responseBlockSize += blockSize;
}
@Override
public long getSize() {
return this.size;
}
@Override
public long getDeadline() {
return deadline;
}
@Override
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
this.responder.doRespond(this);
}
public UserGroupInformation getRemoteUser() {
return connection.ugi;
}
@Override
public User getRequestUser() {
return user;
@ -750,6 +744,64 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public boolean isRetryImmediatelySupported() {
return retryImmediatelySupported;
}
@Override
public BlockingService getService() {
return service;
}
@Override
public MethodDescriptor getMethod() {
return md;
}
@Override
public Message getParam() {
return param;
}
@Override
public CellScanner getCellScanner() {
return cellScanner;
}
@Override
public long getReceiveTime() {
return timestamp;
}
@Override
public void setReceiveTime(long t) {
this.timestamp = t;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public void setStartTime(long t) {
this.startTime = t;
}
@Override
public int getTimeout() {
return timeout;
}
@Override
public int getRemotePort() {
return connection.getRemotePort();
}
@Override
public TraceInfo getTraceInfo() {
return tinfo;
}
}
@FunctionalInterface
@ -2565,24 +2617,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
}
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
int timeout)
throws IOException {
Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, null, -1, null, null, timeout,
null);
fakeCall.setReceiveTime(receiveTime);
return call(fakeCall, status);
}
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout)
public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
throws IOException {
try {
status.setRPC(md.getName(), new Object[]{param}, receiveTime);
MethodDescriptor md = call.getMethod();
Message param = call.getParam();
status.setRPC(md.getName(), new Object[]{param},
call.getReceiveTime());
// TODO: Review after we add in encoded data blocks.
status.setRPCPacket(param);
status.resume("Servicing call");
//get an instance of the method arg type
HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
controller.setCallTimeout(timeout);
Message result = service.callBlockingMethod(md, controller, param);
HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
controller.setCallTimeout(call.getTimeout());
Message result = call.getService().callBlockingMethod(md, controller, param);
long receiveTime = call.getReceiveTime();
long startTime = call.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
@ -2596,6 +2661,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
long requestSize = param.getSerializedSize();
long responseSize = result.getSerializedSize();
metrics.dequeuedCall(qTime);
metrics.processedCall(processingTime);
metrics.totalCall(totalTime);
@ -3014,9 +3080,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @return InetAddress
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
if (call != null && call.connection != null && call.connection.socket != null) {
return call.connection.socket.getInetAddress();
RpcCall call = CurCall.get();
if (call != null) {
return call.getRemoteAddress();
}
return null;
}

View File

@ -55,11 +55,18 @@ public interface RpcServerInterface {
@Deprecated
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException, ServiceException;
throws IOException;
/**
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
int timeout) throws IOException, ServiceException;
int timeout) throws IOException;
Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
throws IOException;
void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler();

View File

@ -157,8 +157,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
RpcCall call = callTask.getRpcCall();
int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
if (priorityExecutor != null && level > highPriorityLevel) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {

View File

@ -643,7 +643,7 @@ public class TestMetaTableAccessor {
@Override
public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
int priority = task.getCall().getPriority();
int priority = task.getRpcCall().getPriority();
if (priority > HConstants.QOS_THRESHOLD) {
numPriorityCalls++;

View File

@ -99,6 +99,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testBasic() throws IOException, InterruptedException {
PriorityFunction qosFunction = mock(PriorityFunction.class);
RpcScheduler scheduler = new SimpleRpcScheduler(
conf, 10, 0, 0, qosFunction, 0);
@ -113,6 +114,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
CallRunner generalTask = createMockTask();
CallRunner priorityTask = createMockTask();
CallRunner replicationTask = createMockTask();
@ -167,12 +169,13 @@ public class TestSimpleRpcScheduler {/*
private CallRunner createMockTask() {
Call call = mock(Call.class);
CallRunner task = mock(CallRunner.class);
when(task.getCall()).thenReturn(call);
when(task.getRpcCall()).thenReturn(call);
return task;
}
@Test
public void testRpcScheduler() throws Exception {
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
}
@ -194,19 +197,19 @@ public class TestSimpleRpcScheduler {/*
CallRunner smallCallTask = mock(CallRunner.class);
RpcServer.Call smallCall = mock(RpcServer.Call.class);
RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
when(smallCallTask.getCall()).thenReturn(smallCall);
when(smallCallTask.getRpcCall()).thenReturn(smallCall);
when(smallCall.getHeader()).thenReturn(smallHead);
CallRunner largeCallTask = mock(CallRunner.class);
RpcServer.Call largeCall = mock(RpcServer.Call.class);
RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
when(largeCallTask.getCall()).thenReturn(largeCall);
when(largeCallTask.getRpcCall()).thenReturn(largeCall);
when(largeCall.getHeader()).thenReturn(largeHead);
CallRunner hugeCallTask = mock(CallRunner.class);
RpcServer.Call hugeCall = mock(RpcServer.Call.class);
RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
when(hugeCallTask.getCall()).thenReturn(hugeCall);
when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
when(hugeCall.getHeader()).thenReturn(hugeHead);
when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
@ -255,6 +258,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testScanQueueWithZeroScanRatio() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
@ -290,21 +294,23 @@ public class TestSimpleRpcScheduler {/*
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getCall()).thenReturn(putCall);
when(putCallTask.getRpcCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getParam()).thenReturn(putCall.param);
CallRunner getCallTask = mock(CallRunner.class);
RpcServer.Call getCall = mock(RpcServer.Call.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getCall()).thenReturn(getCall);
when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);
CallRunner scanCallTask = mock(CallRunner.class);
RpcServer.Call scanCall = mock(RpcServer.Call.class);
scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getCall()).thenReturn(scanCall);
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);
ArrayList<Integer> work = new ArrayList<Integer>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
@ -361,6 +367,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testSoftAndHardQueueLimits() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
@ -379,7 +386,7 @@ public class TestSimpleRpcScheduler {/*
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getCall()).thenReturn(putCall);
when(putCallTask.getRpcCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
assertTrue(scheduler.dispatch(putCallTask));
@ -512,6 +519,8 @@ public class TestSimpleRpcScheduler {/*
.build();
when(putCall.getSize()).thenReturn(9L);
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getReceiveTime()).thenReturn(putCall.timestamp);
when(putCall.getParam()).thenReturn(putCall.param);
CallRunner cr = new CallRunner(null, putCall) {
public void run() {
@ -523,7 +532,7 @@ public class TestSimpleRpcScheduler {/*
} catch (InterruptedException e) {
}
}
public Call getCall() {
public RpcCall getRpcCall() {
return putCall;
}