From ee0c193d536e9587b8b700c03b67fcf0dd1bd7a2 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sun, 15 Sep 2013 02:23:46 +0000 Subject: [PATCH] HBASE-9461 Some doc and cleanup in RPCServer git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523386 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/ipc/CallRunner.java | 148 +++++++++++++ .../hadoop/hbase/ipc/FifoRpcScheduler.java | 10 +- .../hadoop/hbase/ipc/RequestContext.java | 9 +- .../apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../hadoop/hbase/ipc/RpcSchedulerContext.java | 39 ++++ .../apache/hadoop/hbase/ipc/RpcServer.java | 195 +++++------------- .../hadoop/hbase/ipc/RpcServerInterface.java | 30 ++- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 20 +- .../hadoop/hbase/ipc/TestCallRunner.java | 39 ++++ .../org/apache/hadoop/hbase/ipc/TestIPC.java | 2 +- .../hbase/ipc/TestSimpleRpcScheduler.java | 2 - 11 files changed, 326 insertions(+), 170 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcSchedulerContext.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java new file mode 100644 index 00000000000..e7f6fa427f8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -0,0 +1,148 @@ +package org.apache.hadoop.hbase.ipc; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.nio.channels.ClosedChannelException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ipc.RpcServer.Call; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.cloudera.htrace.Trace; +import org.cloudera.htrace.TraceScope; + +import com.google.protobuf.Message; + +/** + * The request processing logic, which is usually executed in thread pools provided by an + * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained + * {@link RpcServer.Call} + */ +@InterfaceAudience.Private +public class CallRunner { + private final Call call; + private final RpcServerInterface rpcServer; + private final MonitoredRPCHandler status; + + /** + * On construction, adds the size of this call to the running count of outstanding call sizes. + * Presumption is that we are put on a queue while we wait on an executor to run us. During this + * time we occupy heap. + * @param call The call to run. + * @param rpcServer + */ + // The constructor is shutdown so only RpcServer in this class can make one of these. + CallRunner(final RpcServerInterface rpcServer, final Call call) { + this.call = call; + this.rpcServer = rpcServer; + // Add size of the call to queue size. + this.rpcServer.addCallSize(call.getSize()); + this.status = getStatus(); + } + + public Call getCall() { + return call; + } + + public void run() { + try { + this.status.setStatus("Setting up call"); + this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); + if (RpcServer.LOG.isDebugEnabled()) { + UserGroupInformation remoteUser = call.connection.user; + RpcServer.LOG.debug(call.toShortString() + " executing as " + + ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName())); + } + Throwable errorThrowable = null; + String error = null; + Pair resultPair = null; + RpcServer.CurCall.set(call); + TraceScope traceScope = null; + try { + if (!this.rpcServer.isStarted()) { + throw new ServerNotRunningYetException("Server is not running yet"); + } + if (call.tinfo != null) { + traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); + } + RequestContext.set(User.create(call.connection.user), RpcServer.getRemoteIp(), + call.connection.service); + // make the call + resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, + call.timestamp, this.status); + } catch (Throwable e) { + RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); + errorThrowable = e; + error = StringUtils.stringifyException(e); + } finally { + if (traceScope != null) { + traceScope.close(); + } + // Must always clear the request context to avoid leaking + // credentials between requests. + RequestContext.clear(); + } + RpcServer.CurCall.set(null); + this.rpcServer.addCallSize(call.getSize() * -1); + // Set the response for undelayed calls and delayed calls with + // undelayed responses. + if (!call.isDelayed() || !call.isReturnValueDelayed()) { + Message param = resultPair != null ? resultPair.getFirst() : null; + CellScanner cells = resultPair != null ? resultPair.getSecond() : null; + call.setResponse(param, cells, errorThrowable, error); + } + call.sendResponseIfReady(); + this.status.markComplete("Sent response"); + this.status.pause("Waiting for a call"); + } catch (OutOfMemoryError e) { + if (this.rpcServer.getErrorHandler() != null) { + if (this.rpcServer.getErrorHandler().checkOOME(e)) { + RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError"); + return; + } + } else { + // rethrow if no handler + throw e; + } + } catch (ClosedChannelException cce) { + RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + + "this means that the server was processing a " + + "request but the client went away. The error message was: " + + cce.getMessage()); + } catch (Exception e) { + RpcServer.LOG.warn(Thread.currentThread().getName() + + ": caught: " + StringUtils.stringifyException(e)); + } + } + + MonitoredRPCHandler getStatus() { + // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. + MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); + if (status != null) { + return status; + } + status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); + status.pause("Waiting for a call"); + RpcServer.MONITORED_RPC.set(status); + return status; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index 629bb012d2b..76777db0551 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.ipc.CallRunner; /** * A very simple {@code }RpcScheduler} that serves incoming requests in order. @@ -65,8 +66,13 @@ public class FifoRpcScheduler implements RpcScheduler { } @Override - public void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException { - executor.submit(task); + public void dispatch(final CallRunner task) throws IOException, InterruptedException { + executor.submit(new Runnable() { + @Override + public void run() { + task.run(); + } + }); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java index 49f20842ed8..c14600d3218 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.User; import com.google.protobuf.BlockingService; + import org.apache.hadoop.hbase.util.Bytes; import org.cloudera.htrace.Trace; @@ -31,9 +32,9 @@ import java.net.InetAddress; /** * Represents client information (authenticated username, remote address, protocol) - * for the currently executing request within a RPC server handler thread. If - * called outside the context of a RPC request, all values will be - * null. + * for the currently executing request. If called outside the context of a RPC request, all values + * will be null. The {@link CallRunner} class before it a call and then on + * its way out, it will clear the thread local. */ @InterfaceAudience.Private public class RequestContext { @@ -146,7 +147,7 @@ public class RequestContext { return this.service; } - public boolean isInRequest() { + boolean isInRequest() { return inRequest; } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 84590816d17..f114e503efe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -57,7 +57,7 @@ public interface RpcScheduler { * * @param task the request to be dispatched */ - void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException; + void dispatch(CallRunner task) throws IOException, InterruptedException; /** Retrieves length of the general queue for metrics. */ int getGeneralQueueLength(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcSchedulerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcSchedulerContext.java new file mode 100644 index 00000000000..c0799297073 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcSchedulerContext.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +class RpcSchedulerContext implements RpcScheduler.Context { + private final RpcServer rpcServer; + + /** + * @param rpcServer + */ + RpcSchedulerContext(final RpcServer rpcServer) { + this.rpcServer = rpcServer; + } + + @Override + public InetSocketAddress getListenerAddress() { + return this.rpcServer.getListenerAddress(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 980ffd4caa1..2f281019f84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -74,7 +73,6 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; @@ -111,9 +109,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.cliffc.high_scale_lib.Counter; -import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceInfo; -import org.cloudera.htrace.TraceScope; import org.codehaus.jackson.map.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -128,7 +124,21 @@ import com.google.protobuf.TextFormat; /** * An RPC server that hosts protobuf described Services. - *

Once was copied from Hadoop to local to fix HBASE-900 but deviated long ago. + * + * An RpcServer instance has a {@link Listener} that hosts the socket. Listener has fixed number + * of {@link Reader}s in an ExecutorPool, 10 by default. The Listener does an accept and then + * round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does + * total read off the channel and the parse from which it makes a Call. The call is wrapped in a + * CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done + * and loops till done. + * + *

Scheduler can be variously implemented but default simple scheduler has handlers to which it + * has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run + * taking from the queue. They run the CallRunner#run method on each item gotten from queue + * and keep taking while the server is up. + * + * CallRunner#run executes the call. When done, asks the included Call to put itself on new + * queue for {@link Responder} to pull from and return result to client. * * @see RpcClient */ @@ -171,16 +181,13 @@ public class RpcServer implements RpcServerInterface { protected SecretManager secretManager; protected ServiceAuthorizationManager authManager; - protected static final ThreadLocal SERVER = new ThreadLocal(); - private volatile boolean started = false; - - /** This is set to Call object before Handler invokes an RPC and reset + /** This is set to Call object before Handler invokes an RPC and ybdie * after the call returns. */ protected static final ThreadLocal CurCall = new ThreadLocal(); /** Keeps MonitoredRPCHandler per handler thread. */ - private static final ThreadLocal MONITORED_RPC + static final ThreadLocal MONITORED_RPC = new ThreadLocal(); protected final InetSocketAddress isa; @@ -207,7 +214,22 @@ public class RpcServer implements RpcServerInterface { protected final boolean tcpKeepAlive; // if T then use keepalives protected final long purgeTimeout; // in milliseconds - protected volatile boolean running = true; // true while server runs + /** + * This flag is used to indicate to sub threads when they should go down. When we call + * {@link #startThreads()}, all threads started will consult this flag on whether they should + * keep going. It is set to false when {@link #stop()} is called. + */ + volatile boolean running = true; + + /** + * This flag is set to true after all threads are up and 'running' and the server is then opened + * for business by the calle to {@link #openServer()}. + */ + volatile boolean started = false; + + /** + * This is a running count of the size of all outstanding calls by size. + */ protected final Counter callQueueSize = new Counter(); protected final List connectionList = @@ -462,7 +484,7 @@ public class RpcServer implements RpcServerInterface { /** * If we have a response, and delay is not set, then respond * immediately. Otherwise, do not respond to client. This is - * called the by the RPC code in the context of the Handler thread. + * called by the RPC code in the context of the Handler thread. */ public synchronized void sendResponseIfReady() throws IOException { if (!this.delayResponse) { @@ -643,8 +665,6 @@ public class RpcServer implements RpcServerInterface { @Override public void run() { LOG.info(getName() + ": starting"); - SERVER.set(RpcServer.this); - while (running) { SelectionKey key = null; try { @@ -815,7 +835,6 @@ public class RpcServer implements RpcServerInterface { @Override public void run() { LOG.info(getName() + ": starting"); - SERVER.set(RpcServer.this); try { doRunLoop(); } finally { @@ -1685,8 +1704,7 @@ public class RpcServer implements RpcServerInterface { Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo); - callQueueSize.add(totalRequestSize); - scheduler.dispatch(new CallRunner(call)); + scheduler.dispatch(new CallRunner(RpcServer.this, call)); } private boolean authorizeConnection() throws IOException { @@ -1755,107 +1773,6 @@ public class RpcServer implements RpcServerInterface { } } - /** - * The real request processing logic, which is usually executed in - * thread pools provided by an {@link RpcScheduler}. - */ - class CallRunner implements Runnable { - private final Call call; - - public CallRunner(Call call) { - this.call = call; - } - - public Call getCall() { - return call; - } - - @Override - public void run() { - MonitoredRPCHandler status = getStatus(); - SERVER.set(RpcServer.this); - try { - status.setStatus("Setting up call"); - status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); - if (LOG.isDebugEnabled()) { - UserGroupInformation remoteUser = call.connection.user; - LOG.debug(call.toShortString() + " executing as " + - ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName())); - } - Throwable errorThrowable = null; - String error = null; - Pair resultPair = null; - CurCall.set(call); - TraceScope traceScope = null; - try { - if (!started) { - throw new ServerNotRunningYetException("Server is not running yet"); - } - if (call.tinfo != null) { - traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); - } - RequestContext.set(User.create(call.connection.user), getRemoteIp(), - call.connection.service); - // make the call - resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp, - status); - } catch (Throwable e) { - LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); - errorThrowable = e; - error = StringUtils.stringifyException(e); - } finally { - if (traceScope != null) { - traceScope.close(); - } - // Must always clear the request context to avoid leaking - // credentials between requests. - RequestContext.clear(); - } - CurCall.set(null); - callQueueSize.add(call.getSize() * -1); - // Set the response for undelayed calls and delayed calls with - // undelayed responses. - if (!call.isDelayed() || !call.isReturnValueDelayed()) { - Message param = resultPair != null ? resultPair.getFirst() : null; - CellScanner cells = resultPair != null ? resultPair.getSecond() : null; - call.setResponse(param, cells, errorThrowable, error); - } - call.sendResponseIfReady(); - status.markComplete("Sent response"); - status.pause("Waiting for a call"); - } catch (OutOfMemoryError e) { - if (errorHandler != null) { - if (errorHandler.checkOOME(e)) { - LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError"); - return; - } - } else { - // rethrow if no handler - throw e; - } - } catch (ClosedChannelException cce) { - LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + - "this means that the server was processing a " + - "request but the client went away. The error message was: " + - cce.getMessage()); - } catch (Exception e) { - LOG.warn(Thread.currentThread().getName() - + ": caught: " + StringUtils.stringifyException(e)); - } - } - - public MonitoredRPCHandler getStatus() { - MonitoredRPCHandler status = MONITORED_RPC.get(); - if (status != null) { - return status; - } - status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); - status.pause("Waiting for a call"); - MONITORED_RPC.set(status); - return status; - } - } - /** * Datastructure for passing a {@link BlockingService} and its associated class of * protobuf service interface. For example, a server that fielded what is defined @@ -1878,14 +1795,6 @@ public class RpcServer implements RpcServerInterface { } } - private class RpcSchedulerContextImpl implements RpcScheduler.Context { - - @Override - public InetSocketAddress getListenerAddress() { - return RpcServer.this.getListenerAddress(); - } - } - /** * Constructs a server listening on the named port and address. * @param serverInstance hosting instance of {@link Server}. We will do authentications if an @@ -1938,7 +1847,7 @@ public class RpcServer implements RpcServerInterface { HBaseSaslRpcServer.init(conf); } this.scheduler = scheduler; - this.scheduler.init(new RpcSchedulerContextImpl()); + this.scheduler.init(new RpcSchedulerContext(this)); } /** @@ -1995,7 +1904,12 @@ public class RpcServer implements RpcServerInterface { */ @Override public void openServer() { - started = true; + this.started = true; + } + + @Override + public boolean isStarted() { + return this.started; } /** @@ -2018,6 +1932,8 @@ public class RpcServer implements RpcServerInterface { @Override public void refreshAuthManager(PolicyProvider pp) { + // Ignore warnings that this should be accessed in a static way instead of via an instance; + // it'll break if you go via static route. this.authManager.refresh(this.conf, pp); } @@ -2199,6 +2115,11 @@ public class RpcServer implements RpcServerInterface { this.errorHandler = handler; } + @Override + public HBaseRPCErrorHandler getErrorHandler() { + return this.errorHandler; + } + /** * Returns the metrics instance for reporting RPC call statistics */ @@ -2206,6 +2127,11 @@ public class RpcServer implements RpcServerInterface { return metrics; } + @Override + public void addCallSize(final long diff) { + this.callQueueSize.add(diff); + } + /** * Authorize the incoming client connection. * @@ -2376,7 +2302,7 @@ public class RpcServer implements RpcServerInterface { */ public static InetAddress getRemoteIp() { Call call = CurCall.get(); - if (call != null) { + if (call != null && call.connection.socket != null) { return call.connection.socket.getInetAddress(); } return null; @@ -2394,17 +2320,6 @@ public class RpcServer implements RpcServerInterface { return null; } - /** - * May be called under - * {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations, - * and under protobuf methods of parameters and return values. - * Permits applications to access the server context. - * @return the server instance called under or null - */ - public static RpcServerInterface get() { - return SERVER.get(); - } - /** * A convenience method to bind to a given address and report * better exceptions if the address is not a valid host. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 42bb313b0fa..58106c6fcf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -34,18 +34,24 @@ import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; +/** + * RpcServer Interface. + * Start calls {@link #openServer()} and then {@link #startThreads()}. Prefer {@link #start()} + * and {@link #stop()}. Only use {@link #openServer()} and {@link #startThreads()} if in a + * situation where you could start getting requests though the server not up and fully + * initiaalized. + */ @InterfaceAudience.Private public interface RpcServerInterface { - // TODO: Needs cleanup. Why a 'start', and then a 'startThreads' and an 'openServer'? - - void setSocketSendBufSize(int size); - void start(); + void openServer(); + void startThreads(); + boolean isStarted(); void stop(); - void join() throws InterruptedException; + void setSocketSendBufSize(int size); InetSocketAddress getListenerAddress(); Pair call(BlockingService service, MethodDescriptor md, @@ -53,10 +59,7 @@ public interface RpcServerInterface { throws IOException, ServiceException; void setErrorHandler(HBaseRPCErrorHandler handler); - - void openServer(); - - void startThreads(); + HBaseRPCErrorHandler getErrorHandler(); /** * Returns the metrics instance for reporting RPC call statistics @@ -64,7 +67,14 @@ public interface RpcServerInterface { MetricsHBaseServer getMetrics(); /** - * Refresh autentication manager policy. + * Add/subtract from the current size of all outstanding calls. Called on setup of a call to add + * call total size and then again at end of a call to remove the call size. + * @param diff Change (plus or minus) + */ + void addCallSize(long diff); + + /** + * Refresh authentication manager policy. * @param pp */ @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 23b98c71205..f367fa74036 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -42,9 +42,9 @@ public class SimpleRpcScheduler implements RpcScheduler { private final int priorityHandlerCount; private final int replicationHandlerCount; private final PriorityFunction priority; - final BlockingQueue callQueue; - final BlockingQueue priorityCallQueue; - final BlockingQueue replicationQueue; + final BlockingQueue callQueue; + final BlockingQueue priorityCallQueue; + final BlockingQueue replicationQueue; private volatile boolean running = false; private final List handlers = Lists.newArrayList(); @@ -73,12 +73,12 @@ public class SimpleRpcScheduler implements RpcScheduler { this.replicationHandlerCount = replicationHandlerCount; this.priority = priority; this.highPriorityLevel = highPriorityLevel; - this.callQueue = new LinkedBlockingQueue(maxQueueLength); + this.callQueue = new LinkedBlockingQueue(maxQueueLength); this.priorityCallQueue = priorityHandlerCount > 0 - ? new LinkedBlockingQueue(maxQueueLength) + ? new LinkedBlockingQueue(maxQueueLength) : null; this.replicationQueue = replicationHandlerCount > 0 - ? new LinkedBlockingQueue(maxQueueLength) + ? new LinkedBlockingQueue(maxQueueLength) : null; } @@ -101,7 +101,7 @@ public class SimpleRpcScheduler implements RpcScheduler { private void startHandlers( int handlerCount, - final BlockingQueue callQueue, + final BlockingQueue callQueue, String threadNamePrefix) { for (int i = 0; i < handlerCount; i++) { Thread t = new Thread(new Runnable() { @@ -126,7 +126,7 @@ public class SimpleRpcScheduler implements RpcScheduler { } @Override - public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException { + public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); int level = priority.getPriority(call.header, call.param); if (priorityCallQueue != null && level > highPriorityLevel) { @@ -153,10 +153,10 @@ public class SimpleRpcScheduler implements RpcScheduler { return replicationQueue == null ? 0 : replicationQueue.size(); } - private void consumerLoop(BlockingQueue myQueue) { + private void consumerLoop(BlockingQueue myQueue) { while (running) { try { - RpcServer.CallRunner task = myQueue.take(); + CallRunner task = myQueue.take(); task.run(); } catch (InterruptedException e) { Thread.interrupted(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java new file mode 100644 index 00000000000..70db64f38f9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category(SmallTests.class) +public class TestCallRunner { + /** + * Does nothing but exercise a {@link CallRunner} outside of {@link RpcServer} context. + */ + @Test + public void testSimpleCall() { + RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); + Mockito.when(mockRpcServer.isStarted()).thenReturn(true); + RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class); + mockCall.connection = Mockito.mock(RpcServer.Connection.class); + CallRunner cr = new CallRunner(mockRpcServer, mockCall); + cr.run(); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 0120f73d910..7818bd3afd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -286,7 +286,7 @@ public class TestIPC { client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); } - verify(scheduler, times(10)).dispatch((RpcServer.CallRunner) anyObject()); + verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { rpcServer.stop(); verify(scheduler).stop(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 862de1d09d5..a39835a8867 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -27,9 +27,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; -import org.apache.hadoop.hbase.ipc.RpcServer.CallRunner; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category;