From 25e14c8b4825c8b38a1ed36e6f7488c75efe38f9 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Thu, 18 Jul 2013 18:26:04 +0000 Subject: [PATCH] HBASE-8884 Pluggable RpcScheduler (Chao Shi) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1504584 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/hbase/HConstants.java | 14 + .../ipc/MetricsHBaseServerWrapperImpl.java | 15 +- .../apache/hadoop/hbase/ipc/RpcScheduler.java | 70 ++++ .../apache/hadoop/hbase/ipc/RpcServer.java | 360 +++++++----------- .../hadoop/hbase/ipc/RpcServerInterface.java | 2 - .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 178 +++++++++ .../apache/hadoop/hbase/master/HMaster.java | 16 +- .../hbase/regionserver/HRegionServer.java | 18 +- .../hadoop/hbase/ipc/TestDelayedRpc.java | 14 +- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 +- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 5 +- .../token/TestTokenAuthentication.java | 5 +- 12 files changed, 452 insertions(+), 250 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 056696b9993..fc3126c6862 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -739,6 +739,20 @@ public final class HConstants { "hbase.regionserver.disallow.writes.when.recovering"; public static final boolean DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG = false; + public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count"; + public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 10; + + public static final String REGION_SERVER_META_HANDLER_COUNT = + "hbase.regionserver.metahandler.count"; + public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10; + + public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT = + "hbase.regionserver.replication.handler.count"; + public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3; + + public static final String MASTER_HANDLER_COUNT = "hbase.master.handler.count"; + public static final int DEFAULT_MASTER_HANLDER_COUNT = 25; + /** Conf key that specifies timeout value to wait for a region ready */ public static final String LOG_REPLAY_WAIT_REGION_TIMEOUT = "hbase.master.log.replay.wait.region.timeout"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java index ab5484c8586..5fa44776beb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java @@ -37,26 +37,29 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper @Override public int getGeneralQueueLength() { - if (this.server == null || this.server.callQueue == null) { + if (this.server == null + || this.server.getScheduler() == null) { return 0; } - return server.callQueue.size(); + return server.getScheduler().getGeneralQueueLength(); } @Override public int getReplicationQueueLength() { - if (this.server == null || this.server.replicationQueue == null) { + if (this.server == null + || this.server.getScheduler() == null) { return 0; } - return server.replicationQueue.size(); + return server.getScheduler().getReplicationQueueLength(); } @Override public int getPriorityQueueLength() { - if (this.server == null || this.server.priorityCallQueue == null) { + if (this.server == null + || this.server.getScheduler() == null) { return 0; } - return server.priorityCallQueue.size(); + return server.getScheduler().getPriorityQueueLength(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java new file mode 100644 index 00000000000..1994ad5e810 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * An interface for RPC request scheduling algorithm. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +interface RpcScheduler { + + /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ + interface Context { + InetSocketAddress getListenerAddress(); + } + + /** + * Does some quick initialization. Heavy tasks (e.g. starting threads) should be + * done in {@link #start()}. This method is called before {@code start}. + * + * @param context provides methods to retrieve runtime information from + */ + void init(Context context); + + /** + * Prepares for request serving. An implementation may start some handler threads here. + */ + void start(); + + /** Stops serving new requests. */ + void stop(); + + /** + * Dispatches an RPC request asynchronously. An implementation is free to choose to process the + * request immediately or delay it for later processing. + * + * @param task the request to be dispatched + */ + void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException; + + /** Retrieves length of the general queue for metrics. */ + int getGeneralQueueLength(); + + /** Retrieves length of the priority queue for metrics. */ + int getPriorityQueueLength(); + + /** Retrieves length of the replication queue for metrics. */ + int getReplicationQueueLength(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 915c43cbc65..232a04c6b7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -51,10 +51,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.Sasl; @@ -62,6 +60,7 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -69,6 +68,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException; @@ -102,7 +102,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -122,8 +121,6 @@ import org.cloudera.htrace.TraceInfo; import org.cloudera.htrace.impl.NullSpan; import org.codehaus.jackson.map.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; import com.google.protobuf.CodedInputStream; import com.google.protobuf.Descriptors.MethodDescriptor; @@ -153,7 +150,7 @@ public class RpcServer implements RpcServerInterface { /** * How many calls/handler are allowed in the queue. */ - private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; + static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; /** * The maximum size that we can hold in the RPC queue @@ -186,10 +183,12 @@ public class RpcServer implements RpcServerInterface { */ protected static final ThreadLocal CurCall = new ThreadLocal(); + /** Keeps MonitoredRPCHandler per handler thread. */ + private static final ThreadLocal MONITORED_RPC + = new ThreadLocal(); + protected final InetSocketAddress isa; protected int port; // port we listen on - private int handlerCount; // number of handler threads - private int priorityHandlerCount; private int readThreads; // number of read threads protected int maxIdleTime; // the maximum idle time after // which a client may be @@ -214,11 +213,7 @@ public class RpcServer implements RpcServerInterface { protected final long purgeTimeout; // in milliseconds protected volatile boolean running = true; // true while server runs - protected BlockingQueue callQueue; // queued calls protected final Counter callQueueSize = new Counter(); - protected BlockingQueue priorityCallQueue; - - protected int highPriorityLevel; // what level a high priority call is at protected final List connectionList = Collections.synchronizedList(new LinkedList()); @@ -227,12 +222,6 @@ public class RpcServer implements RpcServerInterface { private Listener listener = null; protected Responder responder = null; protected int numConnections = 0; - private Handler[] handlers = null; - private Handler[] priorityHandlers = null; - /** replication related queue; */ - protected BlockingQueue replicationQueue; - private int numOfReplicationHandlers = 0; - private Handler[] replicationHandlers = null; protected HBaseRPCErrorHandler errorHandler = null; @@ -250,6 +239,8 @@ public class RpcServer implements RpcServerInterface { private final Object serverInstance; private final List services; + private final RpcScheduler scheduler; + /** * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. @@ -258,6 +249,7 @@ public class RpcServer implements RpcServerInterface { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; + protected RequestHeader header; protected Message param; // the parameter passed // Optional cell data passed outside of protobufs. protected CellScanner cellScanner; @@ -274,12 +266,13 @@ public class RpcServer implements RpcServerInterface { protected TraceInfo tinfo; protected String effectiveUser; - Call(int id, final BlockingService service, final MethodDescriptor md, Message param, - CellScanner cellScanner, Connection connection, Responder responder, long size, - TraceInfo tinfo, String effectiveUser) { + Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, + Message param, CellScanner cellScanner, Connection connection, Responder responder, + long size, TraceInfo tinfo, String effectiveUser) { this.id = id; this.service = service; this.md = md; + this.header = header; this.param = param; this.cellScanner = cellScanner; this.connection = connection; @@ -741,8 +734,7 @@ public class RpcServer implements RpcServerInterface { } if (LOG.isDebugEnabled()) LOG.debug(getName() + ": connection from " + c.toString() + - "; # active connections: " + numConnections + - "; # queued calls: " + callQueue.size()); + "; # active connections: " + numConnections); } finally { reader.finishAdd(); } @@ -1080,25 +1072,6 @@ public class RpcServer implements RpcServerInterface { } } - private Function, Integer> qosFunction = null; - - /** - * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there - * are priorityHandlers available it will be processed in it's own thread set. - * - * @param newFunc - */ - @Override - public void setQosFunction(Function, Integer> newFunc) { - qosFunction = newFunc; - } - - protected int getQosLevel(Pair headerAndParam) { - if (qosFunction == null) return 0; - Integer res = qosFunction.apply(headerAndParam); - return res == null? 0: res; - } - /** Reads calls from a connection and queues them for handling. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="VO_VOLATILE_INCREMENT", @@ -1144,13 +1117,13 @@ public class RpcServer implements RpcServerInterface { private static final int AUTHROIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, - null, null, this, null, 0, null, null); + null, null, null, this, null, 0, null, null); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = - new Call(SASL_CALLID, this.service, null, null, null, this, null, 0, null, null); + new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null); public UserGroupInformation attemptingUser = null; // user name before auth @@ -1505,7 +1478,7 @@ public class RpcServer implements RpcServerInterface { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, this, responder, -1, null, null); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1655,7 +1628,7 @@ public class RpcServer implements RpcServerInterface { // This is a bit late to be doing this check - we have already read in the total request. if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { final Call callTooBig = - new Call(id, this.service, null, null, null, this, + new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(), @@ -1691,7 +1664,7 @@ public class RpcServer implements RpcServerInterface { String msg = "Unable to read call parameter from client " + getHostAddress(); LOG.warn(msg, t); final Call readParamsFailedCall = - new Call(id, this.service, null, null, null, this, + new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, @@ -1700,26 +1673,15 @@ public class RpcServer implements RpcServerInterface { return; } - Call call = null; - if (header.hasTraceInfo()) { - call = new Call(id, this.service, md, param, cellScanner, this, - responder, totalRequestSize, new TraceInfo(header.getTraceInfo().getTraceId(), - header.getTraceInfo().getParentId()), effectiveUser); - } else { - call = new Call(id, this.service, md, param, cellScanner, this, responder, - totalRequestSize, null, effectiveUser); - } + TraceInfo traceInfo = header.hasTraceInfo() + ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) + : null; + Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, + totalRequestSize, + traceInfo, + effectiveUser); callQueueSize.add(totalRequestSize); - Pair headerAndParam = - new Pair(header, param); - if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) { - priorityCallQueue.put(call); - } else if (replicationQueue != null && - getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) { - replicationQueue.put(call); - } else { - callQueue.put(call); // queue the call; maybe blocked here - } + scheduler.dispatch(new CallRunner(call)); } private boolean authorizeConnection() throws IOException { @@ -1788,118 +1750,116 @@ public class RpcServer implements RpcServerInterface { } } - /** Handles queued calls . */ - private class Handler extends Thread { - private final BlockingQueue myCallQueue; - private MonitoredRPCHandler status; + /** + * The real request processing logic, which is usually executed in + * thread pools provided by an {@link RpcScheduler}. + */ + class CallRunner implements Runnable { + private final Call call; - public Handler(final BlockingQueue cq, int instanceNumber) { - this.myCallQueue = cq; - this.setDaemon(true); + public CallRunner(Call call) { + this.call = call; + } - String threadName = "RpcServer.handler=" + instanceNumber + ",port=" + port; - if (cq == priorityCallQueue) { - // this is just an amazing hack, but it works. - threadName = "Priority." + threadName; - } else if (cq == replicationQueue) { - threadName = "Replication." + threadName; - } - this.setName(threadName); - this.status = TaskMonitor.get().createRPCStatus(threadName); + public Call getCall() { + return call; } @Override public void run() { - LOG.info(getName() + ": starting"); - status.setStatus("starting"); + MonitoredRPCHandler status = getStatus(); SERVER.set(RpcServer.this); - while (running) { + try { + status.setStatus("Setting up call"); + status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); + if (LOG.isDebugEnabled()) { + UserGroupInformation remoteUser = call.connection.user; + LOG.debug(call.toShortString() + " executing as " + + ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName())); + } + Throwable errorThrowable = null; + String error = null; + Pair resultPair = null; + CurCall.set(call); + Span currentRequestSpan = NullSpan.getInstance(); try { - status.pause("Waiting for a call"); - Call call = myCallQueue.take(); // pop the queue; maybe blocked here - status.setStatus("Setting up call"); - status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); - if (LOG.isDebugEnabled()) { - UserGroupInformation remoteUser = call.connection.user; - LOG.debug(call.toShortString() + " executing as " + - ((remoteUser == null)? "NULL principal": remoteUser.getUserName())); + if (!started) { + throw new ServerNotRunningYetException("Server is not running yet"); } - Throwable errorThrowable = null; - String error = null; - Pair resultPair = null; - CurCall.set(call); - Span currentRequestSpan = NullSpan.getInstance(); - try { - if (!started) { - throw new ServerNotRunningYetException("Server is not running yet"); - } - if (call.tinfo != null) { - currentRequestSpan = Trace.startSpan( - "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS); - } - User user; - if (call.effectiveUser == null) { - user = User.create(call.connection.user); - } else { - UserGroupInformation ugi = UserGroupInformation.createProxyUser( - call.effectiveUser, call.connection.user); - ProxyUsers.authorize(ugi, call.connection.getHostAddress(), conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Authorized " + call.connection.user - + " to impersonate " + call.effectiveUser); - } - user = User.create(ugi); - } - RequestContext.set(user, getRemoteIp(), call.connection.service); - - // make the call - resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp, - status); - } catch (Throwable e) { - LOG.debug(getName() + ": " + call.toShortString(), e); - errorThrowable = e; - error = StringUtils.stringifyException(e); - } finally { - currentRequestSpan.stop(); - // Must always clear the request context to avoid leaking - // credentials between requests. - RequestContext.clear(); + if (call.tinfo != null) { + currentRequestSpan = Trace.startSpan( + "handling " + call.toShortString(), call.tinfo, Sampler.ALWAYS); } - CurCall.set(null); - callQueueSize.add(call.getSize() * -1); - // Set the response for undelayed calls and delayed calls with - // undelayed responses. - if (!call.isDelayed() || !call.isReturnValueDelayed()) { - Message param = resultPair != null? resultPair.getFirst(): null; - CellScanner cells = resultPair != null? resultPair.getSecond(): null; - call.setResponse(param, cells, errorThrowable, error); - } - call.sendResponseIfReady(); - status.markComplete("Sent response"); - } catch (InterruptedException e) { - if (running) { // unexpected -- log it - LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e)); - } - } catch (OutOfMemoryError e) { - if (errorHandler != null) { - if (errorHandler.checkOOME(e)) { - LOG.info(getName() + ": exiting on OutOfMemoryError"); - return; - } + User user; + if (call.effectiveUser == null) { + user = User.create(call.connection.user); } else { - // rethrow if no handler - throw e; + UserGroupInformation ugi = UserGroupInformation.createProxyUser( + call.effectiveUser, call.connection.user); + ProxyUsers.authorize(ugi, call.connection.getHostAddress(), conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Authorized " + call.connection.user + + " to impersonate " + call.effectiveUser); + } + user = User.create(ugi); } - } catch (ClosedChannelException cce) { - LOG.warn(getName() + ": caught a ClosedChannelException, " + + RequestContext.set(user, getRemoteIp(), call.connection.service); + + // make the call + resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp, + status); + } catch (Throwable e) { + LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); + errorThrowable = e; + error = StringUtils.stringifyException(e); + } finally { + currentRequestSpan.stop(); + // Must always clear the request context to avoid leaking + // credentials between requests. + RequestContext.clear(); + } + CurCall.set(null); + callQueueSize.add(call.getSize() * -1); + // Set the response for undelayed calls and delayed calls with + // undelayed responses. + if (!call.isDelayed() || !call.isReturnValueDelayed()) { + Message param = resultPair != null ? resultPair.getFirst() : null; + CellScanner cells = resultPair != null ? resultPair.getSecond() : null; + call.setResponse(param, cells, errorThrowable, error); + } + call.sendResponseIfReady(); + status.markComplete("Sent response"); + status.pause("Waiting for a call"); + } catch (OutOfMemoryError e) { + if (errorHandler != null) { + if (errorHandler.checkOOME(e)) { + LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError"); + return; + } + } else { + // rethrow if no handler + throw e; + } + } catch (ClosedChannelException cce) { + LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + "this means that the server was processing a " + "request but the client went away. The error message was: " + cce.getMessage()); - } catch (Exception e) { - LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e)); - } + } catch (Exception e) { + LOG.warn(Thread.currentThread().getName() + + ": caught: " + StringUtils.stringifyException(e)); } - LOG.info(getName() + ": exiting"); + } + + public MonitoredRPCHandler getStatus() { + MonitoredRPCHandler status = MONITORED_RPC.get(); + if (status != null) { + return status; + } + status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); + status.pause("Waiting for a call"); + MONITORED_RPC.set(status); + return status; } } @@ -1925,20 +1885,12 @@ public class RpcServer implements RpcServerInterface { } } + private class RpcSchedulerContextImpl implements RpcScheduler.Context { - /** - * Minimal setup. Used by tests mostly. - * @param service - * @param isa - * @param conf - * @throws IOException - */ - public RpcServer(final BlockingService service, final InetSocketAddress isa, - final Configuration conf) - throws IOException { - this(null, "generic", Lists.newArrayList(new BlockingServiceAndInterface(service, null)), - isa, 3, 3, conf, - HConstants.QOS_THRESHOLD); + @Override + public InetSocketAddress getListenerAddress() { + return RpcServer.this.getListenerAddress(); + } } /** @@ -1948,46 +1900,27 @@ public class RpcServer implements RpcServerInterface { * @param name Used keying this rpc servers' metrics and for naming the Listener thread. * @param services A list of services. * @param isa Where to listen - * @param handlerCount the number of handler threads that will be used to process calls - * @param priorityHandlerCount How many threads for priority handling. * @param conf - * @param highPriorityLevel * @throws IOException */ public RpcServer(final Server serverInstance, final String name, final List services, - final InetSocketAddress isa, int handlerCount, int priorityHandlerCount, Configuration conf, - int highPriorityLevel) + final InetSocketAddress isa, Configuration conf, + RpcScheduler scheduler) throws IOException { this.serverInstance = serverInstance; this.services = services; this.isa = isa; this.conf = conf; - this.handlerCount = handlerCount; - this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; - this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length", - handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.maxQueueSize = this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt("ipc.server.read.threadpool.size", 10); - this.callQueue = new LinkedBlockingQueue(maxQueueLength); - if (priorityHandlerCount > 0) { - this.priorityCallQueue = new LinkedBlockingQueue(maxQueueLength); // TODO hack on size - } else { - this.priorityCallQueue = null; - } - this.highPriorityLevel = highPriorityLevel; this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3); - if (numOfReplicationHandlers > 0) { - this.replicationQueue = new LinkedBlockingQueue(maxQueueSize); - } - this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); @@ -2011,6 +1944,8 @@ public class RpcServer implements RpcServerInterface { if (isSecurityEnabled) { HBaseSaslRpcServer.init(conf); } + this.scheduler = scheduler; + this.scheduler.init(new RpcSchedulerContextImpl()); } /** @@ -2085,9 +2020,7 @@ public class RpcServer implements RpcServerInterface { HBasePolicyProvider.init(conf, authManager); responder.start(); listener.start(); - handlers = startHandlers(callQueue, handlerCount); - priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount); - replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers); + scheduler.start(); } @Override @@ -2095,18 +2028,6 @@ public class RpcServer implements RpcServerInterface { this.authManager.refresh(this.conf, pp); } - private Handler[] startHandlers(BlockingQueue queue, int numOfHandlers) { - if (numOfHandlers <= 0) { - return null; - } - Handler[] handlers = new Handler[numOfHandlers]; - for (int i = 0; i < numOfHandlers; i++) { - handlers[i] = new Handler(queue, i); - handlers[i].start(); - } - return handlers; - } - private AuthenticationTokenSecretManager createSecretManager() { if (!isSecurityEnabled) return null; if (serverInstance == null) return null; @@ -2248,25 +2169,12 @@ public class RpcServer implements RpcServerInterface { public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; - stopHandlers(handlers); - stopHandlers(priorityHandlers); - stopHandlers(replicationHandlers); listener.interrupt(); listener.doStop(); responder.interrupt(); notifyAll(); } - private void stopHandlers(Handler[] handlers) { - if (handlers != null) { - for (Handler handler : handlers) { - if (handler != null) { - handler.interrupt(); - } - } - } - } - /** Wait for the server to be stopped. * Does not wait for all subthreads to finish. * See {@link #stop()}. @@ -2532,4 +2440,8 @@ public class RpcServer implements RpcServerInterface { throw e; } } + + public RpcScheduler getScheduler() { + return scheduler; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 9fe93443dd0..4199c06f1d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -65,8 +65,6 @@ public interface RpcServerInterface { */ MetricsHBaseServer getMetrics(); - void setQosFunction(Function, Integer> newFunc); - /** * Refresh autentication manager policy. * @param pp diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java new file mode 100644 index 00000000000..1c246a13413 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.common.base.Function; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.protobuf.Message; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A scheduler that maintains isolated handler pools for general, high-priority and replication + * requests. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SimpleRpcScheduler implements RpcScheduler { + + private int port; + private final int handlerCount; + private final int priorityHandlerCount; + private final int replicationHandlerCount; + final BlockingQueue callQueue; + final BlockingQueue priorityCallQueue; + final BlockingQueue replicationQueue; + private volatile boolean running = false; + private final List handlers = Lists.newArrayList(); + private final Function, Integer> qosFunction; + + /** What level a high priority call is at. */ + private final int highPriorityLevel; + + /** + * @param conf + * @param handlerCount the number of handler threads that will be used to process calls + * @param priorityHandlerCount How many threads for priority handling. + * @param replicationHandlerCount How many threads for replication handling. + * @param qosFunction a function that maps requests to priorities + * @param highPriorityLevel + */ + public SimpleRpcScheduler( + Configuration conf, + int handlerCount, + int priorityHandlerCount, + int replicationHandlerCount, + Function, Integer> qosFunction, + int highPriorityLevel) { + int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", + handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + this.handlerCount = handlerCount; + this.priorityHandlerCount = priorityHandlerCount; + this.replicationHandlerCount = replicationHandlerCount; + this.qosFunction = qosFunction; + this.highPriorityLevel = highPriorityLevel; + this.callQueue = new LinkedBlockingQueue(maxQueueLength); + this.priorityCallQueue = priorityHandlerCount > 0 + ? new LinkedBlockingQueue(maxQueueLength) + : null; + this.replicationQueue = replicationHandlerCount > 0 + ? new LinkedBlockingQueue(maxQueueLength) + : null; + } + + @Override + public void init(Context context) { + this.port = context.getListenerAddress().getPort(); + } + + @Override + public void start() { + running = true; + startHandlers(handlerCount, callQueue, null); + if (priorityCallQueue != null) { + startHandlers(priorityHandlerCount, priorityCallQueue, "Priority."); + } + if (replicationQueue != null) { + startHandlers(replicationHandlerCount, replicationQueue, "Replication."); + } + } + + private void startHandlers( + int handlerCount, + final BlockingQueue callQueue, + String threadNamePrefix) { + for (int i = 0; i < handlerCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + consumerLoop(callQueue); + } + }); + t.setDaemon(true); + t.setName(Strings.nullToEmpty(threadNamePrefix) + "RpcServer.handler=" + i + ",port=" + port); + t.start(); + handlers.add(t); + } + } + + @Override + public void stop() { + running = false; + for (Thread handler : handlers) { + handler.interrupt(); + } + } + + @Override + public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCall(); + Pair headerAndParam = + new Pair(call.header, call.param); + if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) { + priorityCallQueue.put(callTask); + } else if (replicationQueue != null && + getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) { + replicationQueue.put(callTask); + } else { + callQueue.put(callTask); // queue the call; maybe blocked here + } + } + + @Override + public int getGeneralQueueLength() { + return callQueue.size(); + } + + @Override + public int getPriorityQueueLength() { + return priorityCallQueue == null ? 0 : priorityCallQueue.size(); + } + + @Override + public int getReplicationQueueLength() { + return replicationQueue == null ? 0 : replicationQueue.size(); + } + + private void consumerLoop(BlockingQueue myQueue) { + while (running) { + try { + RpcServer.CallRunner task = myQueue.take(); + task.run(); + } catch (InterruptedException e) { + Thread.interrupted(); + } + } + } + + private int getQosLevel(Pair headerAndParam) { + if (qosFunction == null) return 0; + Integer res = qosFunction.apply(headerAndParam); + return res == null? 0: res; + } +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3ecd9374374..613c5b4881f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; @@ -395,14 +396,19 @@ MasterServices, Server { String name = "master/" + initialIsa.toString(); // Set how many times to retry talking to another server over HConnection. HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); - int numHandlers = conf.getInt("hbase.master.handler.count", - conf.getInt("hbase.regionserver.handler.count", 25)); + int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT, + conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT)); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler( + conf, + numHandlers, + 0, // we don't use high priority handlers in master + 0, // we don't use replication handlers in master + null, // this is a DNC w/o high priority handlers + 0); this.rpcServer = new RpcServer(this, name, getServices(), initialIsa, // BindAddress is IP we got for this server. - numHandlers, - 0, // we dont use high priority handlers in master conf, - 0); // this is a DNC w/o high priority handlers + scheduler); // Set our address. this.isa = this.rpcServer.getListenerAddress(); this.serverName = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4840981f0f7..5c7a260229d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -547,18 +548,27 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa String name = "regionserver/" + initialIsa.toString(); // Set how many times to retry talking to another server over HConnection. HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); + this.qosFunction = new QosFunction(this); + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler( + conf, + handlerCount, + conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT), + conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), + qosFunction, + HConstants.QOS_THRESHOLD); this.rpcServer = new RpcServer(this, name, getServices(), /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/ initialIsa, // BindAddress is IP we got for this server. - conf.getInt("hbase.regionserver.handler.count", 10), - conf.getInt("hbase.regionserver.metahandler.count", 10), - conf, HConstants.QOS_THRESHOLD); + conf, scheduler); // Set our address. this.isa = this.rpcServer.getListenerAddress(); this.rpcServer.setErrorHandler(this); - this.rpcServer.setQosFunction((qosFunction = new QosFunction(this))); this.startcode = System.currentTimeMillis(); // login the zookeeper client principal (if using security) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index b3520d21544..f7f2198c9c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -84,7 +84,9 @@ public class TestDelayedRpc { TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testDelayedRpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - isa, 1, 0, conf, 0); + isa, + conf, + new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -163,7 +165,9 @@ public class TestDelayedRpc { TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testTooManyDelayedRpcs", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - isa, 1, 0, conf, 0); + isa, + conf, + new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -283,7 +287,9 @@ public class TestDelayedRpc { TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance); rpcServer = new RpcServer(null, "testEndDelayThrowing", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - isa, 1, 0, conf, 0); + isa, + conf, + new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -344,4 +350,4 @@ public class TestDelayedRpc { return TestResponse.newBuilder().setResponse(DELAYED).build(); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index da33ba58f17..6c229c4b5e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -81,6 +81,7 @@ public class TestIPC { public static final Log LOG = LogFactory.getLog(TestIPC.class); static byte [] CELL_BYTES = Bytes.toBytes("xyz"); static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + private final static Configuration CONF = HBaseConfiguration.create(); // We are using the test TestRpcServiceProtos generated classes and Service because they are // available and basic with methods like 'echo', and ping. Below we make a blocking service // by passing in implementation of blocking interface. We use this service in all tests that @@ -132,11 +133,11 @@ public class TestIPC { * HBaseRpcServer directly. */ private static class TestRpcServer extends RpcServer { + TestRpcServer() throws IOException { super(null, "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("0.0.0.0", 0), 1, 1, - HBaseConfiguration.create(), 0); + new InetSocketAddress("0.0.0.0", 0), CONF, new SimpleRpcScheduler(CONF, 1, 1, 0, null, 0)); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 02bbd9715da..f9679ece6ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -98,7 +98,8 @@ public class TestProtoBufRpc { // Get RPC server for server side implementation this.server = new RpcServer(null, "testrpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - new InetSocketAddress(ADDRESS, PORT), 10, 10, conf, 0); + new InetSocketAddress(ADDRESS, PORT), conf, + new SimpleRpcScheduler(conf, 10, 10, 0, null, 0)); this.isa = server.getListenerAddress(); this.server.start(); } @@ -137,4 +138,4 @@ public class TestProtoBufRpc { rpcClient.stop(); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 827fcf8002f..4f816f72046 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ipc.RequestContext; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -128,8 +129,10 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); sai.add(new BlockingServiceAndInterface(service, AuthenticationProtos.AuthenticationService.BlockingInterface.class)); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler( + conf, 3, 1, 0, null, HConstants.QOS_THRESHOLD); this.rpcServer = - new RpcServer(this, "tokenServer", sai, initialIsa, 3, 1, conf, HConstants.QOS_THRESHOLD); + new RpcServer(this, "tokenServer", sai, initialIsa, conf, scheduler); this.isa = this.rpcServer.getListenerAddress(); this.sleeper = new Sleeper(1000, this); }