HBASE-16561 Add metrics about read/write/scan queue length and active read/write/scan handler count

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2016-11-29 09:50:12 +08:00 committed by zhangduo
parent be042652aa
commit 7b2673db12
13 changed files with 355 additions and 30 deletions

View File

@ -60,10 +60,25 @@ public interface MetricsHBaseServerSource extends BaseSource {
String REPLICATION_QUEUE_DESC =
"Number of calls in the replication call queue.";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
String WRITE_QUEUE_DESC = "Number of calls in the write call queue; " +
"parsed requests waiting in scheduler to be executed";
String READ_QUEUE_NAME = "numCallsInReadQueue";
String READ_QUEUE_DESC = "Number of calls in the read call queue; " +
"parsed requests waiting in scheduler to be executed";
String SCAN_QUEUE_NAME = "numCallsInScanQueue";
String SCAN_QUEUE_DESC = "Number of calls in the scan call queue; " +
"parsed requests waiting in scheduler to be executed";
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers.";
String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";
String NUM_ACTIVE_READ_HANDLER_DESC = "Number of active read rpc handlers.";
String NUM_ACTIVE_SCAN_HANDLER_NAME = "numActiveScanHandler";
String NUM_ACTIVE_SCAN_HANDLER_DESC = "Number of active scan rpc handlers.";
String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
"were dropped by CoDel RPC executor";

View File

@ -21,11 +21,30 @@ package org.apache.hadoop.hbase.ipc;
public interface MetricsHBaseServerWrapper {
long getTotalQueueSize();
int getGeneralQueueLength();
int getReplicationQueueLength();
int getPriorityQueueLength();
int getNumOpenConnections();
int getActiveRpcHandlerCount();
long getNumGeneralCallsDropped();
long getNumLifoModeSwitches();
int getWriteQueueLength();
int getReadQueueLength();
int getScanQueueLength();
int getActiveWriteRpcHandlerCount();
int getActiveReadRpcHandlerCount();
int getActiveScanRpcHandlerCount();
}

View File

@ -239,7 +239,19 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches())
.addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC),
wrapper.getWriteQueueLength())
.addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC),
wrapper.getReadQueueLength())
.addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC),
wrapper.getScanQueueLength())
.addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC),
wrapper.getActiveWriteRpcHandlerCount())
.addGauge(Interns.info(NUM_ACTIVE_READ_HANDLER_NAME, NUM_ACTIVE_READ_HANDLER_DESC),
wrapper.getActiveReadRpcHandlerCount())
.addGauge(Interns.info(NUM_ACTIVE_SCAN_HANDLER_NAME, NUM_ACTIVE_SCAN_HANDLER_DESC),
wrapper.getActiveScanRpcHandlerCount());
}
metricsRegistry.snapshot(mrb, all);

View File

@ -21,6 +21,7 @@ import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@ -57,8 +58,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
@Override
protected Handler getHandler(String name, double handlerFailureThreshhold,
BlockingQueue<CallRunner> q) {
return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
fastPathHandlerStack);
}
@Override
@ -84,8 +86,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
private CallRunner loadedCallRunner;
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount,
final Deque<FastPathHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, q);
super(name, handlerFailureThreshhold, q, activeHandlerCount);
this.fastPathHandlerStack = fastPathHandlerStack;
}

View File

@ -118,4 +118,34 @@ public class FifoRpcScheduler extends RpcScheduler {
public long getNumLifoModeSwitches() {
return 0;
}
@Override
public int getWriteQueueLength() {
return 0;
}
@Override
public int getReadQueueLength() {
return 0;
}
@Override
public int getScanQueueLength() {
return 0;
}
@Override
public int getActiveWriteRpcHandlerCount() {
return 0;
}
@Override
public int getActiveReadRpcHandlerCount() {
return 0;
}
@Override
public int getActiveScanRpcHandlerCount() {
return 0;
}
}

View File

