HBASE-9101 Addendum to pluggable RpcScheduler

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523108 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-09-13 21:18:42 +00:00
parent 3583d29fc3
commit aaf04f7ea0
21 changed files with 479 additions and 104 deletions

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Public
@InterfaceStability.Stable
public final class HConstants {
// NOTICE!!!! Please do not add a constants here, unless they are referenced by a lot of classes.
//Bytes.UTF8_ENCODING should be updated if this changed
/** When we encode strings, we always specify UTF8 encoding */
public static final String UTF8_ENCODING = "UTF-8";
@ -893,8 +895,6 @@ public final class HConstants {
public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.port";
public static final int DEFAULT_STATUS_MULTICAST_PORT = 60100;
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* A very simple {@code }RpcScheduler} that serves incoming requests in order.
*
* This can be used for HMaster, where no prioritization is needed.
*/
public class FifoRpcScheduler implements RpcScheduler {
private final int handlerCount;
private final int maxQueueLength;
private ThreadPoolExecutor executor;
public FifoRpcScheduler(Configuration conf, int handlerCount) {
this.handlerCount = handlerCount;
this.maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
}
@Override
public void init(Context context) {
// no-op
}
@Override
public void start() {
this.executor = new ThreadPoolExecutor(
handlerCount,
handlerCount,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxQueueLength),
new DaemonThreadFactory("FifoRpcScheduler.handler"),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void stop() {
this.executor.shutdown();
}
@Override
public void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException {
executor.submit(task);
}
@Override
public int getGeneralQueueLength() {
return executor.getQueue().size();
}
@Override
public int getPriorityQueueLength() {
return 0;
}
@Override
public int getReplicationQueueLength() {
return 0;
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Message;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
/**
* Function to figure priority of incoming request.
*/
public interface PriorityFunction {
/**
* @param header
* @param param
* @return Priority of this request.
*/
int getPriority(RequestHeader header, Message param);
}

View File

@ -28,7 +28,7 @@ import java.net.InetSocketAddress;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
interface RpcScheduler {
public interface RpcScheduler {
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
interface Context {

View File

@ -2165,6 +2165,7 @@ public class RpcServer implements RpcServerInterface {
listener.interrupt();
listener.doStop();
responder.interrupt();
scheduler.stop();
notifyAll();
}

View File

@ -17,20 +17,17 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.Message;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.util.Pair;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
/**
* A scheduler that maintains isolated handler pools for general, high-priority and replication
@ -44,12 +41,12 @@ public class SimpleRpcScheduler implements RpcScheduler {
private final int handlerCount;
private final int priorityHandlerCount;
private final int replicationHandlerCount;
private final PriorityFunction priority;
final BlockingQueue<RpcServer.CallRunner> callQueue;
final BlockingQueue<RpcServer.CallRunner> priorityCallQueue;
final BlockingQueue<RpcServer.CallRunner> replicationQueue;
private volatile boolean running = false;
private final List<Thread> handlers = Lists.newArrayList();
private final Function<Pair<RPCProtos.RequestHeader, Message>, Integer> qosFunction;
/** What level a high priority call is at. */
private final int highPriorityLevel;
@ -59,22 +56,22 @@ public class SimpleRpcScheduler implements RpcScheduler {
* @param handlerCount the number of handler threads that will be used to process calls
* @param priorityHandlerCount How many threads for priority handling.
* @param replicationHandlerCount How many threads for replication handling.
* @param qosFunction a function that maps requests to priorities
* @param highPriorityLevel
* @param priority Function to extract request priority.
*/
public SimpleRpcScheduler(
Configuration conf,
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
Function<Pair<RPCProtos.RequestHeader, Message>, Integer> qosFunction,
PriorityFunction priority,
int highPriorityLevel) {
int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.replicationHandlerCount = replicationHandlerCount;
this.qosFunction = qosFunction;
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
this.callQueue = new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength);
this.priorityCallQueue = priorityHandlerCount > 0
@ -131,9 +128,7 @@ public class SimpleRpcScheduler implements RpcScheduler {
@Override
public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
Pair<RPCProtos.RequestHeader, Message> headerAndParam =
new Pair<RPCProtos.RequestHeader, Message>(call.header, call.param);
int level = getQosLevel(headerAndParam);
int level = priority.getPriority(call.header, call.param);
if (priorityCallQueue != null && level > highPriorityLevel) {
priorityCallQueue.put(callTask);
} else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
@ -168,11 +163,5 @@ public class SimpleRpcScheduler implements RpcScheduler {
}
}
}
private int getQosLevel(Pair<RPCProtos.RequestHeader, Message> headerAndParam) {
if (qosFunction == null) return 0;
Integer res = qosFunction.apply(headerAndParam);
return res == null? 0: res;
}
}

View File

@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RequestContext;
@ -422,17 +424,10 @@ MasterServices, Server {
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
conf,
numHandlers,
0, // we don't use high priority handlers in master
0, // we don't use replication handlers in master
null, // this is a DNC w/o high priority handlers
0);
this.rpcServer = new RpcServer(this, name, getServices(),
initialIsa, // BindAddress is IP we got for this server.
conf,
scheduler);
new FifoRpcScheduler(conf, numHandlers));
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
// We don't want to pass isa's hostname here since it could be 0.0.0.0

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
@ -38,17 +39,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
/**
* A guava function that will return a priority for use by QoS facility in regionserver; e.g.
* rpcs to hbase:meta and -ROOT-, etc., get priority.
* Reads special method annotations and table names to figure a priority for use by QoS facility in
* ipc; e.g: rpcs to hbase:meta get priority.
*/
// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott
// suggests and just have the client specify a priority.
@ -67,8 +66,9 @@ import com.google.protobuf.TextFormat;
//All the argument classes declare a 'getRegion' method that returns a
//RegionSpecifier object. Methods can be invoked on the returned object
//to figure out whether it is a meta region or not.
class QosFunction implements Function<Pair<RequestHeader, Message>, Integer> {
public static final Log LOG = LogFactory.getLog(QosFunction.class.getName());
class AnnotationReadingPriorityFunction implements PriorityFunction {
public static final Log LOG =
LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
private final Map<String, Integer> annotatedQos;
//We need to mock the regionserver instance for some unit tests (set via
//setRegionServer method.
@ -93,7 +93,7 @@ class QosFunction implements Function<Pair<RequestHeader, Message>, Integer> {
private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
new HashMap<String, Map<Class<? extends Message>, Method>>();
QosFunction(final HRegionServer hrs) {
AnnotationReadingPriorityFunction(final HRegionServer hrs) {
this.hRegionServer = hrs;
Map<String, Integer> qosMap = new HashMap<String, Integer>();
for (Method m : HRegionServer.class.getMethods()) {
@ -142,15 +142,13 @@ class QosFunction implements Function<Pair<RequestHeader, Message>, Integer> {
}
@Override
public Integer apply(Pair<RequestHeader, Message> headerAndParam) {
RequestHeader header = headerAndParam.getFirst();
public int getPriority(RequestHeader header, Message param) {
String methodName = header.getMethodName();
Integer priorityByAnnotation = annotatedQos.get(methodName);
if (priorityByAnnotation != null) {
return priorityByAnnotation;
}
Message param = headerAndParam.getSecond();
if (param == null) {
return HConstants.NORMAL_QOS;
}

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
import javax.management.ObjectName;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.annotation.Retention;
@ -47,8 +49,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
@ -111,7 +112,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -174,6 +174,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@ -256,6 +257,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
/** RPC scheduler to use for the region server. */
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.region.server.rpc.scheduler.factory.class";
protected long maxScannerResultSize;
// Cache flushing
@ -466,9 +471,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
private final int scannerLeaseTimeoutPeriod;
/**
* The reference to the QosFunction
* The reference to the priority extraction function
*/
private final QosFunction qosFunction;
private final PriorityFunction priority;
private RegionServerCoprocessorHost rsHost;
@ -552,22 +557,23 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
String name = "regionserver/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
this.qosFunction = new QosFunction(this);
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
qosFunction,
HConstants.QOS_THRESHOLD);
this.priority = new AnnotationReadingPriorityFunction(this);
RpcSchedulerFactory rpcSchedulerFactory;
try {
Class<?> rpcSchedulerFactoryClass = conf.getClass(
REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
SimpleRpcSchedulerFactory.class);
rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
} catch (InstantiationException e) {
throw new IllegalArgumentException(e);
} catch (IllegalAccessException e) {
throw new IllegalArgumentException(e);
}
this.rpcServer = new RpcServer(this, name, getServices(),
/*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
initialIsa, // BindAddress is IP we got for this server.
conf, scheduler);
conf,
rpcSchedulerFactory.create(conf, this));
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
@ -631,13 +637,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
return this.clusterId;
}
@Override
public int getPriority(RequestHeader header, Message param) {
return priority.getPriority(header, param);
}
@Retention(RetentionPolicy.RUNTIME)
protected @interface QosPriority {
int priority() default 0;
}
QosFunction getQosFunction() {
return qosFunction;
PriorityFunction getPriority() {
return priority;
}
RegionScanner getScanner(long scannerId) {

View File

@ -18,25 +18,27 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
* Services provided by {@link HRegionServer}
*/
@InterfaceAudience.Private
public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegion {
public interface RegionServerServices
extends OnlineRegions, FavoredNodesForRegion, PriorityFunction {
/**
* @return True if this regionserver is stopping.
*/

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
/**
* A factory class that constructs an {@link org.apache.hadoop.hbase.ipc.RpcScheduler} for
* a region server.
*/
public interface RpcSchedulerFactory {
/**
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
*
* Please note that this method is called in constructor of {@link HRegionServer}, so some
* fields may not be ready for access. The reason that {@code HRegionServer} is passed as
* parameter here is that an RPC scheduler may need to access data structure inside
* {@code HRegionServer} (see example in {@link SimpleRpcSchedulerFactory}).
*/
RpcScheduler create(Configuration conf, RegionServerServices server);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
/** Constructs a {@link SimpleRpcScheduler}. for the region server. */
class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
@Override
public RpcScheduler create(Configuration conf, RegionServerServices server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
server,
HConstants.QOS_THRESHOLD);
}
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -214,4 +216,9 @@ class MockRegionServerServices implements RegionServerServices {
// TODO Auto-generated method stub
return null;
}
@Override
public int getPriority(RPCProtos.RequestHeader header, Message param) {
return 0;
}
}

View File

@ -86,7 +86,7 @@ public class TestDelayedRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
isa,
conf,
new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
new FifoRpcScheduler(conf, 1));
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
@ -167,7 +167,7 @@ public class TestDelayedRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
isa,
conf,
new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
new FifoRpcScheduler(conf, 1));
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
@ -289,7 +289,7 @@ public class TestDelayedRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
isa,
conf,
new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
new FifoRpcScheduler(conf, 1));
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {

View File

@ -21,18 +21,26 @@ package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -64,6 +72,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -139,9 +148,13 @@ public class TestIPC {
private static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
}
TestRpcServer(RpcScheduler scheduler) throws IOException {
super(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("0.0.0.0", 0), CONF, new SimpleRpcScheduler(CONF, 1, 1, 0, null, 0));
new InetSocketAddress("0.0.0.0", 0), CONF, scheduler);
}
@Override
@ -257,6 +270,29 @@ public class TestIPC {
}
}
/** Tests that the rpc scheduler is called when requests arrive. */
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = new TestRpcServer(scheduler);
verify(scheduler).init((RpcScheduler.Context) anyObject());
RpcClient client = new RpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT);
try {
rpcServer.start();
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) {
client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
}
verify(scheduler, times(10)).dispatch((RpcServer.CallRunner) anyObject());
} finally {
rpcServer.stop();
verify(scheduler).stop();
}
}
public static void main(String[] args)
throws IOException, SecurityException, NoSuchMethodException, InterruptedException {
if (args.length != 2) {
@ -285,7 +321,8 @@ public class TestIPC {
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
Message param = RequestConverter.buildNoDataMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm, cells);
Message param = RequestConverter.buildNoDataMultiRequest(
HConstants.EMPTY_BYTE_ARRAY, rm, cells);
CellScanner cellScanner = CellUtil.createCellScanner(cells);
if (i % 1000 == 0) {
LOG.info("" + i);

View File

@ -99,7 +99,7 @@ public class TestProtoBufRpc {
this.server = new RpcServer(null, "testrpc",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new SimpleRpcScheduler(conf, 10, 10, 0, null, 0));
new FifoRpcScheduler(conf, 10));
this.isa = server.getListenerAddress();
this.server.start();
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.ipc.RpcServer.CallRunner;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Category(SmallTests.class)
public class TestSimpleRpcScheduler {
private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
@Override
public InetSocketAddress getListenerAddress() {
return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
}
};
private Configuration conf;
@Before
public void setUp() {
conf = HBaseConfiguration.create();
}
@Test
public void testBasic() throws IOException, InterruptedException {
PriorityFunction qosFunction = mock(PriorityFunction.class);
RpcScheduler scheduler = new SimpleRpcScheduler(
conf, 10, 0, 0, qosFunction, 0);
scheduler.init(CONTEXT);
scheduler.start();
CallRunner task = createMockTask();
scheduler.dispatch(task);
verify(task, timeout(1000)).run();
scheduler.stop();
}
@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
CallRunner generalTask = createMockTask();
CallRunner priorityTask = createMockTask();
CallRunner replicationTask = createMockTask();
List<CallRunner> tasks = ImmutableList.of(
generalTask,
priorityTask,
replicationTask);
Map<CallRunner, Integer> qos = ImmutableMap.of(
generalTask, 0,
priorityTask, HConstants.HIGH_QOS + 1,
replicationTask, HConstants.REPLICATION_QOS);
PriorityFunction qosFunction = mock(PriorityFunction.class);
final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
Answer<Void> answerToRun = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
handlerThreads.put(
(CallRunner) invocationOnMock.getMock(),
Thread.currentThread());
return null;
}
};
for (CallRunner task : tasks) {
doAnswer(answerToRun).when(task).run();
}
RpcScheduler scheduler = new SimpleRpcScheduler(
conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
scheduler.init(CONTEXT);
scheduler.start();
for (CallRunner task : tasks) {
when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject()))
.thenReturn(qos.get(task));
scheduler.dispatch(task);
}
for (CallRunner task : tasks) {
verify(task, timeout(1000)).run();
}
scheduler.stop();
// Tests that these requests are handled by three distinct threads.
assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
}
private CallRunner createMockTask() {
Call call = mock(Call.class);
CallRunner task = mock(CallRunner.class);
when(task.getCall()).thenReturn(call);
return task;
}
}

View File

@ -27,6 +27,7 @@ import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.CellScannable;
@ -86,6 +87,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -557,6 +559,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
@Override
public int getPriority(RPCProtos.RequestHeader header, Message param) {
return 0;
}
@Override
public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
UpdateFavoredNodesRequest request) throws ServiceException {

View File

@ -19,7 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -28,42 +29,41 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
/**
* Tests that verify certain RPCs get a higher QoS.
*/
@Category(MediumTests.class)
public class TestPriorityRpc {
private HRegionServer regionServer = null;
private QosFunction qosFunction = null;
private PriorityFunction priority = null;
@Before
public void setup() {
Configuration conf = HBaseConfiguration.create();
regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf);
qosFunction = regionServer.getQosFunction();
priority = regionServer.getPriority();
}
@Test
public void testQosFunctionForMeta() throws IOException {
qosFunction = regionServer.getQosFunction();
priority = regionServer.getPriority();
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
//create a rpc request that has references to hbase:meta region and also
//uses one of the known argument classes (known argument classes are
//listed in HRegionServer.QosFunction.knownArgumentClasses)
//listed in HRegionServer.QosFunctionImpl.knownArgumentClasses)
headerBuilder.setMethodName("foo");
GetRequest.Builder getRequestBuilder = GetRequest.newBuilder();
@ -84,9 +84,9 @@ public class TestPriorityRpc {
Mockito.when(mockRS.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaTable()).thenReturn(true);
qosFunction.setRegionServer(mockRS);
assertTrue (qosFunction.apply(new Pair<RequestHeader, Message>(header, getRequest)) ==
HConstants.HIGH_QOS);
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, getRequest));
}
@Test
@ -94,13 +94,12 @@ public class TestPriorityRpc {
//The request is not using any of the
//known argument classes (it uses one random request class)
//(known argument classes are listed in
//HRegionServer.QosFunction.knownArgumentClasses)
//HRegionServer.QosFunctionImpl.knownArgumentClasses)
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
headerBuilder.setMethodName("foo");
RequestHeader header = headerBuilder.build();
QosFunction qosFunc = regionServer.getQosFunction();
assertTrue (qosFunc.apply(new Pair<RequestHeader, Message>(header, null)) ==
HConstants.NORMAL_QOS);
PriorityFunction qosFunc = regionServer.getPriority();
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null));
}
@Test
@ -118,8 +117,9 @@ public class TestPriorityRpc {
Mockito.when(mockRS.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
qosFunction.setRegionServer(mockRS);
int qos = qosFunction.apply(new Pair<RequestHeader, Message>(header, scanRequest));
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
int qos = priority.getPriority(header, scanRequest);
assertTrue ("" + qos, qos == HConstants.NORMAL_QOS);
//build a scan request with scannerID
@ -134,14 +134,13 @@ public class TestPriorityRpc {
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true);
qosFunction.setRegionServer(mockRS);
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertTrue (qosFunction.apply(new Pair<RequestHeader, Message>(header, scanRequest)) ==
HConstants.HIGH_QOS);
assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, scanRequest));
//the same as above but with non-meta region
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
assertTrue (qosFunction.apply(new Pair<RequestHeader, Message>(header, scanRequest)) ==
HConstants.NORMAL_QOS);
assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest));
}
}
}

