HBASE-2782 QOS for META table access

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997541 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Rawson 2010-09-15 23:44:08 +00:00
parent 191a334487
commit 5b600805b5
7 changed files with 206 additions and 48 deletions

View File

@ -910,6 +910,7 @@ Release 0.21.0 - Unreleased
HBASE-2980 Refactor region server command line to a new class HBASE-2980 Refactor region server command line to a new class
HBASE-2988 Support alternate compression for major compactions HBASE-2988 Support alternate compression for major compactions
HBASE-2941 port HADOOP-6713 - threading scalability for RPC reads - to HBase HBASE-2941 port HADOOP-6713 - threading scalability for RPC reads - to HBase
HBASE-2782 QOS for META table access
NEW FEATURES NEW FEATURES
HBASE-1961 HBase EC2 scripts HBASE-1961 HBase EC2 scripts

View File

@ -30,6 +30,7 @@ import java.io.DataInput;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
/** /**
@ -47,7 +48,7 @@ public final class MultiAction implements Writable {
/** /**
* Get the total number of Actions * Get the total number of Actions
* *
* @return total number of Actions for all groups in this container. * @return total number of Actions for all groups in this container.
*/ */
public int size() { public int size() {
@ -62,7 +63,7 @@ public final class MultiAction implements Writable {
* Add an Action to this container based on it's regionName. If the regionName * Add an Action to this container based on it's regionName. If the regionName
* is wrong, the initial execution will fail, but will be automatically * is wrong, the initial execution will fail, but will be automatically
* retried after looking up the correct region. * retried after looking up the correct region.
* *
* @param regionName * @param regionName
* @param a * @param a
*/ */
@ -75,6 +76,10 @@ public final class MultiAction implements Writable {
rsActions.add(a); rsActions.add(a);
} }
public Set<byte[]> getRegions() {
return actions.keySet();
}
/** /**
* @return All actions from all regions in this container * @return All actions from all regions in this container
*/ */

View File

@ -20,12 +20,14 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -82,8 +84,9 @@ public class HBaseRPC {
super(); super();
} // no public ctor } // no public ctor
/** A method invocation, including the method name and its parameters.*/ /** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable { public static class Invocation implements Writable, Configurable {
private String methodName; private String methodName;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Class[] parameterClasses; private Class[] parameterClasses;
@ -497,9 +500,9 @@ public class HBaseRPC {
final Class<?>[] ifaces, final Class<?>[] ifaces,
final String bindAddress, final int port, final String bindAddress, final int port,
final int numHandlers, final int numHandlers,
final boolean verbose, Configuration conf) int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException { throws IOException {
return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, verbose); return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel);
} }
/** An RPC Server. */ /** An RPC Server. */
@ -527,9 +530,9 @@ public class HBaseRPC {
* @throws IOException e * @throws IOException e
*/ */
public Server(Object instance, final Class<?>[] ifaces, public Server(Object instance, final Class<?>[] ifaces,
Configuration conf, String bindAddress, int port, Configuration conf, String bindAddress, int port,
int numHandlers, boolean verbose) throws IOException { int numHandlers, int metaHandlerCount, boolean verbose, int highPriorityLevel) throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName())); super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel);
this.instance = instance; this.instance = instance;
this.implementation = instance.getClass(); this.implementation = instance.getClass();

View File

@ -20,9 +20,11 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
@ -131,6 +133,7 @@ public abstract class HBaseServer {
protected String bindAddress; protected String bindAddress;
protected int port; // port we listen on protected int port; // port we listen on
private int handlerCount; // number of handler threads private int handlerCount; // number of handler threads
private int priorityHandlerCount;
private int readThreads; // number of read threads private int readThreads; // number of read threads
protected Class<? extends Writable> paramClass; // class of call parameters protected Class<? extends Writable> paramClass; // class of call parameters
protected int maxIdleTime; // the maximum idle time after protected int maxIdleTime; // the maximum idle time after
@ -156,6 +159,9 @@ public abstract class HBaseServer {
volatile protected boolean running = true; // true while server runs volatile protected boolean running = true; // true while server runs
protected BlockingQueue<Call> callQueue; // queued calls protected BlockingQueue<Call> callQueue; // queued calls
protected BlockingQueue<Call> priorityCallQueue;
private int highPriorityLevel; // what level a high priority call is at
protected final List<Connection> connectionList = protected final List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>()); Collections.synchronizedList(new LinkedList<Connection>());
@ -165,6 +171,7 @@ public abstract class HBaseServer {
protected Responder responder = null; protected Responder responder = null;
protected int numConnections = 0; protected int numConnections = 0;
private Handler[] handlers = null; private Handler[] handlers = null;
private Handler[] priorityHandlers = null;
protected HBaseRPCErrorHandler errorHandler = null; protected HBaseRPCErrorHandler errorHandler = null;
/** /**
@ -959,7 +966,12 @@ public abstract class HBaseServer {
param.readFields(dis); param.readFields(dis);
Call call = new Call(id, param, this); Call call = new Call(id, param, this);
callQueue.put(call); // queue the call; maybe blocked here
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call);
} else {
callQueue.put(call); // queue the call; maybe blocked here
}
} }
protected synchronized void close() { protected synchronized void close() {
@ -977,9 +989,17 @@ public abstract class HBaseServer {
/** Handles queued calls . */ /** Handles queued calls . */
private class Handler extends Thread { private class Handler extends Thread {
public Handler(int instanceNumber) { private final BlockingQueue<Call> myCallQueue;
public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
this.myCallQueue = cq;
this.setDaemon(true); this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
String threadName = "IPC Server handler " + instanceNumber + " on " + port;
if (cq == priorityCallQueue) {
// this is just an amazing hack, but it works.
threadName = "PRI " + threadName;
}
this.setName(threadName);
} }
@Override @Override
@ -990,7 +1010,7 @@ public abstract class HBaseServer {
ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize); ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
while (running) { while (running) {
try { try {
Call call = callQueue.take(); // pop the queue; maybe blocked here Call call = myCallQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " + LOG.debug(getName() + ": has #" + call.id + " from " +
@ -1058,33 +1078,58 @@ public abstract class HBaseServer {
} }
protected HBaseServer(String bindAddress, int port, /**
Class<? extends Writable> paramClass, int handlerCount, * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
Configuration conf) * are priorityHandlers available it will be processed in it's own thread set.
throws IOException *
{ * @param param
this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port)); * @return priority, higher is better
*/
private Function<Writable,Integer> qosFunction = null;
public void setQosFunction(Function<Writable, Integer> newFunc) {
qosFunction = newFunc;
} }
protected int getQosLevel(Writable param) {
if (qosFunction == null) {
return 0;
}
Integer res = qosFunction.apply(param);
if (res == null) {
return 0;
}
return res;
}
/* Constructs a server listening on the named port and address. Parameters passed must /* Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</handlerCount> determines * be of the named class. The <code>handlerCount</handlerCount> determines
* the number of handler threads that will be used to process calls. * the number of handler threads that will be used to process calls.
* *
*/ */
protected HBaseServer(String bindAddress, int port, protected HBaseServer(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount, Class<? extends Writable> paramClass, int handlerCount,
Configuration conf, String serverName) int priorityHandlerCount, Configuration conf, String serverName,
int highPriorityLevel)
throws IOException { throws IOException {
this.bindAddress = bindAddress; this.bindAddress = bindAddress;
this.conf = conf; this.conf = conf;
this.port = port; this.port = port;
this.paramClass = paramClass; this.paramClass = paramClass;
this.handlerCount = handlerCount; this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.socketSendBufferSize = 0; this.socketSendBufferSize = 0;
this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
this.readThreads = conf.getInt( this.readThreads = conf.getInt(
"ipc.server.read.threadpool.size", "ipc.server.read.threadpool.size",
10); 10);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize); this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
if (priorityHandlerCount > 0) {
this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueSize); // TODO hack on size
} else {
this.priorityCallQueue = null;
}
this.highPriorityLevel = highPriorityLevel;
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
@ -1121,9 +1166,17 @@ public abstract class HBaseServer {
handlers = new Handler[handlerCount]; handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) { for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i); handlers[i] = new Handler(callQueue, i);
handlers[i].start(); handlers[i].start();
} }
if (priorityHandlerCount > 0) {
priorityHandlers = new Handler[priorityHandlerCount];
for (int i = 0 ; i < priorityHandlerCount; i++) {
priorityHandlers[i] = new Handler(priorityCallQueue, i);
priorityHandlers[i].start();
}
}
} }
/** Stops the service. No new calls will be handled after this is called. */ /** Stops the service. No new calls will be handled after this is called. */
@ -1131,9 +1184,16 @@ public abstract class HBaseServer {
LOG.info("Stopping server on " + port); LOG.info("Stopping server on " + port);
running = false; running = false;
if (handlers != null) { if (handlers != null) {
for (int i = 0; i < handlerCount; i++) { for (Handler handler : handlers) {
if (handlers[i] != null) { if (handler != null) {
handlers[i].interrupt(); handler.interrupt();
}
}
}
if (priorityHandlers != null) {
for (Handler handler : priorityHandlers) {
if (handler != null) {
handler.interrupt();
} }
} }
} }

View File

@ -95,13 +95,13 @@ import org.apache.zookeeper.Watcher;
* run the cluster. All others park themselves in their constructor until * run the cluster. All others park themselves in their constructor until
* master or cluster shutdown or until the active master loses its lease in * master or cluster shutdown or until the active master loses its lease in
* zookeeper. Thereafter, all running master jostle to take over master role. * zookeeper. Thereafter, all running master jostle to take over master role.
* *
* <p>The Master can be asked shutdown the cluster. See {@link #shutdown()}. In * <p>The Master can be asked shutdown the cluster. See {@link #shutdown()}. In
* this case it will tell all regionservers to go down and then wait on them * this case it will tell all regionservers to go down and then wait on them
* all reporting in that they are down. This master will then shut itself down. * all reporting in that they are down. This master will then shut itself down.
* *
* <p>You can also shutdown just this master. Call {@link #stopMaster()}. * <p>You can also shutdown just this master. Call {@link #stopMaster()}.
* *
* @see HMasterInterface * @see HMasterInterface
* @see HMasterRegionInterface * @see HMasterRegionInterface
* @see Watcher * @see Watcher
@ -171,7 +171,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
* <li>Initialize master components - server manager, region manager, * <li>Initialize master components - server manager, region manager,
* region server queue, file system manager, etc * region server queue, file system manager, etc
* </ol> * </ol>
* @throws InterruptedException * @throws InterruptedException
*/ */
public HMaster(final Configuration conf) public HMaster(final Configuration conf)
throws IOException, KeeperException, InterruptedException { throws IOException, KeeperException, InterruptedException {
@ -185,13 +185,16 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.rpcServer = HBaseRPC.getServer(this, this.rpcServer = HBaseRPC.getServer(this,
new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class}, new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
a.getBindAddress(), a.getPort(), a.getBindAddress(), a.getPort(),
numHandlers, false, conf); numHandlers,
0, // we dont use high priority handlers in master
false, conf,
0); // this is a DNC w/o high priority handlers
this.address = new HServerAddress(rpcServer.getListenerAddress()); this.address = new HServerAddress(rpcServer.getListenerAddress());
// set the thread name now we have an address // set the thread name now we have an address
setName(MASTER + "-" + this.address); setName(MASTER + "-" + this.address);
// Hack! Maps DFSClient => Master for logs. HDFS made this // Hack! Maps DFSClient => Master for logs. HDFS made this
// config param for task trackers, but we can piggyback off of it. // config param for task trackers, but we can piggyback off of it.
if (this.conf.get("mapred.task.id") == null) { if (this.conf.get("mapred.task.id") == null) {
this.conf.set("mapred.task.id", "hb_m_" + this.address.toString() + this.conf.set("mapred.task.id", "hb_m_" + this.address.toString() +
@ -340,7 +343,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
* Initializations we need to do if we are cluster starter. * Initializations we need to do if we are cluster starter.
* @param starter * @param starter
* @param mfs * @param mfs
* @throws IOException * @throws IOException
*/ */
private static void clusterStarterInitializations(final MasterFileSystem mfs, private static void clusterStarterInitializations(final MasterFileSystem mfs,
final ServerManager sm, final CatalogTracker ct, final AssignmentManager am) final ServerManager sm, final CatalogTracker ct, final AssignmentManager am)

View File

@ -158,7 +158,7 @@ public class HRegion implements HeapSize { // , Writable{
* This directory contains the directory for this region. * This directory contains the directory for this region.
*/ */
final Path tableDir; final Path tableDir;
final HLog log; final HLog log;
final FileSystem fs; final FileSystem fs;
final Configuration conf; final Configuration conf;
@ -631,7 +631,7 @@ public class HRegion implements HeapSize { // , Writable{
private void cleanupTmpDir() throws IOException { private void cleanupTmpDir() throws IOException {
FSUtils.deleteDirectory(this.fs, getTmpDir()); FSUtils.deleteDirectory(this.fs, getTmpDir());
} }
/** /**
* Get the temporary diretory for this region. This directory * Get the temporary diretory for this region. This directory
* will have its contents removed when the region is reopened. * will have its contents removed when the region is reopened.
@ -798,7 +798,7 @@ public class HRegion implements HeapSize { // , Writable{
/** /**
* Flush the memstore. * Flush the memstore.
* *
* Flushing the memstore is a little tricky. We have a lot of updates in the * Flushing the memstore is a little tricky. We have a lot of updates in the
* memstore, all of which have also been written to the log. We need to * memstore, all of which have also been written to the log. We need to
* write those updates in the memstore out to disk, while being able to * write those updates in the memstore out to disk, while being able to
@ -1279,12 +1279,12 @@ public class HRegion implements HeapSize { // , Writable{
retCodes = new OperationStatusCode[operations.length]; retCodes = new OperationStatusCode[operations.length];
Arrays.fill(retCodes, OperationStatusCode.NOT_RUN); Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
} }
public boolean isDone() { public boolean isDone() {
return nextIndexToProcess == operations.length; return nextIndexToProcess == operations.length;
} }
} }
/** /**
* Perform a batch put with no pre-specified locks * Perform a batch put with no pre-specified locks
* @see HRegion#put(Pair[]) * @see HRegion#put(Pair[])
@ -1298,7 +1298,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
return put(putsAndLocks); return put(putsAndLocks);
} }
/** /**
* Perform a batch of puts. * Perform a batch of puts.
* @param putsAndLocks the list of puts paired with their requested lock IDs. * @param putsAndLocks the list of puts paired with their requested lock IDs.
@ -1307,7 +1307,7 @@ public class HRegion implements HeapSize { // , Writable{
public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException { public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
BatchOperationInProgress<Pair<Put, Integer>> batchOp = BatchOperationInProgress<Pair<Put, Integer>> batchOp =
new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks); new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
while (!batchOp.isDone()) { while (!batchOp.isDone()) {
checkReadOnly(); checkReadOnly();
checkResources(); checkResources();
@ -1384,7 +1384,7 @@ public class HRegion implements HeapSize { // , Writable{
batchOp.operations[i].getFirst().getFamilyMap().values(), batchOp.operations[i].getFirst().getFamilyMap().values(),
byteNow); byteNow);
} }
// ------------------------------------ // ------------------------------------
// STEP 3. Write to WAL // STEP 3. Write to WAL
// ---------------------------------- // ----------------------------------
@ -1392,12 +1392,12 @@ public class HRegion implements HeapSize { // , Writable{
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing // Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
Put p = batchOp.operations[i].getFirst(); Put p = batchOp.operations[i].getFirst();
if (!p.getWriteToWAL()) continue; if (!p.getWriteToWAL()) continue;
addFamilyMapToWALEdit(p.getFamilyMap(), walEdit); addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
} }
// Append the edit to WAL // Append the edit to WAL
this.log.append(regionInfo, regionInfo.getTableDesc().getName(), this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now); walEdit, now);
@ -1635,7 +1635,7 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey(); byte[] family = e.getKey();
List<KeyValue> edits = e.getValue(); List<KeyValue> edits = e.getValue();
Store store = getStore(family); Store store = getStore(family);
for (KeyValue kv: edits) { for (KeyValue kv: edits) {
kv.setMemstoreTS(w.getWriteNumber()); kv.setMemstoreTS(w.getWriteNumber());
@ -1706,7 +1706,7 @@ public class HRegion implements HeapSize { // , Writable{
* <p>We can ignore any log message that has a sequence ID that's equal to or * <p>We can ignore any log message that has a sequence ID that's equal to or
* lower than minSeqId. (Because we know such log messages are already * lower than minSeqId. (Because we know such log messages are already
* reflected in the HFiles.) * reflected in the HFiles.)
* *
* <p>While this is running we are putting pressure on memory yet we are * <p>While this is running we are putting pressure on memory yet we are
* outside of our usual accounting because we are not yet an onlined region * outside of our usual accounting because we are not yet an onlined region
* (this stuff is being run as part of Region initialization). This means * (this stuff is being run as part of Region initialization). This means
@ -1715,7 +1715,7 @@ public class HRegion implements HeapSize { // , Writable{
* we're not yet online so our relative sequenceids are not yet aligned with * we're not yet online so our relative sequenceids are not yet aligned with
* HLog sequenceids -- not till we come up online, post processing of split * HLog sequenceids -- not till we come up online, post processing of split
* edits. * edits.
* *
* <p>But to help relieve memory pressure, at least manage our own heap size * <p>But to help relieve memory pressure, at least manage our own heap size
* flushing if are in excess of per-region limits. Flushing, though, we have * flushing if are in excess of per-region limits. Flushing, though, we have
* to be careful and avoid using the regionserver/hlog sequenceid. Its running * to be careful and avoid using the regionserver/hlog sequenceid. Its running
@ -1725,7 +1725,7 @@ public class HRegion implements HeapSize { // , Writable{
* in this region and with its split editlogs, then we could miss edits the * in this region and with its split editlogs, then we could miss edits the
* next time we go to recover. So, we have to flush inline, using seqids that * next time we go to recover. So, we have to flush inline, using seqids that
* make sense in a this single region context only -- until we online. * make sense in a this single region context only -- until we online.
* *
* @param regiondir * @param regiondir
* @param minSeqId Any edit found in split editlogs needs to be in excess of * @param minSeqId Any edit found in split editlogs needs to be in excess of
* this minSeqId to be applied, else its skipped. * this minSeqId to be applied, else its skipped.
@ -1970,7 +1970,7 @@ public class HRegion implements HeapSize { // , Writable{
closeRegionOperation(); closeRegionOperation();
} }
} }
/** /**
* Obtains or tries to obtain the given row lock. * Obtains or tries to obtain the given row lock.
* @param waitForLock if true, will block until the lock is available. * @param waitForLock if true, will block until the lock is available.
@ -2018,7 +2018,7 @@ public class HRegion implements HeapSize { // , Writable{
closeRegionOperation(); closeRegionOperation();
} }
} }
/** /**
* Used by unit tests. * Used by unit tests.
* @param lockid * @param lockid
@ -2135,6 +2135,9 @@ public class HRegion implements HeapSize { // , Writable{
private boolean filterClosed = false; private boolean filterClosed = false;
private long readPt; private long readPt;
public HRegionInfo getRegionName() {
return regionInfo;
}
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException { RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.<init>"); //DebugPrint.println("HRegionScanner.<init>");
this.filter = scan.getFilter(); this.filter = scan.getFilter();

View File

@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -304,6 +305,85 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
initialize(); initialize();
} }
private static final int NORMAL_QOS = 0;
private static final int QOS_THRESHOLD = 10; // the line between low and high qos
private static final int HIGH_QOS = 100;
class QosFunction implements Function<Writable,Integer> {
public boolean isMetaRegion(byte[] regionName) {
HRegion region;
try {
region = getRegion(regionName);
} catch (NotServingRegionException ignored) {
return false;
}
return region.getRegionInfo().isMetaRegion();
}
@Override
public Integer apply(Writable from) {
if (from instanceof HBaseRPC.Invocation) {
HBaseRPC.Invocation inv = (HBaseRPC.Invocation) from;
String methodName = inv.getMethodName();
// scanner methods...
if (methodName.equals("next") || methodName.equals("close")) {
// translate!
Long scannerId;
try {
scannerId = (Long) inv.getParameters()[0];
} catch (ClassCastException ignored) {
//LOG.debug("Low priority: " + from);
return NORMAL_QOS; // doh.
}
String scannerIdString = Long.toString(scannerId);
InternalScanner scanner = scanners.get(scannerIdString);
if (scanner instanceof HRegion.RegionScanner) {
HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner;
HRegionInfo regionName = rs.getRegionName();
if (regionName.isMetaRegion()) {
//LOG.debug("High priority scanner request: " + scannerId);
return HIGH_QOS;
}
}
}
else if (methodName.equals("getHServerInfo") ||
methodName.equals("getRegionsAssignment") ||
methodName.equals("unlockRow") ||
methodName.equals("getProtocolVersion") ||
methodName.equals("getClosestRowBefore")) {
//LOG.debug("High priority method: " + methodName);
return HIGH_QOS;
}
else if (inv.getParameterClasses()[0] == byte[].class) {
// first arg is byte array, so assume this is a regionname:
if (isMetaRegion((byte[]) inv.getParameters()[0])) {
//LOG.debug("High priority with method: " + methodName + " and region: "
// + Bytes.toString((byte[]) inv.getParameters()[0]));
return HIGH_QOS;
}
}
else if (inv.getParameterClasses()[0] == MultiAction.class) {
MultiAction ma = (MultiAction) inv.getParameters()[0];
Set<byte[]> regions = ma.getRegions();
// ok this sucks, but if any single of the actions touches a meta, the whole
// thing gets pingged high priority. This is a dangerous hack because people
// can get their multi action tagged high QOS by tossing a Get(.META.) AND this
// regionserver hosts META/-ROOT-
for (byte[] region: regions) {
if (isMetaRegion(region)) {
//LOG.debug("High priority multi with region: " + Bytes.toString(region));
return HIGH_QOS; // short circuit for the win.
}
}
}
}
//LOG.debug("Low priority: " + from.toString());
return NORMAL_QOS;
}
}
/** /**
* Creates all of the state that needs to be reconstructed in case we are * Creates all of the state that needs to be reconstructed in case we are
* doing a restart. This is shared between the constructor and restart(). Both * doing a restart. This is shared between the constructor and restart(). Both
@ -326,8 +406,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
OnlineRegions.class}, OnlineRegions.class},
address.getBindAddress(), address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
false, conf); conf.getInt("hbase.regionserver.metahandler.count", 10),
false, conf, QOS_THRESHOLD);
this.server.setErrorHandler(this); this.server.setErrorHandler(this);
this.server.setQosFunction(new QosFunction());
// Address is giving a default IP for the moment. Will be changed after // Address is giving a default IP for the moment. Will be changed after
// calling the master. // calling the master.
this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress( this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(