From 39653862a4d24b5309e972ca38554a7f81bc94fd Mon Sep 17 00:00:00 2001 From: Jerry He Date: Mon, 5 Dec 2016 10:21:55 -0800 Subject: [PATCH] HBASE-17221 Abstract out an interface for RpcServer.Call --- .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 2 +- .../apache/hadoop/hbase/ipc/CallRunner.java | 46 +++--- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 7 +- .../org/apache/hadoop/hbase/ipc/RpcCall.java | 146 ++++++++++++++++++ .../hadoop/hbase/ipc/RpcCallContext.java | 14 +- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 12 +- .../apache/hadoop/hbase/ipc/RpcServer.java | 138 ++++++++++++----- .../hadoop/hbase/ipc/RpcServerInterface.java | 11 +- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 4 +- .../hadoop/hbase/TestMetaTableAccessor.java | 2 +- .../hbase/ipc/TestSimpleRpcScheduler.java | 27 ++-- 11 files changed, 323 insertions(+), 86 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 42b500f570b..82b8f1be2b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -161,7 +161,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { */ private boolean needToDrop(CallRunner callRunner) { long now = EnvironmentEdgeManager.currentTime(); - long callDelay = now - callRunner.getCall().timestamp; + long callDelay = now - callRunner.getRpcCall().getReceiveTime(); long localMinDelay = this.minDelay; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index d570b17b5ce..5301a6706c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -25,16 +25,14 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; -import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; - /** * The request processing logic, which is usually executed in thread pools provided by an * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained @@ -47,7 +45,7 @@ public class CallRunner { private static final CallDroppedException CALL_DROPPED_EXCEPTION = new CallDroppedException(); - private Call call; + private RpcCall call; private RpcServerInterface rpcServer; private MonitoredRPCHandler status; private volatile boolean sucessful; @@ -58,7 +56,7 @@ public class CallRunner { * time we occupy heap. */ // The constructor is shutdown so only RpcServer in this class can make one of these. - CallRunner(final RpcServerInterface rpcServer, final Call call) { + CallRunner(final RpcServerInterface rpcServer, final RpcCall call) { this.call = call; this.rpcServer = rpcServer; // Add size of the call to queue size. @@ -67,10 +65,19 @@ public class CallRunner { } } - public Call getCall() { + public RpcCall getRpcCall() { return call; } + /** + * Keep for backward compatibility. + * @deprecated As of release 2.0, this will be removed in HBase 3.0 + */ + @Deprecated + public RpcServer.Call getCall() { + return (RpcServer.Call) call; + } + public void setStatus(MonitoredRPCHandler status) { this.status = status; } @@ -85,23 +92,23 @@ public class CallRunner { public void run() { try { - if (!call.connection.channel.isOpen()) { + if (call.disconnectSince() >= 0) { if (RpcServer.LOG.isDebugEnabled()) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); } return; } - call.startTime = System.currentTimeMillis(); - if (call.startTime > call.deadline) { + call.setStartTime(System.currentTimeMillis()); + if (call.getStartTime() > call.getDeadline()) { RpcServer.LOG.warn("Dropping timed out call: " + call); return; } this.status.setStatus("Setting up call"); - this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); + this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort()); if (RpcServer.LOG.isTraceEnabled()) { - UserGroupInformation remoteUser = call.connection.ugi; + User remoteUser = call.getRequestUser(); RpcServer.LOG.trace(call.toShortString() + " executing as " + - ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName())); + ((remoteUser == null) ? "NULL principal" : remoteUser.getName())); } Throwable errorThrowable = null; String error = null; @@ -114,12 +121,15 @@ public class CallRunner { throw new ServerNotRunningYetException("Server " + (address != null ? address : "(channel closed)") + " is not running yet"); } - if (call.tinfo != null) { - traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); + if (call.getTraceInfo() != null) { + String serviceName = + call.getService() != null ? call.getService().getDescriptorForType().getName() : ""; + String methodName = (call.getMethod() != null) ? call.getMethod().getName() : ""; + String traceString = serviceName + "." + methodName; + traceScope = Trace.startSpan(traceString, call.getTraceInfo()); } // make the call - resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status, call.startTime, call.timeout); + resultPair = this.rpcServer.call(call, this.status); } catch (TimeoutIOException e){ RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call); return; @@ -181,7 +191,7 @@ public class CallRunner { */ public void drop() { try { - if (!call.connection.channel.isOpen()) { + if (call.disconnectSince() >= 0) { if (RpcServer.LOG.isDebugEnabled()) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 600590002ba..8637f7923cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; @@ -129,11 +130,11 @@ public class RWQueueRpcExecutor extends RpcExecutor { @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { - RpcServer.Call call = callTask.getCall(); + RpcCall call = callTask.getRpcCall(); int queueIndex; - if (isWriteRequest(call.getHeader(), call.param)) { + if (isWriteRequest(call.getHeader(), call.getParam())) { queueIndex = writeBalancer.getNextQueue(); - } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) { + } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) { queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(); } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java new file mode 100644 index 00000000000..239ea9ed26b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.IOException; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.htrace.TraceInfo; + +/** + * Interface of all necessary to carry out a RPC method invocation on the server. + */ +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public interface RpcCall extends RpcCallContext { + + /** + * @return The service of this call. + */ + BlockingService getService(); + + /** + * @return The service method. + */ + MethodDescriptor getMethod(); + + /** + * @return The call parameter message. + */ + Message getParam(); + + /** + * @return The CellScanner that can carry input and result payload. + */ + CellScanner getCellScanner(); + + /** + * @return The timestamp when the call is constructed. + */ + long getReceiveTime(); + + /** + * Set the timestamp when the call is constructed. + */ + void setReceiveTime(long receiveTime); + + /** + * @return The time when the call starts to be executed. + */ + long getStartTime(); + + /** + * Set the time when the call starts to be executed. + */ + void setStartTime(long startTime); + + /** + * @return The timeout of this call. + */ + int getTimeout(); + + /** + * @return The Priority of this call. + */ + int getPriority(); + + /** + * Return the deadline of this call. If we can not complete this call in time, + * we can throw a TimeoutIOException and RPCServer will drop it. + * @return The system timestamp of deadline. + */ + long getDeadline(); + + /** + * Used to calculate the request call queue size. + * If the total request call size exceeds a limit, the call will be rejected. + * @return The raw size of this call. + */ + long getSize(); + + /** + * @return The request header of this call. + */ + RequestHeader getHeader(); + + /** + * @return Port of remote address in this call + */ + int getRemotePort(); + + /** + * Set the response resulting from this RPC call. + * @param param The result message as response. + * @param cells The CellScanner that possibly carries the payload. + * @param errorThrowable The error Throwable resulting from the call. + * @param error Extra error message. + */ + void setResponse(Message param, CellScanner cells, Throwable errorThrowable, String error); + + /** + * Send the response of this RPC call. + * Implementation provides the underlying facility (connection, etc) to send. + * @throws IOException + */ + void sendResponseIfReady() throws IOException; + + /** + * Do the necessary cleanup after the call if needed. + */ + void cleanup(); + + /** + * @return A short string format of this call without possibly lengthy params + */ + String toShortString(); + + /** + * @return TraceInfo attached to this call. + */ + TraceInfo getTraceInfo(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 9bc8ee79151..d2fd55774aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.security.User; +/** + * Interface of all necessary to carry out a RPC service invocation on the server. This interface + * focus on the information needed or obtained during the actual execution of the service method. + */ @InterfaceAudience.Private public interface RpcCallContext { /** @@ -56,7 +60,7 @@ public interface RpcCallContext { String getRequestUserName(); /** - * @return Address of remote client if a request is ongoing, else null + * @return Address of remote client in this call */ InetAddress getRemoteAddress(); @@ -92,12 +96,6 @@ public interface RpcCallContext { void incrementResponseCellSize(long cellSize); long getResponseBlockSize(); - void incrementResponseBlockSize(long blockSize); - /** - * Return the deadline of this call. If we can not complete this call in time, we can throw a - * TimeoutIOException and RPCServer will drop it. - * @return The system timestamp of deadline. - */ - long getDeadline(); + void incrementResponseBlockSize(long blockSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index e41f4c7c050..3cb6011c9aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -361,12 +361,12 @@ public abstract class RpcExecutor { @Override public int compare(CallRunner a, CallRunner b) { - RpcServer.Call callA = a.getCall(); - RpcServer.Call callB = b.getCall(); - long deadlineA = priority.getDeadline(callA.getHeader(), callA.param); - long deadlineB = priority.getDeadline(callB.getHeader(), callB.param); - deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay); - deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay); + RpcCall callA = a.getRpcCall(); + RpcCall callB = b.getRpcCall(); + long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam()); + long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam()); + deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay); + deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay); return Long.compare(deadlineA, deadlineB); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ee44c6801be..8b6379b01d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -214,7 +214,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { /** This is set to Call object before Handler invokes an RPC and ybdie * after the call returns. */ - protected static final ThreadLocal CurCall = new ThreadLocal(); + protected static final ThreadLocal CurCall = + new ThreadLocal(); /** Keeps MonitoredRPCHandler per handler thread. */ static final ThreadLocal MONITORED_RPC @@ -326,9 +327,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. */ - @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) - @InterfaceStability.Evolving - public class Call implements RpcCallContext { + @InterfaceAudience.Private + public class Call implements RpcCall { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; @@ -408,7 +408,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.connection.decRpcCount(); // Say that we're done with this call. } - protected void cleanup() { + @Override + public void cleanup() { if (this.reqCleanup != null) { this.reqCleanup.run(); this.reqCleanup = null; @@ -422,7 +423,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { " connection: " + connection.toString(); } - protected RequestHeader getHeader() { + @Override + public RequestHeader getHeader() { return this.header; } @@ -430,6 +432,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.header.hasPriority(); } + @Override public int getPriority() { return this.header.getPriority(); } @@ -438,7 +441,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * Short string representation without param info because param itself could be huge depends on * the payload of a command */ - String toShortString() { + @Override + public String toShortString() { String serviceName = this.connection.service != null ? this.connection.service.getDescriptorForType().getName() : "null"; return "callId: " + this.id + " service: " + serviceName + @@ -448,13 +452,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { " deadline: " + deadline; } - String toTraceString() { - String serviceName = this.connection.service != null ? - this.connection.service.getDescriptorForType().getName() : ""; - String methodName = (this.md != null) ? this.md.getName() : ""; - return serviceName + "." + methodName; - } - protected synchronized void setSaslTokenResponse(ByteBuffer response) { ByteBuffer[] responseBufs = new ByteBuffer[1]; responseBufs[0] = response; @@ -467,15 +464,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.response = new BufferChain(responseBufs); } - protected synchronized void setResponse(Object m, final CellScanner cells, + @Override + public synchronized void setResponse(Message m, final CellScanner cells, Throwable t, String errorMsg) { if (this.isError) return; if (t != null) this.isError = true; BufferChain bc = null; try { ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); - // Presume it a pb Message. Could be null. - Message result = (Message)m; // Call id. headerBuilder.setCallId(this.id); if (t != null) { @@ -511,7 +507,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } Message header = headerBuilder.build(); ByteBuffer headerBuf = - createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock); + createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -681,10 +677,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - public long getSize() { - return this.size; - } - @Override public long getResponseCellSize() { return responseCellSize; @@ -705,21 +697,23 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { responseBlockSize += blockSize; } + @Override + public long getSize() { + return this.size; + } + @Override public long getDeadline() { return deadline; } + @Override public synchronized void sendResponseIfReady() throws IOException { // set param null to reduce memory pressure this.param = null; this.responder.doRespond(this); } - public UserGroupInformation getRemoteUser() { - return connection.ugi; - } - @Override public User getRequestUser() { return user; @@ -750,6 +744,64 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public boolean isRetryImmediatelySupported() { return retryImmediatelySupported; } + + @Override + public BlockingService getService() { + return service; + } + + @Override + public MethodDescriptor getMethod() { + return md; + } + + @Override + public Message getParam() { + return param; + } + + @Override + public CellScanner getCellScanner() { + return cellScanner; + } + + @Override + public long getReceiveTime() { + return timestamp; + } + + @Override + public void setReceiveTime(long t) { + this.timestamp = t; + + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public void setStartTime(long t) { + this.startTime = t; + + } + + @Override + public int getTimeout() { + return timeout; + } + + @Override + public int getRemotePort() { + return connection.getRemotePort(); + } + + @Override + public TraceInfo getTraceInfo() { + return tinfo; + } + } @FunctionalInterface @@ -2565,24 +2617,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0); } + public Pair call(BlockingService service, MethodDescriptor md, Message param, + CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, + int timeout) + throws IOException { + Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, null, -1, null, null, timeout, + null); + fakeCall.setReceiveTime(receiveTime); + return call(fakeCall, status); + } + /** * This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the * exception name and the stack trace are returned in the protobuf response. */ - public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, - long startTime, int timeout) + public Pair call(RpcCall call, MonitoredRPCHandler status) throws IOException { try { - status.setRPC(md.getName(), new Object[]{param}, receiveTime); + MethodDescriptor md = call.getMethod(); + Message param = call.getParam(); + status.setRPC(md.getName(), new Object[]{param}, + call.getReceiveTime()); // TODO: Review after we add in encoded data blocks. status.setRPCPacket(param); status.resume("Servicing call"); //get an instance of the method arg type - HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner); - controller.setCallTimeout(timeout); - Message result = service.callBlockingMethod(md, controller, param); + HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); + controller.setCallTimeout(call.getTimeout()); + Message result = call.getService().callBlockingMethod(md, controller, param); + long receiveTime = call.getReceiveTime(); + long startTime = call.getStartTime(); long endTime = System.currentTimeMillis(); int processingTime = (int) (endTime - startTime); int qTime = (int) (startTime - receiveTime); @@ -2596,6 +2661,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } long requestSize = param.getSerializedSize(); long responseSize = result.getSerializedSize(); + metrics.dequeuedCall(qTime); metrics.processedCall(processingTime); metrics.totalCall(totalTime); @@ -3014,9 +3080,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @return InetAddress */ public static InetAddress getRemoteIp() { - Call call = CurCall.get(); - if (call != null && call.connection != null && call.connection.socket != null) { - return call.connection.socket.getInetAddress(); + RpcCall call = CurCall.get(); + if (call != null) { + return call.getRemoteAddress(); } return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 5401e3f4d6b..a8479314db3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -55,11 +55,18 @@ public interface RpcServerInterface { @Deprecated Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException, ServiceException; + throws IOException; + /** + * @deprecated As of release 2.0, this will be removed in HBase 3.0 + */ + @Deprecated Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, - int timeout) throws IOException, ServiceException; + int timeout) throws IOException; + + Pair call(RpcCall call, MonitoredRPCHandler status) + throws IOException; void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 3aa486e313d..1f7e8ba8ccd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -157,8 +157,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs @Override public boolean dispatch(CallRunner callTask) throws InterruptedException { - RpcServer.Call call = callTask.getCall(); - int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); + RpcCall call = callTask.getRpcCall(); + int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser()); if (priorityExecutor != null && level > highPriorityLevel) { return priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index d750fafe208..e1c19e2a0e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -643,7 +643,7 @@ public class TestMetaTableAccessor { @Override public boolean dispatch(CallRunner task) throws IOException, InterruptedException { - int priority = task.getCall().getPriority(); + int priority = task.getRpcCall().getPriority(); if (priority > HConstants.QOS_THRESHOLD) { numPriorityCalls++; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index c9c5684708a..3535d232437 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -99,6 +99,7 @@ public class TestSimpleRpcScheduler {/* @Test public void testBasic() throws IOException, InterruptedException { + PriorityFunction qosFunction = mock(PriorityFunction.class); RpcScheduler scheduler = new SimpleRpcScheduler( conf, 10, 0, 0, qosFunction, 0); @@ -113,6 +114,7 @@ public class TestSimpleRpcScheduler {/* @Test public void testHandlerIsolation() throws IOException, InterruptedException { + CallRunner generalTask = createMockTask(); CallRunner priorityTask = createMockTask(); CallRunner replicationTask = createMockTask(); @@ -167,12 +169,13 @@ public class TestSimpleRpcScheduler {/* private CallRunner createMockTask() { Call call = mock(Call.class); CallRunner task = mock(CallRunner.class); - when(task.getCall()).thenReturn(call); + when(task.getRpcCall()).thenReturn(call); return task; } @Test public void testRpcScheduler() throws Exception { + testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); } @@ -194,19 +197,19 @@ public class TestSimpleRpcScheduler {/* CallRunner smallCallTask = mock(CallRunner.class); RpcServer.Call smallCall = mock(RpcServer.Call.class); RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); - when(smallCallTask.getCall()).thenReturn(smallCall); + when(smallCallTask.getRpcCall()).thenReturn(smallCall); when(smallCall.getHeader()).thenReturn(smallHead); CallRunner largeCallTask = mock(CallRunner.class); RpcServer.Call largeCall = mock(RpcServer.Call.class); RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); - when(largeCallTask.getCall()).thenReturn(largeCall); + when(largeCallTask.getRpcCall()).thenReturn(largeCall); when(largeCall.getHeader()).thenReturn(largeHead); CallRunner hugeCallTask = mock(CallRunner.class); RpcServer.Call hugeCall = mock(RpcServer.Call.class); RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); - when(hugeCallTask.getCall()).thenReturn(hugeCall); + when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); when(hugeCall.getHeader()).thenReturn(hugeHead); when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L); @@ -255,6 +258,7 @@ public class TestSimpleRpcScheduler {/* @Test public void testScanQueueWithZeroScanRatio() throws Exception { + Configuration schedConf = HBaseConfiguration.create(); schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); @@ -290,21 +294,23 @@ public class TestSimpleRpcScheduler {/* putCall.param = RequestConverter.buildMutateRequest( Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); - when(putCallTask.getCall()).thenReturn(putCall); + when(putCallTask.getRpcCall()).thenReturn(putCall); when(putCall.getHeader()).thenReturn(putHead); + when(putCall.getParam()).thenReturn(putCall.param); CallRunner getCallTask = mock(CallRunner.class); RpcServer.Call getCall = mock(RpcServer.Call.class); RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); - when(getCallTask.getCall()).thenReturn(getCall); + when(getCallTask.getRpcCall()).thenReturn(getCall); when(getCall.getHeader()).thenReturn(getHead); CallRunner scanCallTask = mock(CallRunner.class); RpcServer.Call scanCall = mock(RpcServer.Call.class); scanCall.param = ScanRequest.newBuilder().setScannerId(1).build(); RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); - when(scanCallTask.getCall()).thenReturn(scanCall); + when(scanCallTask.getRpcCall()).thenReturn(scanCall); when(scanCall.getHeader()).thenReturn(scanHead); + when(scanCall.getParam()).thenReturn(scanCall.param); ArrayList work = new ArrayList(); doAnswerTaskExecution(putCallTask, work, 1, 1000); @@ -361,6 +367,7 @@ public class TestSimpleRpcScheduler {/* @Test public void testSoftAndHardQueueLimits() throws Exception { + Configuration schedConf = HBaseConfiguration.create(); schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); @@ -379,7 +386,7 @@ public class TestSimpleRpcScheduler {/* putCall.param = RequestConverter.buildMutateRequest( Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); - when(putCallTask.getCall()).thenReturn(putCall); + when(putCallTask.getRpcCall()).thenReturn(putCall); when(putCall.getHeader()).thenReturn(putHead); assertTrue(scheduler.dispatch(putCallTask)); @@ -512,6 +519,8 @@ public class TestSimpleRpcScheduler {/* .build(); when(putCall.getSize()).thenReturn(9L); when(putCall.getHeader()).thenReturn(putHead); + when(putCall.getReceiveTime()).thenReturn(putCall.timestamp); + when(putCall.getParam()).thenReturn(putCall.param); CallRunner cr = new CallRunner(null, putCall) { public void run() { @@ -523,7 +532,7 @@ public class TestSimpleRpcScheduler {/* } catch (InterruptedException e) { } } - public Call getCall() { + public RpcCall getRpcCall() { return putCall; }