@ -94,4 +94,52 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
}
return server.getScheduler().getNumLifoModeSwitches();
}
@Override
public int getWriteQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getWriteQueueLength();
}
@Override
public int getReadQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getReadQueueLength();
}
@Override
public int getScanQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getScanQueueLength();
}
@Override
public int getActiveWriteRpcHandlerCount() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getActiveWriteRpcHandlerCount();
}
@Override
public int getActiveReadRpcHandlerCount() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getActiveReadRpcHandlerCount();
}
@Override
public int getActiveScanRpcHandlerCount() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getActiveScanRpcHandlerCount();
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@ -66,6 +67,10 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int numReadQueues;
private final int numScanQueues;
private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
@ -233,11 +238,13 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
activeWriteHandlerCount);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
activeReadHandlerCount);
if (numScanQueues > 0) {
startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
numScanQueues, port);
numScanQueues, port, activeScanHandlerCount);
}
}
@ -260,6 +267,55 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return queue.offer(callTask);
}
@Override
public int getWriteQueueLength() {
int length = 0;
for (int i = 0; i < numWriteQueues; i++) {
length += queues.get(i).size();
}
return length;
}
@Override
public int getReadQueueLength() {
int length = 0;
for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
length += queues.get(i).size();
}
return length;
}
@Override
public int getScanQueueLength() {
int length = 0;
for (int i = numWriteQueues + numReadQueues;
i < (numWriteQueues + numReadQueues + numScanQueues); i++) {
length += queues.get(i).size();
}
return length;
}
@Override
public int getActiveHandlerCount() {
return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
+ activeScanHandlerCount.get();
}
@Override
public int getActiveWriteHandlerCount() {
return activeWriteHandlerCount.get();
}
@Override
public int getActiveReadHandlerCount() {
return activeReadHandlerCount.get();
}
@Override
public int getActiveScanHandlerCount() {
return activeScanHandlerCount.get();
}
private boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this?
if (param instanceof MultiRequest) {

View File

@ -194,19 +194,6 @@ public abstract class RpcExecutor {
}
}
public int getActiveHandlerCount() {
return activeHandlerCount.get();
}
/** Returns the length of the pending queue */
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue: queues) {
length += queue.size();
}
return length;
}
/** Add the request to the executor queue */
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
@ -217,15 +204,15 @@ public abstract class RpcExecutor {
protected void startHandlers(final int port) {
List<BlockingQueue<CallRunner>> callQueues = getQueues();
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
}
/**
* Override if providing alternate Handler implementation.
*/
protected Handler getHandler(final String name, final double handlerFailureThreshhold,
final BlockingQueue<CallRunner> q) {
return new Handler(name, handlerFailureThreshhold, q);
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
}
/**
@ -233,7 +220,7 @@ public abstract class RpcExecutor {
*/
protected void startHandlers(final String nameSuffix, final int numHandlers,
final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
final int port) {
final int port, final AtomicInteger activeHandlerCount) {
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
@ -242,7 +229,8 @@ public abstract class RpcExecutor {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
activeHandlerCount);
handler.start();
LOG.debug("Started " + name);
handlers.add(handler);
@ -260,12 +248,16 @@ public abstract class RpcExecutor {
final double handlerFailureThreshhold;
// metrics (shared with other handlers)
final AtomicInteger activeHandlerCount;
Handler(final String name, final double handlerFailureThreshhold,
final BlockingQueue<CallRunner> q) {
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
super(name);
setDaemon(true);
this.q = q;
this.handlerFailureThreshhold = handlerFailureThreshhold;
this.activeHandlerCount = activeHandlerCount;
}
/**
@ -301,7 +293,7 @@ public abstract class RpcExecutor {
MonitoredRPCHandler status = RpcServer.getStatus();
cr.setStatus(status);
try {
activeHandlerCount.incrementAndGet();
this.activeHandlerCount.incrementAndGet();
cr.run();
} catch (Throwable e) {
if (e instanceof Error) {
@ -324,7 +316,7 @@ public abstract class RpcExecutor {
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
}
} finally {
activeHandlerCount.decrementAndGet();
this.activeHandlerCount.decrementAndGet();
}
}
}
@ -419,6 +411,43 @@ public abstract class RpcExecutor {
return numLifoModeSwitches.get();
}
public int getActiveHandlerCount() {
return activeHandlerCount.get();
}
public int getActiveWriteHandlerCount() {
return 0;
}
public int getActiveReadHandlerCount() {
return 0;
}
public int getActiveScanHandlerCount() {
return 0;
}
/** Returns the length of the pending queue */
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue: queues) {
length += queue.size();
}
return length;
}
public int getReadQueueLength() {
return 0;
}
public int getScanQueueLength() {
return 0;
}
public int getWriteQueueLength() {
return 0;
}
public String getName() {
return this.name;
}

View File

@ -89,4 +89,22 @@ public abstract class RpcScheduler {
* in the period of overloade we serve last requests first); returns 0 otherwise.
*/
public abstract long getNumLifoModeSwitches();
/** Retrieves length of the write queue for metrics when use RWQueueRpcExecutor. */
public abstract int getWriteQueueLength();
/** Retrieves length of the read queue for metrics when use RWQueueRpcExecutor. */
public abstract int getReadQueueLength();
/** Retrieves length of the scan queue for metrics when use RWQueueRpcExecutor. */
public abstract int getScanQueueLength();
/** Retrieves the number of active write rpc handler when use RWQueueRpcExecutor. */
public abstract int getActiveWriteRpcHandlerCount();
/** Retrieves the number of active write rpc handler when use RWQueueRpcExecutor. */
public abstract int getActiveReadRpcHandlerCount();
/** Retrieves the number of active write rpc handler when use RWQueueRpcExecutor. */
public abstract int getActiveScanRpcHandlerCount();
}

