HBASE-9461 Some doc and cleanup in RPCServer

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523386 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-09-15 02:23:46 +00:00
parent b7d127e5e7
commit ee0c193d53
11 changed files with 326 additions and 170 deletions

View File

@ -0,0 +1,148 @@
package org.apache.hadoop.hbase.ipc;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
import com.google.protobuf.Message;
/**
* The request processing logic, which is usually executed in thread pools provided by an
* {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
* {@link RpcServer.Call}
*/
@InterfaceAudience.Private
public class CallRunner {
private final Call call;
private final RpcServerInterface rpcServer;
private final MonitoredRPCHandler status;
/**
* On construction, adds the size of this call to the running count of outstanding call sizes.
* Presumption is that we are put on a queue while we wait on an executor to run us. During this
* time we occupy heap.
* @param call The call to run.
* @param rpcServer
*/
// The constructor is shutdown so only RpcServer in this class can make one of these.
CallRunner(final RpcServerInterface rpcServer, final Call call) {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
this.rpcServer.addCallSize(call.getSize());
this.status = getStatus();
}
public Call getCall() {
return call;
}
public void run() {
try {
this.status.setStatus("Setting up call");
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
if (RpcServer.LOG.isDebugEnabled()) {
UserGroupInformation remoteUser = call.connection.user;
RpcServer.LOG.debug(call.toShortString() + " executing as " +
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
}
Throwable errorThrowable = null;
String error = null;
Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call);
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
throw new ServerNotRunningYetException("Server is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
}
RequestContext.set(User.create(call.connection.user), RpcServer.getRemoteIp(),
call.connection.service);
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status);
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
} finally {
if (traceScope != null) {
traceScope.close();
}
// Must always clear the request context to avoid leaking
// credentials between requests.
RequestContext.clear();
}
RpcServer.CurCall.set(null);
this.rpcServer.addCallSize(call.getSize() * -1);
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
Message param = resultPair != null ? resultPair.getFirst() : null;
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
call.setResponse(param, cells, errorThrowable, error);
}
call.sendResponseIfReady();
this.status.markComplete("Sent response");
this.status.pause("Waiting for a call");
} catch (OutOfMemoryError e) {
if (this.rpcServer.getErrorHandler() != null) {
if (this.rpcServer.getErrorHandler().checkOOME(e)) {
RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
// rethrow if no handler
throw e;
}
} catch (ClosedChannelException cce) {
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
"this means that the server was processing a " +
"request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
+ ": caught: " + StringUtils.stringifyException(e));
}
}
MonitoredRPCHandler getStatus() {
// It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
if (status != null) {
return status;
}
status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
status.pause("Waiting for a call");
RpcServer.MONITORED_RPC.set(status);
return status;
}
}

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.CallRunner;
/** /**
* A very simple {@code }RpcScheduler} that serves incoming requests in order. * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@ -65,8 +66,13 @@ public class FifoRpcScheduler implements RpcScheduler {
} }
@Override @Override
public void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException { public void dispatch(final CallRunner task) throws IOException, InterruptedException {
executor.submit(task); executor.submit(new Runnable() {
@Override
public void run() {
task.run();
}
});
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.cloudera.htrace.Trace; import org.cloudera.htrace.Trace;
@ -31,9 +32,9 @@ import java.net.InetAddress;
/** /**
* Represents client information (authenticated username, remote address, protocol) * Represents client information (authenticated username, remote address, protocol)
* for the currently executing request within a RPC server handler thread. If * for the currently executing request. If called outside the context of a RPC request, all values
* called outside the context of a RPC request, all values will be * will be <code>null</code>. The {@link CallRunner} class before it a call and then on
* <code>null</code>. * its way out, it will clear the thread local.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RequestContext { public class RequestContext {
@ -146,7 +147,7 @@ public class RequestContext {
return this.service; return this.service;
} }
public boolean isInRequest() { boolean isInRequest() {
return inRequest; return inRequest;
} }
} }

View File

@ -57,7 +57,7 @@ public interface RpcScheduler {
* *
* @param task the request to be dispatched * @param task the request to be dispatched
*/ */
void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException; void dispatch(CallRunner task) throws IOException, InterruptedException;
/** Retrieves length of the general queue for metrics. */ /** Retrieves length of the general queue for metrics. */
int getGeneralQueueLength(); int getGeneralQueueLength();

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
class RpcSchedulerContext implements RpcScheduler.Context {
private final RpcServer rpcServer;
/**
* @param rpcServer
*/
RpcSchedulerContext(final RpcServer rpcServer) {
this.rpcServer = rpcServer;
}
@Override
public InetSocketAddress getListenerAddress() {
return this.rpcServer.getListenerAddress();
}
}

