HBASE-15967 Metric for active ipc Readers and make default fraction of cpu count

Add new metric hbase.regionserver.ipc.runningReaders
Also make it so Reader count is a factor of processor count
This commit is contained in:
stack 2016-06-05 11:12:05 -07:00
parent e0b70c00e7
commit 1125215aad
3 changed files with 45 additions and 8 deletions

View File

@ -86,6 +86,13 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " +
"rest of the requests will have to be retried.";
String RUNNING_READERS = "runningReaders";
String RUNNING_READERS_DESCRIPTION =
"Count of Reader threads currently busy parsing requests to hand off to the scheduler";
void incrRunningReaders();
void decrRunningReaders();
void authorizationSuccess();
void authorizationFailure();
@ -122,6 +129,4 @@ public interface MetricsHBaseServerSource extends BaseSource {
void processedCall(int processingTime);
void queuedAndProcessedCall(int totalTime);
}
}

View File

@ -57,6 +57,12 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private MetricHistogram requestSize;
private MetricHistogram responseSize;
/**
* The count of readers currently working parsing a request as opposed to being blocked on the
* selector waiting on requests to come in.
*/
private final MutableFastCounter runningReaders;
public MetricsHBaseServerSourceImpl(String metricsName,
String metricsDescription,
String metricsContext,
@ -86,6 +92,9 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
this.exceptionsMultiTooLarge = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
this.runningReaders = this.getMetricsRegistry()
.newCounter(RUNNING_READERS, RUNNING_READERS_DESCRIPTION, 0L);
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
this.authenticationFailures = this.getMetricsRegistry().newCounter(AUTHENTICATION_FAILURES_NAME,
@ -108,6 +117,16 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
RESPONSE_SIZE_DESC);
}
@Override
public void incrRunningReaders() {
this.runningReaders.incr(+1);
}
@Override
public void decrRunningReaders() {
this.runningReaders.incr(-1);
}
@Override
public void authorizationSuccess() {
authorizationSuccesses.incr();

View File

@ -625,7 +625,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public Listener(final String name) throws IOException {
super(name);
// The backlog of requests that we will have the serversocket carry.
// The backlog of requests that we will have the serversocket carry. It is not enough
// just setting this config. You need to set the backlog in the kernel too.
int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
@ -690,7 +691,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
metrics.getMetricsSource().incrRunningReaders();
try {
doRead(key);
} finally {
metrics.getMetricsSource().decrRunningReaders();
}
}
}
key = null;
@ -734,8 +740,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
if (key.isAcceptable()) {
doAccept(key);
}
}
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
@ -830,7 +837,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
LOG.info(Thread.currentThread().getName() +
": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
@ -1159,6 +1167,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private ByteBuffer dataLengthBuffer;
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
private final Lock responseWriteLock = new ReentrantLock();
// EXPENSIVE: Counters cost lots of CPU. Remove. Used just to see if idle or not. Use boolean.
private Counter rpcCount = new Counter(); // number of outstanding rpcs
private long lastContact;
private InetAddress addr;
@ -2000,7 +2009,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// See declaration above for documentation on what this size is.
this.maxQueueSizeInBytes =
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
// Have the Reader thread count default to 1/4 of the processors. This seems to do pretty
// well. See the metric hbase.regionserver.ipc.runningReaders to see if you need to change it.
int processors = Runtime.getRuntime().availableProcessors();
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size",
Math.max(8, processors/ 4));
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);