View File

@ -189,5 +189,35 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public long getNumLifoModeSwitches() {
return callExecutor.getNumLifoModeSwitches();
}
@Override
public int getWriteQueueLength() {
return callExecutor.getWriteQueueLength();
}
@Override
public int getReadQueueLength() {
return callExecutor.getReadQueueLength();
}
@Override
public int getScanQueueLength() {
return callExecutor.getScanQueueLength();
}
@Override
public int getActiveWriteRpcHandlerCount() {
return callExecutor.getActiveWriteHandlerCount();
}
@Override
public int getActiveReadRpcHandlerCount() {
return callExecutor.getActiveReadHandlerCount();
}
@Override
public int getActiveScanRpcHandlerCount() {
return callExecutor.getActiveScanHandlerCount();
}
}

View File

@ -73,4 +73,34 @@ public class DelegatingRpcScheduler extends RpcScheduler {
public long getNumLifoModeSwitches() {
return delegate.getNumLifoModeSwitches();
}
}
@Override
public int getWriteQueueLength() {
return 0;
}
@Override
public int getReadQueueLength() {
return 0;
}
@Override
public int getScanQueueLength() {
return 0;
}
@Override
public int getActiveWriteRpcHandlerCount() {
return 0;
}
@Override
public int getActiveReadRpcHandlerCount() {
return 0;
}
@Override
public int getActiveScanRpcHandlerCount() {
return 0;
}
}

View File

@ -58,4 +58,34 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
public long getNumLifoModeSwitches() {
return 5;
}
@Override
public int getWriteQueueLength() {
return 50;
}
@Override
public int getReadQueueLength() {
return 50;
}
@Override
public int getScanQueueLength() {
return 2;
}
@Override
public int getActiveWriteRpcHandlerCount() {
return 50;
}
@Override
public int getActiveReadRpcHandlerCount() {
return 50;
}
@Override
public int getActiveScanRpcHandlerCount() {
return 6;
}
}

View File

@ -45,7 +45,6 @@ public class TestRpcMetrics {
MetricsHBaseServer rsMetrics = new MetricsHBaseServer("HRegionServer", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource rsSource = rsMetrics.getMetricsSource();
assertEquals("master", masterSource.getMetricsContext());
assertEquals("regionserver", rsSource.getMetricsContext());
@ -70,6 +69,12 @@ public class TestRpcMetrics {
HELPER.assertGauge("numCallsInPriorityQueue", 104, serverSource);
HELPER.assertGauge("numOpenConnections", 105, serverSource);
HELPER.assertGauge("numActiveHandler", 106, serverSource);
HELPER.assertGauge("numActiveWriteHandler", 50, serverSource);
HELPER.assertGauge("numActiveReadHandler", 50, serverSource);
HELPER.assertGauge("numActiveScanHandler", 6, serverSource);
HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource);
HELPER.assertGauge("numCallsInReadQueue", 50, serverSource);
HELPER.assertGauge("numCallsInScanQueue", 2, serverSource);
}
/**