From aaf04f7ea07d5cbfc83676e9f7d34ae457c776c9 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 13 Sep 2013 21:18:42 +0000 Subject: [PATCH] HBASE-9101 Addendum to pluggable RpcScheduler git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523108 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/hbase/HConstants.java | 4 +- .../hadoop/hbase/ipc/FifoRpcScheduler.java | 86 +++++++++++ .../hadoop/hbase/ipc/PriorityFunction.java | 33 +++++ .../apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../apache/hadoop/hbase/ipc/RpcServer.java | 1 + .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 33 ++--- .../apache/hadoop/hbase/master/HMaster.java | 11 +- ...=> AnnotationReadingPriorityFunction.java} | 18 ++- .../hbase/regionserver/HRegionServer.java | 51 ++++--- .../regionserver/RegionServerServices.java | 12 +- .../regionserver/RpcSchedulerFactory.java | 38 +++++ .../SimpleRpcSchedulerFactory.java | 42 ++++++ .../hbase/MockRegionServerServices.java | 7 + .../hadoop/hbase/ipc/TestDelayedRpc.java | 6 +- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 41 +++++- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 2 +- .../hbase/ipc/TestSimpleRpcScheduler.java | 133 ++++++++++++++++++ .../hadoop/hbase/master/MockRegionServer.java | 7 + .../hbase/regionserver/TestPriorityRpc.java | 43 +++--- .../hbase/regionserver/TestQosFunction.java | 8 +- .../token/TestTokenAuthentication.java | 5 +- 21 files changed, 479 insertions(+), 104 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{QosFunction.java => AnnotationReadingPriorityFunction.java} (93%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 7be75009294..23269059fa1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.util.Bytes; @InterfaceAudience.Public @InterfaceStability.Stable public final class HConstants { + // NOTICE!!!! Please do not add a constants here, unless they are referenced by a lot of classes. + //Bytes.UTF8_ENCODING should be updated if this changed /** When we encode strings, we always specify UTF8 encoding */ public static final String UTF8_ENCODING = "UTF-8"; @@ -893,8 +895,6 @@ public final class HConstants { public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.port"; public static final int DEFAULT_STATUS_MULTICAST_PORT = 60100; - - private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java new file mode 100644 index 00000000000..629bb012d2b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A very simple {@code }RpcScheduler} that serves incoming requests in order. + * + * This can be used for HMaster, where no prioritization is needed. + */ +public class FifoRpcScheduler implements RpcScheduler { + + private final int handlerCount; + private final int maxQueueLength; + private ThreadPoolExecutor executor; + + public FifoRpcScheduler(Configuration conf, int handlerCount) { + this.handlerCount = handlerCount; + this.maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", + handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + } + + @Override + public void init(Context context) { + // no-op + } + + @Override + public void start() { + this.executor = new ThreadPoolExecutor( + handlerCount, + handlerCount, + 60, + TimeUnit.SECONDS, + new ArrayBlockingQueue(maxQueueLength), + new DaemonThreadFactory("FifoRpcScheduler.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Override + public void stop() { + this.executor.shutdown(); + } + + @Override + public void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException { + executor.submit(task); + } + + @Override + public int getGeneralQueueLength() { + return executor.getQueue().size(); + } + + @Override + public int getPriorityQueueLength() { + return 0; + } + + @Override + public int getReplicationQueueLength() { + return 0; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java new file mode 100644 index 00000000000..0df041eab44 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.Message; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; + +/** + * Function to figure priority of incoming request. + */ +public interface PriorityFunction { + /** + * @param header + * @param param + * @return Priority of this request. + */ + int getPriority(RequestHeader header, Message param); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 1994ad5e810..84590816d17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -28,7 +28,7 @@ import java.net.InetSocketAddress; */ @InterfaceAudience.Private @InterfaceStability.Evolving -interface RpcScheduler { +public interface RpcScheduler { /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ interface Context { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index f1e45263fc4..980ffd4caa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -2165,6 +2165,7 @@ public class RpcServer implements RpcServerInterface { listener.interrupt(); listener.doStop(); responder.interrupt(); + scheduler.stop(); notifyAll(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index c3021cc1b8c..23b98c71205 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -17,20 +17,17 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.common.base.Function; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.protobuf.Message; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.util.Pair; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; /** * A scheduler that maintains isolated handler pools for general, high-priority and replication @@ -44,12 +41,12 @@ public class SimpleRpcScheduler implements RpcScheduler { private final int handlerCount; private final int priorityHandlerCount; private final int replicationHandlerCount; + private final PriorityFunction priority; final BlockingQueue callQueue; final BlockingQueue priorityCallQueue; final BlockingQueue replicationQueue; private volatile boolean running = false; private final List handlers = Lists.newArrayList(); - private final Function, Integer> qosFunction; /** What level a high priority call is at. */ private final int highPriorityLevel; @@ -59,22 +56,22 @@ public class SimpleRpcScheduler implements RpcScheduler { * @param handlerCount the number of handler threads that will be used to process calls * @param priorityHandlerCount How many threads for priority handling. * @param replicationHandlerCount How many threads for replication handling. - * @param qosFunction a function that maps requests to priorities * @param highPriorityLevel + * @param priority Function to extract request priority. */ public SimpleRpcScheduler( Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, - Function, Integer> qosFunction, + PriorityFunction priority, int highPriorityLevel) { int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.handlerCount = handlerCount; this.priorityHandlerCount = priorityHandlerCount; this.replicationHandlerCount = replicationHandlerCount; - this.qosFunction = qosFunction; + this.priority = priority; this.highPriorityLevel = highPriorityLevel; this.callQueue = new LinkedBlockingQueue(maxQueueLength); this.priorityCallQueue = priorityHandlerCount > 0 @@ -131,9 +128,7 @@ public class SimpleRpcScheduler implements RpcScheduler { @Override public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); - Pair headerAndParam = - new Pair(call.header, call.param); - int level = getQosLevel(headerAndParam); + int level = priority.getPriority(call.header, call.param); if (priorityCallQueue != null && level > highPriorityLevel) { priorityCallQueue.put(callTask); } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) { @@ -168,11 +163,5 @@ public class SimpleRpcScheduler implements RpcScheduler { } } } - - private int getQosLevel(Pair headerAndParam) { - if (qosFunction == null) return 0; - Integer res = qosFunction.apply(headerAndParam); - return res == null? 0: res; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 33ea3e9ced3..64fe281607c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RequestContext; @@ -422,17 +424,10 @@ MasterServices, Server { HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT)); - SimpleRpcScheduler scheduler = new SimpleRpcScheduler( - conf, - numHandlers, - 0, // we don't use high priority handlers in master - 0, // we don't use replication handlers in master - null, // this is a DNC w/o high priority handlers - 0); this.rpcServer = new RpcServer(this, name, getServices(), initialIsa, // BindAddress is IP we got for this server. conf, - scheduler); + new FifoRpcScheduler(conf, numHandlers)); // Set our address. this.isa = this.rpcServer.getListenerAddress(); // We don't want to pass isa's hostname here since it could be 0.0.0.0 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java similarity index 93% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 28eb8d2eeb8..aa7018b4f05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -38,17 +39,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority; -import org.apache.hadoop.hbase.util.Pair; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.protobuf.Message; import com.google.protobuf.TextFormat; /** - * A guava function that will return a priority for use by QoS facility in regionserver; e.g. - * rpcs to hbase:meta and -ROOT-, etc., get priority. + * Reads special method annotations and table names to figure a priority for use by QoS facility in + * ipc; e.g: rpcs to hbase:meta get priority. */ // TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott // suggests and just have the client specify a priority. @@ -67,8 +66,9 @@ import com.google.protobuf.TextFormat; //All the argument classes declare a 'getRegion' method that returns a //RegionSpecifier object. Methods can be invoked on the returned object //to figure out whether it is a meta region or not. -class QosFunction implements Function, Integer> { - public static final Log LOG = LogFactory.getLog(QosFunction.class.getName()); +class AnnotationReadingPriorityFunction implements PriorityFunction { + public static final Log LOG = + LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName()); private final Map annotatedQos; //We need to mock the regionserver instance for some unit tests (set via //setRegionServer method. @@ -93,7 +93,7 @@ class QosFunction implements Function, Integer> { private final Map, Method>> methodMap = new HashMap, Method>>(); - QosFunction(final HRegionServer hrs) { + AnnotationReadingPriorityFunction(final HRegionServer hrs) { this.hRegionServer = hrs; Map qosMap = new HashMap(); for (Method m : HRegionServer.class.getMethods()) { @@ -142,15 +142,13 @@ class QosFunction implements Function, Integer> { } @Override - public Integer apply(Pair headerAndParam) { - RequestHeader header = headerAndParam.getFirst(); + public int getPriority(RequestHeader header, Message param) { String methodName = header.getMethodName(); Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; } - Message param = headerAndParam.getSecond(); if (param == null) { return HConstants.NORMAL_QOS; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4d24821a78f..f9c195b7524 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import javax.management.ObjectName; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.annotation.Retention; @@ -47,8 +49,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.management.ObjectName; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -111,7 +112,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -174,6 +174,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; @@ -256,6 +257,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa protected final ConcurrentMap regionsInTransitionInRS = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + /** RPC scheduler to use for the region server. */ + public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = + "hbase.region.server.rpc.scheduler.factory.class"; + protected long maxScannerResultSize; // Cache flushing @@ -466,9 +471,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa private final int scannerLeaseTimeoutPeriod; /** - * The reference to the QosFunction + * The reference to the priority extraction function */ - private final QosFunction qosFunction; + private final PriorityFunction priority; private RegionServerCoprocessorHost rsHost; @@ -552,22 +557,23 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa String name = "regionserver/" + initialIsa.toString(); // Set how many times to retry talking to another server over HConnection. HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG); - this.qosFunction = new QosFunction(this); - int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); - SimpleRpcScheduler scheduler = new SimpleRpcScheduler( - conf, - handlerCount, - conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT), - conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), - qosFunction, - HConstants.QOS_THRESHOLD); + this.priority = new AnnotationReadingPriorityFunction(this); + RpcSchedulerFactory rpcSchedulerFactory; + try { + Class rpcSchedulerFactoryClass = conf.getClass( + REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance()); + } catch (InstantiationException e) { + throw new IllegalArgumentException(e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException(e); + } this.rpcServer = new RpcServer(this, name, getServices(), /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/ initialIsa, // BindAddress is IP we got for this server. - conf, scheduler); + conf, + rpcSchedulerFactory.create(conf, this)); // Set our address. this.isa = this.rpcServer.getListenerAddress(); @@ -631,13 +637,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return this.clusterId; } + @Override + public int getPriority(RequestHeader header, Message param) { + return priority.getPriority(header, param); + } + @Retention(RetentionPolicy.RUNTIME) protected @interface QosPriority { int priority() default 0; } - QosFunction getQosFunction() { - return qosFunction; + PriorityFunction getPriority() { + return priority; } RegionScanner getScanner(long scannerId) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index a9fa5cc231d..f95c1acaa5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -18,25 +18,27 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.zookeeper.KeeperException; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + /** * Services provided by {@link HRegionServer} */ @InterfaceAudience.Private -public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegion { +public interface RegionServerServices + extends OnlineRegions, FavoredNodesForRegion, PriorityFunction { /** * @return True if this regionserver is stopping. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java new file mode 100644 index 00000000000..58cbe12ec04 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.RpcScheduler; + +/** + * A factory class that constructs an {@link org.apache.hadoop.hbase.ipc.RpcScheduler} for + * a region server. + */ +public interface RpcSchedulerFactory { + + /** + * Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}. + * + * Please note that this method is called in constructor of {@link HRegionServer}, so some + * fields may not be ready for access. The reason that {@code HRegionServer} is passed as + * parameter here is that an RPC scheduler may need to access data structure inside + * {@code HRegionServer} (see example in {@link SimpleRpcSchedulerFactory}). + */ + RpcScheduler create(Configuration conf, RegionServerServices server); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java new file mode 100644 index 00000000000..03a75baecff --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; + +/** Constructs a {@link SimpleRpcScheduler}. for the region server. */ +class SimpleRpcSchedulerFactory implements RpcSchedulerFactory { + + @Override + public RpcScheduler create(Configuration conf, RegionServerServices server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + return new SimpleRpcScheduler( + conf, + handlerCount, + conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT), + conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), + server, + HConstants.QOS_THRESHOLD); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index b2c434cf352..866a56bdfaa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.catalog.CatalogTracker; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -214,4 +216,9 @@ class MockRegionServerServices implements RegionServerServices { // TODO Auto-generated method stub return null; } + + @Override + public int getPriority(RPCProtos.RequestHeader header, Message param) { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index f7f2198c9c2..f68552a004b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -86,7 +86,7 @@ public class TestDelayedRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, - new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); + new FifoRpcScheduler(conf, 1)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -167,7 +167,7 @@ public class TestDelayedRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, - new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); + new FifoRpcScheduler(conf, 1)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { @@ -289,7 +289,7 @@ public class TestDelayedRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, - new SimpleRpcScheduler(conf, 1, 0, 0, null, 0)); + new FifoRpcScheduler(conf, 1)); rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 431a7132175..0120f73d910 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -21,18 +21,26 @@ package org.apache.hadoop.hbase.ipc; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,6 +72,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -139,9 +148,13 @@ public class TestIPC { private static class TestRpcServer extends RpcServer { TestRpcServer() throws IOException { + this(new FifoRpcScheduler(CONF, 1)); + } + + TestRpcServer(RpcScheduler scheduler) throws IOException { super(null, "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("0.0.0.0", 0), CONF, new SimpleRpcScheduler(CONF, 1, 1, 0, null, 0)); + new InetSocketAddress("0.0.0.0", 0), CONF, scheduler); } @Override @@ -257,6 +270,29 @@ public class TestIPC { } } + /** Tests that the rpc scheduler is called when requests arrive. */ + @Test + public void testRpcScheduler() throws IOException, InterruptedException { + RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); + RpcServer rpcServer = new TestRpcServer(scheduler); + verify(scheduler).init((RpcScheduler.Context) anyObject()); + RpcClient client = new RpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT); + try { + rpcServer.start(); + verify(scheduler).start(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + for (int i = 0; i < 10; i++) { + client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), + md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); + } + verify(scheduler, times(10)).dispatch((RpcServer.CallRunner) anyObject()); + } finally { + rpcServer.stop(); + verify(scheduler).stop(); + } + } + public static void main(String[] args) throws IOException, SecurityException, NoSuchMethodException, InterruptedException { if (args.length != 2) { @@ -285,7 +321,8 @@ public class TestIPC { for (int i = 0; i < cycles; i++) { List cells = new ArrayList(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - Message param = RequestConverter.buildNoDataMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm, cells); + Message param = RequestConverter.buildNoDataMultiRequest( + HConstants.EMPTY_BYTE_ARRAY, rm, cells); CellScanner cellScanner = CellUtil.createCellScanner(cells); if (i % 1000 == 0) { LOG.info("" + i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index f9679ece6ff..65d6c840ad1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -99,7 +99,7 @@ public class TestProtoBufRpc { this.server = new RpcServer(null, "testrpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), new InetSocketAddress(ADDRESS, PORT), conf, - new SimpleRpcScheduler(conf, 10, 10, 0, null, 0)); + new FifoRpcScheduler(conf, 10)); this.isa = server.getListenerAddress(); this.server.start(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java new file mode 100644 index 00000000000..862de1d09d5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.ipc.RpcServer.Call; +import org.apache.hadoop.hbase.ipc.RpcServer.CallRunner; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Category(SmallTests.class) +public class TestSimpleRpcScheduler { + + private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { + @Override + public InetSocketAddress getListenerAddress() { + return InetSocketAddress.createUnresolved("127.0.0.1", 1000); + } + }; + private Configuration conf; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + } + + @Test + public void testBasic() throws IOException, InterruptedException { + PriorityFunction qosFunction = mock(PriorityFunction.class); + RpcScheduler scheduler = new SimpleRpcScheduler( + conf, 10, 0, 0, qosFunction, 0); + scheduler.init(CONTEXT); + scheduler.start(); + CallRunner task = createMockTask(); + scheduler.dispatch(task); + verify(task, timeout(1000)).run(); + scheduler.stop(); + } + + @Test + public void testHandlerIsolation() throws IOException, InterruptedException { + CallRunner generalTask = createMockTask(); + CallRunner priorityTask = createMockTask(); + CallRunner replicationTask = createMockTask(); + List tasks = ImmutableList.of( + generalTask, + priorityTask, + replicationTask); + Map qos = ImmutableMap.of( + generalTask, 0, + priorityTask, HConstants.HIGH_QOS + 1, + replicationTask, HConstants.REPLICATION_QOS); + PriorityFunction qosFunction = mock(PriorityFunction.class); + final Map handlerThreads = Maps.newHashMap(); + Answer answerToRun = new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + handlerThreads.put( + (CallRunner) invocationOnMock.getMock(), + Thread.currentThread()); + return null; + } + }; + for (CallRunner task : tasks) { + doAnswer(answerToRun).when(task).run(); + } + + RpcScheduler scheduler = new SimpleRpcScheduler( + conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS); + scheduler.init(CONTEXT); + scheduler.start(); + for (CallRunner task : tasks) { + when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject())) + .thenReturn(qos.get(task)); + scheduler.dispatch(task); + } + for (CallRunner task : tasks) { + verify(task, timeout(1000)).run(); + } + scheduler.stop(); + + // Tests that these requests are handled by three distinct threads. + assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size()); + } + + private CallRunner createMockTask() { + Call call = mock(Call.class); + CallRunner task = mock(CallRunner.class); + when(task.getCall()).thenReturn(call); + return task; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 8a2eabc1b0f..5f7e541fd49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -27,6 +27,7 @@ import java.util.Random; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; +import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.CellScannable; @@ -86,6 +87,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -557,6 +559,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { return null; } + @Override + public int getPriority(RPCProtos.RequestHeader header, Message param) { + return 0; + } + @Override public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, UpdateFavoredNodesRequest request) throws ServiceException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index 231a5e45b7e..0bb9cb27a37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -28,42 +29,41 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.Pair; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; import com.google.protobuf.ByteString; -import com.google.protobuf.Message; /** * Tests that verify certain RPCs get a higher QoS. */ @Category(MediumTests.class) public class TestPriorityRpc { private HRegionServer regionServer = null; - private QosFunction qosFunction = null; + private PriorityFunction priority = null; @Before public void setup() { Configuration conf = HBaseConfiguration.create(); regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf); - qosFunction = regionServer.getQosFunction(); + priority = regionServer.getPriority(); } @Test public void testQosFunctionForMeta() throws IOException { - qosFunction = regionServer.getQosFunction(); + priority = regionServer.getPriority(); RequestHeader.Builder headerBuilder = RequestHeader.newBuilder(); //create a rpc request that has references to hbase:meta region and also //uses one of the known argument classes (known argument classes are - //listed in HRegionServer.QosFunction.knownArgumentClasses) + //listed in HRegionServer.QosFunctionImpl.knownArgumentClasses) headerBuilder.setMethodName("foo"); GetRequest.Builder getRequestBuilder = GetRequest.newBuilder(); @@ -84,9 +84,9 @@ public class TestPriorityRpc { Mockito.when(mockRS.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRegionInfo.isMetaTable()).thenReturn(true); - qosFunction.setRegionServer(mockRS); - assertTrue (qosFunction.apply(new Pair(header, getRequest)) == - HConstants.HIGH_QOS); + // Presume type. + ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); + assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, getRequest)); } @Test @@ -94,13 +94,12 @@ public class TestPriorityRpc { //The request is not using any of the //known argument classes (it uses one random request class) //(known argument classes are listed in - //HRegionServer.QosFunction.knownArgumentClasses) + //HRegionServer.QosFunctionImpl.knownArgumentClasses) RequestHeader.Builder headerBuilder = RequestHeader.newBuilder(); headerBuilder.setMethodName("foo"); RequestHeader header = headerBuilder.build(); - QosFunction qosFunc = regionServer.getQosFunction(); - assertTrue (qosFunc.apply(new Pair(header, null)) == - HConstants.NORMAL_QOS); + PriorityFunction qosFunc = regionServer.getPriority(); + assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null)); } @Test @@ -118,8 +117,9 @@ public class TestPriorityRpc { Mockito.when(mockRS.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false); - qosFunction.setRegionServer(mockRS); - int qos = qosFunction.apply(new Pair(header, scanRequest)); + // Presume type. + ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); + int qos = priority.getPriority(header, scanRequest); assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); //build a scan request with scannerID @@ -134,14 +134,13 @@ public class TestPriorityRpc { Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true); - qosFunction.setRegionServer(mockRS); + // Presume type. + ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertTrue (qosFunction.apply(new Pair(header, scanRequest)) == - HConstants.HIGH_QOS); + assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, scanRequest)); //the same as above but with non-meta region Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false); - assertTrue (qosFunction.apply(new Pair(header, scanRequest)) == - HConstants.NORMAL_QOS); + assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest)); } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java index c4983ea775e..bc8ede7c730 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java @@ -37,7 +37,7 @@ public class TestQosFunction { @Test public void testPriority() { HRegionServer hrs = Mockito.mock(HRegionServer.class); - QosFunction qosFunction = new QosFunction(hrs); + AnnotationReadingPriorityFunction qosFunction = new AnnotationReadingPriorityFunction(hrs); // Set method name in pb style with the method name capitalized. checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction); @@ -45,11 +45,9 @@ public class TestQosFunction { checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction); } - private void checkMethod(final String methodName, final int expected, final QosFunction qosf) { + private void checkMethod(final String methodName, final int expected, final AnnotationReadingPriorityFunction qosf) { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setMethodName(methodName); - Pair headerAndParam = - new Pair(builder.build(), null); - assertEquals(methodName, expected, qosf.apply(headerAndParam).intValue()); + assertEquals(methodName, expected, qosf.getPriority(builder.build(), null)); } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index a69a91650ce..79bf3664940 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; import org.apache.hadoop.hbase.ipc.RequestContext; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -130,10 +131,8 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); sai.add(new BlockingServiceAndInterface(service, AuthenticationProtos.AuthenticationService.BlockingInterface.class)); - SimpleRpcScheduler scheduler = new SimpleRpcScheduler( - conf, 3, 1, 0, null, HConstants.QOS_THRESHOLD); this.rpcServer = - new RpcServer(this, "tokenServer", sai, initialIsa, conf, scheduler); + new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); this.isa = this.rpcServer.getListenerAddress(); this.sleeper = new Sleeper(1000, this); }