View File

@ -37,7 +37,7 @@ public class TestQosFunction {
@Test
public void testPriority() {
HRegionServer hrs = Mockito.mock(HRegionServer.class);
QosFunction qosFunction = new QosFunction(hrs);
AnnotationReadingPriorityFunction qosFunction = new AnnotationReadingPriorityFunction(hrs);
// Set method name in pb style with the method name capitalized.
checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
@ -45,11 +45,9 @@ public class TestQosFunction {
checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction);
}
private void checkMethod(final String methodName, final int expected, final QosFunction qosf) {
private void checkMethod(final String methodName, final int expected, final AnnotationReadingPriorityFunction qosf) {
RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setMethodName(methodName);
Pair<RequestHeader, Message> headerAndParam =
new Pair<RequestHeader, Message>(builder.build(), null);
assertEquals(methodName, expected, qosf.apply(headerAndParam).intValue());
assertEquals(methodName, expected, qosf.getPriority(builder.build(), null));
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
@ -130,10 +131,8 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
sai.add(new BlockingServiceAndInterface(service,
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
conf, 3, 1, 0, null, HConstants.QOS_THRESHOLD);
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, scheduler);
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
this.isa = this.rpcServer.getListenerAddress();
this.sleeper = new Sleeper(1000, this);
}