HBASE-8884 Pluggable RpcScheduler (Chao Shi)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1504584 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-07-18 18:26:04 +00:00
parent 5ac40d99a8
commit 25e14c8b48
12 changed files with 452 additions and 250 deletions

View File

@ -739,6 +739,20 @@ public final class HConstants {
"hbase.regionserver.disallow.writes.when.recovering";
public static final boolean DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG = false;
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 10;
public static final String REGION_SERVER_META_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
public static final String MASTER_HANDLER_COUNT = "hbase.master.handler.count";
public static final int DEFAULT_MASTER_HANLDER_COUNT = 25;
/** Conf key that specifies timeout value to wait for a region ready */
public static final String LOG_REPLAY_WAIT_REGION_TIMEOUT =
"hbase.master.log.replay.wait.region.timeout";

View File

@ -37,26 +37,29 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
@Override
public int getGeneralQueueLength() {
if (this.server == null || this.server.callQueue == null) {
if (this.server == null
|| this.server.getScheduler() == null) {
return 0;
}
return server.callQueue.size();
return server.getScheduler().getGeneralQueueLength();
}
@Override
public int getReplicationQueueLength() {
if (this.server == null || this.server.replicationQueue == null) {
if (this.server == null
|| this.server.getScheduler() == null) {
return 0;
}
return server.replicationQueue.size();
return server.getScheduler().getReplicationQueueLength();
}
@Override
public int getPriorityQueueLength() {
if (this.server == null || this.server.priorityCallQueue == null) {
if (this.server == null
|| this.server.getScheduler() == null) {
return 0;
}
return server.priorityCallQueue.size();
return server.getScheduler().getPriorityQueueLength();
}
@Override

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* An interface for RPC request scheduling algorithm.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
interface RpcScheduler {
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
interface Context {
InetSocketAddress getListenerAddress();
}
/**
* Does some quick initialization. Heavy tasks (e.g. starting threads) should be
* done in {@link #start()}. This method is called before {@code start}.
*
* @param context provides methods to retrieve runtime information from
*/
void init(Context context);
/**
* Prepares for request serving. An implementation may start some handler threads here.
*/
void start();
/** Stops serving new requests. */
void stop();
/**
* Dispatches an RPC request asynchronously. An implementation is free to choose to process the
* request immediately or delay it for later processing.
*
* @param task the request to be dispatched
*/
void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException;
/** Retrieves length of the general queue for metrics. */
int getGeneralQueueLength();
/** Retrieves length of the priority queue for metrics. */
int getPriorityQueueLength();
/** Retrieves length of the replication queue for metrics. */
int getReplicationQueueLength();
}

View File

@ -51,10 +51,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.Sasl;
@ -62,6 +60,7 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -69,6 +68,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException;
@ -102,7 +102,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -122,8 +121,6 @@ import org.cloudera.htrace.TraceInfo;
import org.cloudera.htrace.impl.NullSpan;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.BlockingService;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Descriptors.MethodDescriptor;
@ -153,7 +150,7 @@ public class RpcServer implements RpcServerInterface {
/**
* How many calls/handler are allowed in the queue.
*/
private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
/**
* The maximum size that we can hold in the RPC queue
@ -186,10 +183,12 @@ public class RpcServer implements RpcServerInterface {
*/
protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
/** Keeps MonitoredRPCHandler per handler thread. */
private static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
= new ThreadLocal<MonitoredRPCHandler>();
protected final InetSocketAddress isa;
protected int port; // port we listen on
private int handlerCount; // number of handler threads
private int priorityHandlerCount;
private int readThreads; // number of read threads
protected int maxIdleTime; // the maximum idle time after
// which a client may be
@ -214,11 +213,7 @@ public class RpcServer implements RpcServerInterface {
protected final long purgeTimeout; // in milliseconds
protected volatile boolean running = true; // true while server runs
protected BlockingQueue<Call> callQueue; // queued calls
protected final Counter callQueueSize = new Counter();
protected BlockingQueue<Call> priorityCallQueue;
protected int highPriorityLevel; // what level a high priority call is at
protected final List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
@ -227,12 +222,6 @@ public class RpcServer implements RpcServerInterface {
private Listener listener = null;
protected Responder responder = null;
protected int numConnections = 0;
private Handler[] handlers = null;
private Handler[] priorityHandlers = null;
/** replication related queue; */
protected BlockingQueue<Call> replicationQueue;
private int numOfReplicationHandlers = 0;
private Handler[] replicationHandlers = null;
protected HBaseRPCErrorHandler errorHandler = null;
@ -250,6 +239,8 @@ public class RpcServer implements RpcServerInterface {
private final Object serverInstance;
private final List<BlockingServiceAndInterface> services;
private final RpcScheduler scheduler;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
@ -258,6 +249,7 @@ public class RpcServer implements RpcServerInterface {
protected int id; // the client's call id
protected BlockingService service;
protected MethodDescriptor md;
protected RequestHeader header;
protected Message param; // the parameter passed
// Optional cell data passed outside of protobufs.
protected CellScanner cellScanner;
@ -274,12 +266,13 @@ public class RpcServer implements RpcServerInterface {
protected TraceInfo tinfo;
protected String effectiveUser;
Call(int id, final BlockingService service, final MethodDescriptor md, Message param,
CellScanner cellScanner, Connection connection, Responder responder, long size,
TraceInfo tinfo, String effectiveUser) {
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
long size, TraceInfo tinfo, String effectiveUser) {
this.id = id;
this.service = service;
this.md = md;
this.header = header;
this.param = param;
this.cellScanner = cellScanner;
this.connection = connection;
@ -741,8 +734,7 @@ public class RpcServer implements RpcServerInterface {
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
"; # active connections: " + numConnections);
} finally {
reader.finishAdd();
}
@ -1080,25 +1072,6 @@ public class RpcServer implements RpcServerInterface {
}
}
private Function<Pair<RequestHeader, Message>, Integer> qosFunction = null;
/**
* Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
* are priorityHandlers available it will be processed in it's own thread set.
*
* @param newFunc
*/
@Override
public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc) {
qosFunction = newFunc;
}
protected int getQosLevel(Pair<RequestHeader, Message> headerAndParam) {
if (qosFunction == null) return 0;
Integer res = qosFunction.apply(headerAndParam);
return res == null? 0: res;
}
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
@ -1144,13 +1117,13 @@ public class RpcServer implements RpcServerInterface {
private static final int AUTHROIZATION_FAILED_CALLID = -1;
private final Call authFailedCall =
new Call(AUTHROIZATION_FAILED_CALLID, this.service, null,
null, null, this, null, 0, null, null);
null, null, null, this, null, 0, null, null);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall =
new Call(SASL_CALLID, this.service, null, null, null, this, null, 0, null, null);
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
public UserGroupInformation attemptingUser = null; // user name before auth
@ -1505,7 +1478,7 @@ public class RpcServer implements RpcServerInterface {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, this, responder, -1, null, null);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@ -1655,7 +1628,7 @@ public class RpcServer implements RpcServerInterface {
// This is a bit late to be doing this check - we have already read in the total request.
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, this.service, null, null, null, this,
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
@ -1691,7 +1664,7 @@ public class RpcServer implements RpcServerInterface {
String msg = "Unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, this,
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
@ -1700,26 +1673,15 @@ public class RpcServer implements RpcServerInterface {
return;
}
Call call = null;
if (header.hasTraceInfo()) {
call = new Call(id, this.service, md, param, cellScanner, this,
responder, totalRequestSize, new TraceInfo(header.getTraceInfo().getTraceId(),
header.getTraceInfo().getParentId()), effectiveUser);
} else {
call = new Call(id, this.service, md, param, cellScanner, this, responder,
totalRequestSize, null, effectiveUser);
}
TraceInfo traceInfo = header.hasTraceInfo()
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize,
traceInfo,
effectiveUser);
callQueueSize.add(totalRequestSize);
Pair<RequestHeader, Message> headerAndParam =
new Pair<RequestHeader, Message>(header, param);
if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
priorityCallQueue.put(call);
} else if (replicationQueue != null &&
getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
replicationQueue.put(call);
} else {
callQueue.put(call); // queue the call; maybe blocked here
}
scheduler.dispatch(new CallRunner(call));
}
private boolean authorizeConnection() throws IOException {
@ -1788,118 +1750,116 @@ public class RpcServer implements RpcServerInterface {
}
}
/** Handles queued calls . */
private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue;
private MonitoredRPCHandler status;
/**
* The real request processing logic, which is usually executed in
* thread pools provided by an {@link RpcScheduler}.
*/
class CallRunner implements Runnable {
private final Call call;
public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
this.myCallQueue = cq;
this.setDaemon(true);
public CallRunner(Call call) {
this.call = call;
}
String threadName = "RpcServer.handler=" + instanceNumber + ",port=" + port;
if (cq == priorityCallQueue) {
// this is just an amazing hack, but it works.
threadName = "Priority." + threadName;
} else if (cq == replicationQueue) {
threadName = "Replication." + threadName;
}
this.setName(threadName);
this.status = TaskMonitor.get().createRPCStatus(threadName);
public Call getCall() {
return call;
}
@Override
public void run() {
LOG.info(getName() + ": starting");
status.setStatus("starting");
MonitoredRPCHandler status = getStatus();
SERVER.set(RpcServer.this);
while (running) {
try {
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
if (LOG.isDebugEnabled()) {
UserGroupInformation remoteUser = call.connection.user;
LOG.debug(call.toShortString() + " executing as " +
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
}
Throwable errorThrowable = null;
String error = null;
Pair<Message, CellScanner> resultPair = null;
CurCall.set(call);
Span currentRequestSpan = NullSpan.getInstance();
try {
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
if (LOG.isDebugEnabled()) {
UserGroupInformation remoteUser = call.connection.user;
LOG.debug(call.toShortString() + " executing as " +
((remoteUser == null)? "NULL principal": remoteUser.getUserName()));
if (!started) {
throw new ServerNotRunningYetException("Server is not running yet");
}
Throwable errorThrowable = null;
String error = null;
Pair<Message, CellScanner> resultPair = null;
CurCall.set(call);
Span currentRequestSpan = NullSpan.getInstance();
try {
if (!started) {
throw new ServerNotRunningYetException("Server is not running yet");
}
if (call.tinfo != null) {
currentRequestSpan = Trace.startSpan(
"handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS);
}
User user;
if (call.effectiveUser == null) {
user = User.create(call.connection.user);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(
call.effectiveUser, call.connection.user);
ProxyUsers.authorize(ugi, call.connection.getHostAddress(), conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Authorized " + call.connection.user
+ " to impersonate " + call.effectiveUser);
}
user = User.create(ugi);
}
RequestContext.set(user, getRemoteIp(), call.connection.service);
// make the call
resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
status);
} catch (Throwable e) {
LOG.debug(getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
} finally {
currentRequestSpan.stop();
// Must always clear the request context to avoid leaking
// credentials between requests.
RequestContext.clear();
if (call.tinfo != null) {
currentRequestSpan = Trace.startSpan(
"handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS);
}
CurCall.set(null);
callQueueSize.add(call.getSize() * -1);
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
Message param = resultPair != null? resultPair.getFirst(): null;
CellScanner cells = resultPair != null? resultPair.getSecond(): null;
call.setResponse(param, cells, errorThrowable, error);
}
call.sendResponseIfReady();
status.markComplete("Sent response");
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e));
}
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
LOG.info(getName() + ": exiting on OutOfMemoryError");
return;
}
User user;
if (call.effectiveUser == null) {
user = User.create(call.connection.user);
} else {
// rethrow if no handler
throw e;
UserGroupInformation ugi = UserGroupInformation.createProxyUser(
call.effectiveUser, call.connection.user);
ProxyUsers.authorize(ugi, call.connection.getHostAddress(), conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Authorized " + call.connection.user
+ " to impersonate " + call.effectiveUser);
}
user = User.create(ugi);
}
} catch (ClosedChannelException cce) {
LOG.warn(getName() + ": caught a ClosedChannelException, " +
RequestContext.set(user, getRemoteIp(), call.connection.service);
// make the call
resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
status);
} catch (Throwable e) {
LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
} finally {
currentRequestSpan.stop();
// Must always clear the request context to avoid leaking
// credentials between requests.
RequestContext.clear();
}
CurCall.set(null);
callQueueSize.add(call.getSize() * -1);
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
Message param = resultPair != null ? resultPair.getFirst() : null;
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
call.setResponse(param, cells, errorThrowable, error);
}
call.sendResponseIfReady();
status.markComplete("Sent response");
status.pause("Waiting for a call");
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
// rethrow if no handler
throw e;
}
} catch (ClosedChannelException cce) {
LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
"this means that the server was processing a " +
"request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));
}
} catch (Exception e) {
LOG.warn(Thread.currentThread().getName()
+ ": caught: " + StringUtils.stringifyException(e));
}
LOG.info(getName() + ": exiting");
}
public MonitoredRPCHandler getStatus() {
MonitoredRPCHandler status = MONITORED_RPC.get();
if (status != null) {
return status;
}
status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
status.pause("Waiting for a call");
MONITORED_RPC.set(status);
return status;
}
}
@ -1925,20 +1885,12 @@ public class RpcServer implements RpcServerInterface {
}
}
private class RpcSchedulerContextImpl implements RpcScheduler.Context {
/**
* Minimal setup. Used by tests mostly.
* @param service
* @param isa
* @param conf
* @throws IOException
*/
public RpcServer(final BlockingService service, final InetSocketAddress isa,
final Configuration conf)
throws IOException {
this(null, "generic", Lists.newArrayList(new BlockingServiceAndInterface(service, null)),
isa, 3, 3, conf,
HConstants.QOS_THRESHOLD);
@Override
public InetSocketAddress getListenerAddress() {
return RpcServer.this.getListenerAddress();
}
}
/**
@ -1948,46 +1900,27 @@ public class RpcServer implements RpcServerInterface {
* @param name Used keying this rpc servers' metrics and for naming the Listener thread.
* @param services A list of services.
* @param isa Where to listen
* @param handlerCount the number of handler threads that will be used to process calls
* @param priorityHandlerCount How many threads for priority handling.
* @param conf
* @param highPriorityLevel
* @throws IOException
*/
public RpcServer(final Server serverInstance, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress isa, int handlerCount, int priorityHandlerCount, Configuration conf,
int highPriorityLevel)
final InetSocketAddress isa, Configuration conf,
RpcScheduler scheduler)
throws IOException {
this.serverInstance = serverInstance;
this.services = services;
this.isa = isa;
this.conf = conf;
this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.socketSendBufferSize = 0;
this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length",
handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.maxQueueSize =
this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
if (priorityHandlerCount > 0) {
this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueLength); // TODO hack on size
} else {
this.priorityCallQueue = null;
}
this.highPriorityLevel = highPriorityLevel;
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3);
if (numOfReplicationHandlers > 0) {
this.replicationQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
}
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
@ -2011,6 +1944,8 @@ public class RpcServer implements RpcServerInterface {
if (isSecurityEnabled) {
HBaseSaslRpcServer.init(conf);
}
this.scheduler = scheduler;
this.scheduler.init(new RpcSchedulerContextImpl());
}
/**
@ -2085,9 +2020,7 @@ public class RpcServer implements RpcServerInterface {
HBasePolicyProvider.init(conf, authManager);
responder.start();
listener.start();
handlers = startHandlers(callQueue, handlerCount);
priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount);
replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers);
scheduler.start();
}
@Override
@ -2095,18 +2028,6 @@ public class RpcServer implements RpcServerInterface {
this.authManager.refresh(this.conf, pp);
}
private Handler[] startHandlers(BlockingQueue<Call> queue, int numOfHandlers) {
if (numOfHandlers <= 0) {
return null;
}
Handler[] handlers = new Handler[numOfHandlers];
for (int i = 0; i < numOfHandlers; i++) {
handlers[i] = new Handler(queue, i);
handlers[i].start();
}
return handlers;
}
private AuthenticationTokenSecretManager createSecretManager() {
if (!isSecurityEnabled) return null;
if (serverInstance == null) return null;
@ -2248,25 +2169,12 @@ public class RpcServer implements RpcServerInterface {
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
stopHandlers(handlers);
stopHandlers(priorityHandlers);
stopHandlers(replicationHandlers);
listener.interrupt();
listener.doStop();
responder.interrupt();
notifyAll();
}
private void stopHandlers(Handler[] handlers) {
if (handlers != null) {
for (Handler handler : handlers) {
if (handler != null) {
handler.interrupt();
}
}
}
}
/** Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
* See {@link #stop()}.
@ -2532,4 +2440,8 @@ public class RpcServer implements RpcServerInterface {
throw e;
}
}
public RpcScheduler getScheduler() {
return scheduler;
}
}

View File

@ -65,8 +65,6 @@ public interface RpcServerInterface {
*/
MetricsHBaseServer getMetrics();
void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc);
/**
* Refresh autentication manager policy.
* @param pp

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.Message;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.util.Pair;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A scheduler that maintains isolated handler pools for general, high-priority and replication
* requests.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SimpleRpcScheduler implements RpcScheduler {
private int port;
private final int handlerCount;
private final int priorityHandlerCount;
private final int replicationHandlerCount;
final BlockingQueue<RpcServer.CallRunner> callQueue;
final BlockingQueue<RpcServer.CallRunner> priorityCallQueue;
final BlockingQueue<RpcServer.CallRunner> replicationQueue;
private volatile boolean running = false;
private final List<Thread> handlers = Lists.newArrayList();
private final Function<Pair<RPCProtos.RequestHeader, Message>, Integer> qosFunction;
/** What level a high priority call is at. */
private final int highPriorityLevel;
/**
* @param conf
* @param handlerCount the number of handler threads that will be used to process calls
* @param priorityHandlerCount How many threads for priority handling.
* @param replicationHandlerCount How many threads for replication handling.
* @param qosFunction a function that maps requests to priorities
* @param highPriorityLevel
*/
public SimpleRpcScheduler(
Configuration conf,
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
Function<Pair<RPCProtos.RequestHeader, Message>, Integer> qosFunction,
int highPriorityLevel) {
int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.replicationHandlerCount = replicationHandlerCount;
this.qosFunction = qosFunction;
this.highPriorityLevel = highPriorityLevel;
this.callQueue = new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength);
this.priorityCallQueue = priorityHandlerCount > 0
? new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength)
: null;
this.replicationQueue = replicationHandlerCount > 0
? new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength)
: null;
}
@Override
public void init(Context context) {
this.port = context.getListenerAddress().getPort();
}
@Override
public void start() {
running = true;
startHandlers(handlerCount, callQueue, null);
if (priorityCallQueue != null) {
startHandlers(priorityHandlerCount, priorityCallQueue, "Priority.");
}
if (replicationQueue != null) {
startHandlers(replicationHandlerCount, replicationQueue, "Replication.");
}
}
private void startHandlers(
int handlerCount,
final BlockingQueue<RpcServer.CallRunner> callQueue,
String threadNamePrefix) {
for (int i = 0; i < handlerCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
consumerLoop(callQueue);
}
});
t.setDaemon(true);
t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port);
t.start();
handlers.add(t);
}
}
@Override
public void stop() {
running = false;
for (Thread handler : handlers) {
handler.interrupt();
}
}
@Override
public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
Pair<RPCProtos.RequestHeader, Message> headerAndParam =
new Pair<RPCProtos.RequestHeader, Message>(call.header, call.param);
if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
priorityCallQueue.put(callTask);
} else if (replicationQueue != null &&
getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
replicationQueue.put(callTask);
} else {
callQueue.put(callTask); // queue the call; maybe blocked here
}
}
@Override
public int getGeneralQueueLength() {
return callQueue.size();
}
@Override
public int getPriorityQueueLength() {
return priorityCallQueue == null ? 0 : priorityCallQueue.size();
}
@Override
public int getReplicationQueueLength() {
return replicationQueue == null ? 0 : replicationQueue.size();
}
private void consumerLoop(BlockingQueue<RpcServer.CallRunner> myQueue) {
while (running) {
try {
RpcServer.CallRunner task = myQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.interrupted();
}
}
}
private int getQosLevel(Pair<RPCProtos.RequestHeader, Message> headerAndParam) {
if (qosFunction == null) return 0;
Integer res = qosFunction.apply(headerAndParam);
return res == null? 0: res;
}
}

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@ -395,14 +396,19 @@ MasterServices, Server {
String name = "master/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
int numHandlers = conf.getInt("hbase.master.handler.count",
conf.getInt("hbase.regionserver.handler.count", 25));
int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
conf,
numHandlers,
0, // we don't use high priority handlers in master
0, // we don't use replication handlers in master
null, // this is a DNC w/o high priority handlers
0);
this.rpcServer = new RpcServer(this, name, getServices(),
initialIsa, // BindAddress is IP we got for this server.
numHandlers,
0, // we dont use high priority handlers in master
conf,
0); // this is a DNC w/o high priority handlers
scheduler);
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
this.serverName =