View File

@ -1,5 +1,4 @@
/** /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -74,7 +73,6 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@ -111,9 +109,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter; import org.cliffc.high_scale_lib.Counter;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceInfo; import org.cloudera.htrace.TraceInfo;
import org.cloudera.htrace.TraceScope;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -128,7 +124,21 @@ import com.google.protobuf.TextFormat;
/** /**
* An RPC server that hosts protobuf described Services. * An RPC server that hosts protobuf described Services.
* <p>Once was copied from Hadoop to local to fix HBASE-900 but deviated long ago. *
* An RpcServer instance has a {@link Listener} that hosts the socket. Listener has fixed number
* of {@link Reader}s in an ExecutorPool, 10 by default. The Listener does an accept and then
* round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does
* total read off the channel and the parse from which it makes a Call. The call is wrapped in a
* CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done
* and loops till done.
*
* <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
* has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run
* taking from the queue. They run the CallRunner#run method on each item gotten from queue
* and keep taking while the server is up.
*
* CallRunner#run executes the call. When done, asks the included Call to put itself on new
* queue for {@link Responder} to pull from and return result to client.
* *
* @see RpcClient * @see RpcClient
*/ */
@ -171,16 +181,13 @@ public class RpcServer implements RpcServerInterface {
protected SecretManager<TokenIdentifier> secretManager; protected SecretManager<TokenIdentifier> secretManager;
protected ServiceAuthorizationManager authManager; protected ServiceAuthorizationManager authManager;
protected static final ThreadLocal<RpcServerInterface> SERVER = new ThreadLocal<RpcServerInterface>(); /** This is set to Call object before Handler invokes an RPC and ybdie
private volatile boolean started = false;
/** This is set to Call object before Handler invokes an RPC and reset
* after the call returns. * after the call returns.
*/ */
protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>(); protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
/** Keeps MonitoredRPCHandler per handler thread. */ /** Keeps MonitoredRPCHandler per handler thread. */
private static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
= new ThreadLocal<MonitoredRPCHandler>(); = new ThreadLocal<MonitoredRPCHandler>();
protected final InetSocketAddress isa; protected final InetSocketAddress isa;
@ -207,7 +214,22 @@ public class RpcServer implements RpcServerInterface {
protected final boolean tcpKeepAlive; // if T then use keepalives protected final boolean tcpKeepAlive; // if T then use keepalives
protected final long purgeTimeout; // in milliseconds protected final long purgeTimeout; // in milliseconds
protected volatile boolean running = true; // true while server runs /**
* This flag is used to indicate to sub threads when they should go down. When we call
* {@link #startThreads()}, all threads started will consult this flag on whether they should
* keep going. It is set to false when {@link #stop()} is called.
*/
volatile boolean running = true;
/**
* This flag is set to true after all threads are up and 'running' and the server is then opened
* for business by the calle to {@link #openServer()}.
*/
volatile boolean started = false;
/**
* This is a running count of the size of all outstanding calls by size.
*/
protected final Counter callQueueSize = new Counter(); protected final Counter callQueueSize = new Counter();
protected final List<Connection> connectionList = protected final List<Connection> connectionList =
@ -462,7 +484,7 @@ public class RpcServer implements RpcServerInterface {
/** /**
* If we have a response, and delay is not set, then respond * If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is * immediately. Otherwise, do not respond to client. This is
* called the by the RPC code in the context of the Handler thread. * called by the RPC code in the context of the Handler thread.
*/ */
public synchronized void sendResponseIfReady() throws IOException { public synchronized void sendResponseIfReady() throws IOException {
if (!this.delayResponse) { if (!this.delayResponse) {
@ -643,8 +665,6 @@ public class RpcServer implements RpcServerInterface {
@Override @Override
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
SERVER.set(RpcServer.this);
while (running) { while (running) {
SelectionKey key = null; SelectionKey key = null;
try { try {
@ -815,7 +835,6 @@ public class RpcServer implements RpcServerInterface {
@Override @Override
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
SERVER.set(RpcServer.this);
try { try {
doRunLoop(); doRunLoop();
} finally { } finally {
@ -1685,8 +1704,7 @@ public class RpcServer implements RpcServerInterface {
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, totalRequestSize,
traceInfo); traceInfo);
callQueueSize.add(totalRequestSize); scheduler.dispatch(new CallRunner(RpcServer.this, call));
scheduler.dispatch(new CallRunner(call));
} }
private boolean authorizeConnection() throws IOException { private boolean authorizeConnection() throws IOException {
@ -1755,107 +1773,6 @@ public class RpcServer implements RpcServerInterface {
} }
} }
/**
* The real request processing logic, which is usually executed in
* thread pools provided by an {@link RpcScheduler}.
*/
class CallRunner implements Runnable {
private final Call call;
public CallRunner(Call call) {
this.call = call;
}
public Call getCall() {
return call;
}
@Override
public void run() {
MonitoredRPCHandler status = getStatus();
SERVER.set(RpcServer.this);
try {
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
if (LOG.isDebugEnabled()) {
UserGroupInformation remoteUser = call.connection.user;
LOG.debug(call.toShortString() + " executing as " +
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
}
Throwable errorThrowable = null;
String error = null;
Pair<Message, CellScanner> resultPair = null;
CurCall.set(call);
TraceScope traceScope = null;
try {
if (!started) {
throw new ServerNotRunningYetException("Server is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
}
RequestContext.set(User.create(call.connection.user), getRemoteIp(),
call.connection.service);
// make the call
resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
status);
} catch (Throwable e) {
LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
} finally {
if (traceScope != null) {
traceScope.close();
}
// Must always clear the request context to avoid leaking
// credentials between requests.
RequestContext.clear();
}
CurCall.set(null);
callQueueSize.add(call.getSize() * -1);
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
Message param = resultPair != null ? resultPair.getFirst() : null;
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
call.setResponse(param, cells, errorThrowable, error);
}
call.sendResponseIfReady();
status.markComplete("Sent response");
status.pause("Waiting for a call");
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
// rethrow if no handler
throw e;
}
} catch (ClosedChannelException cce) {
LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
"this means that the server was processing a " +
"request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
LOG.warn(Thread.currentThread().getName()
+ ": caught: " + StringUtils.stringifyException(e));
}
}
public MonitoredRPCHandler getStatus() {
MonitoredRPCHandler status = MONITORED_RPC.get();
if (status != null) {
return status;
}
status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
status.pause("Waiting for a call");
MONITORED_RPC.set(status);
return status;
}
}
/** /**
* Datastructure for passing a {@link BlockingService} and its associated class of * Datastructure for passing a {@link BlockingService} and its associated class of
* protobuf service interface. For example, a server that fielded what is defined * protobuf service interface. For example, a server that fielded what is defined
@ -1878,14 +1795,6 @@ public class RpcServer implements RpcServerInterface {
} }
} }
private class RpcSchedulerContextImpl implements RpcScheduler.Context {
@Override
public InetSocketAddress getListenerAddress() {
return RpcServer.this.getListenerAddress();
}
}
/** /**
* Constructs a server listening on the named port and address. * Constructs a server listening on the named port and address.
* @param serverInstance hosting instance of {@link Server}. We will do authentications if an * @param serverInstance hosting instance of {@link Server}. We will do authentications if an
@ -1938,7 +1847,7 @@ public class RpcServer implements RpcServerInterface {
HBaseSaslRpcServer.init(conf); HBaseSaslRpcServer.init(conf);
} }
this.scheduler = scheduler; this.scheduler = scheduler;
this.scheduler.init(new RpcSchedulerContextImpl()); this.scheduler.init(new RpcSchedulerContext(this));
} }
/** /**
@ -1995,7 +1904,12 @@ public class RpcServer implements RpcServerInterface {
*/ */
@Override @Override
public void openServer() { public void openServer() {
started = true; this.started = true;
}
@Override
public boolean isStarted() {
return this.started;
} }
/** /**
@ -2018,6 +1932,8 @@ public class RpcServer implements RpcServerInterface {
@Override @Override
public void refreshAuthManager(PolicyProvider pp) { public void refreshAuthManager(PolicyProvider pp) {
// Ignore warnings that this should be accessed in a static way instead of via an instance;
// it'll break if you go via static route.
this.authManager.refresh(this.conf, pp); this.authManager.refresh(this.conf, pp);
} }
@ -2199,6 +2115,11 @@ public class RpcServer implements RpcServerInterface {
this.errorHandler = handler; this.errorHandler = handler;
} }
@Override
public HBaseRPCErrorHandler getErrorHandler() {
return this.errorHandler;
}
/** /**
* Returns the metrics instance for reporting RPC call statistics * Returns the metrics instance for reporting RPC call statistics
*/ */
@ -2206,6 +2127,11 @@ public class RpcServer implements RpcServerInterface {
return metrics; return metrics;
} }
@Override
public void addCallSize(final long diff) {
this.callQueueSize.add(diff);
}
/** /**
* Authorize the incoming client connection. * Authorize the incoming client connection.
* *
@ -2376,7 +2302,7 @@ public class RpcServer implements RpcServerInterface {
*/ */
public static InetAddress getRemoteIp() { public static InetAddress getRemoteIp() {
Call call = CurCall.get(); Call call = CurCall.get();
if (call != null) { if (call != null && call.connection.socket != null) {
return call.connection.socket.getInetAddress(); return call.connection.socket.getInetAddress();
} }
return null; return null;
@ -2394,17 +2320,6 @@ public class RpcServer implements RpcServerInterface {
return null; return null;
} }
/**
* May be called under
* {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
* and under protobuf methods of parameters and return values.
* Permits applications to access the server context.
* @return the server instance called under or null
*/
public static RpcServerInterface get() {
return SERVER.get();
}
/** /**
* A convenience method to bind to a given address and report * A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host. * better exceptions if the address is not a valid host.

View File

@ -34,18 +34,24 @@ import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
/**
* RpcServer Interface.
* Start calls {@link #openServer()} and then {@link #startThreads()}. Prefer {@link #start()}
* and {@link #stop()}. Only use {@link #openServer()} and {@link #startThreads()} if in a
* situation where you could start getting requests though the server not up and fully
* initiaalized.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
public interface RpcServerInterface { public interface RpcServerInterface {
// TODO: Needs cleanup. Why a 'start', and then a 'startThreads' and an 'openServer'?
void setSocketSendBufSize(int size);
void start(); void start();
void openServer();
void startThreads();
boolean isStarted();
void stop(); void stop();
void join() throws InterruptedException; void join() throws InterruptedException;
void setSocketSendBufSize(int size);
InetSocketAddress getListenerAddress(); InetSocketAddress getListenerAddress();
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
@ -53,10 +59,7 @@ public interface RpcServerInterface {
throws IOException, ServiceException; throws IOException, ServiceException;
void setErrorHandler(HBaseRPCErrorHandler handler); void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler();
void openServer();
void startThreads();
/** /**
* Returns the metrics instance for reporting RPC call statistics * Returns the metrics instance for reporting RPC call statistics
@ -64,7 +67,14 @@ public interface RpcServerInterface {
MetricsHBaseServer getMetrics(); MetricsHBaseServer getMetrics();
/** /**
* Refresh autentication manager policy. * Add/subtract from the current size of all outstanding calls. Called on setup of a call to add
* call total size and then again at end of a call to remove the call size.
* @param diff Change (plus or minus)
*/
void addCallSize(long diff);
/**
* Refresh authentication manager policy.
* @param pp * @param pp
*/ */
@VisibleForTesting @VisibleForTesting

View File

@ -42,9 +42,9 @@ public class SimpleRpcScheduler implements RpcScheduler {
private final int priorityHandlerCount; private final int priorityHandlerCount;
private final int replicationHandlerCount; private final int replicationHandlerCount;
private final PriorityFunction priority; private final PriorityFunction priority;
final BlockingQueue<RpcServer.CallRunner> callQueue; final BlockingQueue<CallRunner> callQueue;
final BlockingQueue<RpcServer.CallRunner> priorityCallQueue; final BlockingQueue<CallRunner> priorityCallQueue;
final BlockingQueue<RpcServer.CallRunner> replicationQueue; final BlockingQueue<CallRunner> replicationQueue;
private volatile boolean running = false; private volatile boolean running = false;
private final List<Thread> handlers = Lists.newArrayList(); private final List<Thread> handlers = Lists.newArrayList();
@ -73,12 +73,12 @@ public class SimpleRpcScheduler implements RpcScheduler {
this.replicationHandlerCount = replicationHandlerCount; this.replicationHandlerCount = replicationHandlerCount;
this.priority = priority; this.priority = priority;
this.highPriorityLevel = highPriorityLevel; this.highPriorityLevel = highPriorityLevel;
this.callQueue = new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength); this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
this.priorityCallQueue = priorityHandlerCount > 0 this.priorityCallQueue = priorityHandlerCount > 0
? new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength) ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
: null; : null;
this.replicationQueue = replicationHandlerCount > 0 this.replicationQueue = replicationHandlerCount > 0
? new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength) ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
: null; : null;
} }
@ -101,7 +101,7 @@ public class SimpleRpcScheduler implements RpcScheduler {
private void startHandlers( private void startHandlers(
int handlerCount, int handlerCount,
final BlockingQueue<RpcServer.CallRunner> callQueue, final BlockingQueue<CallRunner> callQueue,
String threadNamePrefix) { String threadNamePrefix) {
for (int i = 0; i < handlerCount; i++) { for (int i = 0; i < handlerCount; i++) {
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@ -126,7 +126,7 @@ public class SimpleRpcScheduler implements RpcScheduler {
} }
@Override @Override
public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException { public void dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall(); RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.header, call.param); int level = priority.getPriority(call.header, call.param);
if (priorityCallQueue != null && level > highPriorityLevel) { if (priorityCallQueue != null && level > highPriorityLevel) {
@ -153,10 +153,10 @@ public class SimpleRpcScheduler implements RpcScheduler {
return replicationQueue == null ? 0 : replicationQueue.size(); return replicationQueue == null ? 0 : replicationQueue.size();
} }
private void consumerLoop(BlockingQueue<RpcServer.CallRunner> myQueue) { private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
while (running) { while (running) {
try { try {
RpcServer.CallRunner task = myQueue.take(); CallRunner task = myQueue.take();
task.run(); task.run();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.interrupted(); Thread.interrupted();

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@Category(SmallTests.class)
public class TestCallRunner {
/**
* Does nothing but exercise a {@link CallRunner} outside of {@link RpcServer} context.
*/
@Test
public void testSimpleCall() {
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
mockCall.connection = Mockito.mock(RpcServer.Connection.class);
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.run();
}
}

View File

@ -286,7 +286,7 @@ public class TestIPC {
client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
} }
verify(scheduler, times(10)).dispatch((RpcServer.CallRunner) anyObject()); verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally { } finally {
rpcServer.stop(); rpcServer.stop();
verify(scheduler).stop(); verify(scheduler).stop();

View File

@ -27,9 +27,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.ipc.RpcServer.CallRunner;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;