View File

@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -547,18 +548,27 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
String name = "regionserver/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
this.qosFunction = new QosFunction(this);
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
qosFunction,
HConstants.QOS_THRESHOLD);
this.rpcServer = new RpcServer(this, name, getServices(),
/*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
initialIsa, // BindAddress is IP we got for this server.
conf.getInt("hbase.regionserver.handler.count", 10),
conf.getInt("hbase.regionserver.metahandler.count", 10),
conf, HConstants.QOS_THRESHOLD);
conf, scheduler);
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
this.rpcServer.setErrorHandler(this);
this.rpcServer.setQosFunction((qosFunction = new QosFunction(this)));
this.startcode = System.currentTimeMillis();
// login the zookeeper client principal (if using security)

View File

@ -84,7 +84,9 @@ public class TestDelayedRpc {
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
rpcServer = new RpcServer(null, "testDelayedRpc",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
isa, 1, 0, conf, 0);
isa,
conf,
new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
@ -163,7 +165,9 @@ public class TestDelayedRpc {
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
isa, 1, 0, conf, 0);
isa,
conf,
new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
@ -283,7 +287,9 @@ public class TestDelayedRpc {
TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
rpcServer = new RpcServer(null, "testEndDelayThrowing",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
isa, 1, 0, conf, 0);
isa,
conf,
new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
@ -344,4 +350,4 @@ public class TestDelayedRpc {
return TestResponse.newBuilder().setResponse(DELAYED).build();
}
}
}
}

View File

@ -81,6 +81,7 @@ public class TestIPC {
public static final Log LOG = LogFactory.getLog(TestIPC.class);
static byte [] CELL_BYTES = Bytes.toBytes("xyz");
static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
private final static Configuration CONF = HBaseConfiguration.create();
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
// by passing in implementation of blocking interface. We use this service in all tests that
@ -132,11 +133,11 @@ public class TestIPC {
* HBaseRpcServer directly.
*/
private static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
super(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("0.0.0.0", 0), 1, 1,
HBaseConfiguration.create(), 0);
new InetSocketAddress("0.0.0.0", 0), CONF, new SimpleRpcScheduler(CONF, 1, 1, 0, null, 0));
}
@Override

View File

@ -98,7 +98,8 @@ public class TestProtoBufRpc {
// Get RPC server for server side implementation
this.server = new RpcServer(null, "testrpc",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), 10, 10, conf, 0);
new InetSocketAddress(ADDRESS, PORT), conf,
new SimpleRpcScheduler(conf, 10, 10, 0, null, 0));
this.isa = server.getListenerAddress();
this.server.start();
}
@ -137,4 +138,4 @@ public class TestProtoBufRpc {
rpcClient.stop();
}
}
}
}

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -128,8 +129,10 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
sai.add(new BlockingServiceAndInterface(service,
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
conf, 3, 1, 0, null, HConstants.QOS_THRESHOLD);
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, 3, 1, conf, HConstants.QOS_THRESHOLD);
new RpcServer(this, "tokenServer", sai, initialIsa, conf, scheduler);
this.isa = this.rpcServer.getListenerAddress();
this.sleeper = new Sleeper(1000, this);
}