HBASE-16561 Add metrics about read/write/scan queue length and active read/write/scan handler count
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
3d5e686070
commit
cc03f7ad53
|
@ -62,10 +62,25 @@ public interface MetricsHBaseServerSource extends BaseSource {
|
||||||
String REPLICATION_QUEUE_DESC =
|
String REPLICATION_QUEUE_DESC =
|
||||||
"Number of calls in the replication call queue waiting to be run";
|
"Number of calls in the replication call queue waiting to be run";
|
||||||
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
|
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
|
||||||
|
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
|
||||||
|
String WRITE_QUEUE_DESC = "Number of calls in the write call queue; " +
|
||||||
|
"parsed requests waiting in scheduler to be executed";
|
||||||
|
String READ_QUEUE_NAME = "numCallsInReadQueue";
|
||||||
|
String READ_QUEUE_DESC = "Number of calls in the read call queue; " +
|
||||||
|
"parsed requests waiting in scheduler to be executed";
|
||||||
|
String SCAN_QUEUE_NAME = "numCallsInScanQueue";
|
||||||
|
String SCAN_QUEUE_DESC = "Number of calls in the scan call queue; " +
|
||||||
|
"parsed requests waiting in scheduler to be executed";
|
||||||
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
|
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
|
||||||
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
|
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
|
||||||
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
|
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
|
||||||
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
|
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
|
||||||
|
String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
|
||||||
|
String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers.";
|
||||||
|
String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";
|
||||||
|
String NUM_ACTIVE_READ_HANDLER_DESC = "Number of active read rpc handlers.";
|
||||||
|
String NUM_ACTIVE_SCAN_HANDLER_NAME = "numActiveScanHandler";
|
||||||
|
String NUM_ACTIVE_SCAN_HANDLER_DESC = "Number of active scan rpc handlers.";
|
||||||
String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
|
String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
|
||||||
String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
|
String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
|
||||||
"were dropped by CoDel RPC executor";
|
"were dropped by CoDel RPC executor";
|
||||||
|
|
|
@ -21,11 +21,30 @@ package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
public interface MetricsHBaseServerWrapper {
|
public interface MetricsHBaseServerWrapper {
|
||||||
long getTotalQueueSize();
|
long getTotalQueueSize();
|
||||||
|
|
||||||
int getGeneralQueueLength();
|
int getGeneralQueueLength();
|
||||||
|
|
||||||
int getReplicationQueueLength();
|
int getReplicationQueueLength();
|
||||||
|
|
||||||
int getPriorityQueueLength();
|
int getPriorityQueueLength();
|
||||||
|
|
||||||
int getNumOpenConnections();
|
int getNumOpenConnections();
|
||||||
|
|
||||||
int getActiveRpcHandlerCount();
|
int getActiveRpcHandlerCount();
|
||||||
|
|
||||||
long getNumGeneralCallsDropped();
|
long getNumGeneralCallsDropped();
|
||||||
|
|
||||||
long getNumLifoModeSwitches();
|
long getNumLifoModeSwitches();
|
||||||
|
|
||||||
|
int getWriteQueueLength();
|
||||||
|
|
||||||
|
int getReadQueueLength();
|
||||||
|
|
||||||
|
int getScanQueueLength();
|
||||||
|
|
||||||
|
int getActiveWriteRpcHandlerCount();
|
||||||
|
|
||||||
|
int getActiveReadRpcHandlerCount();
|
||||||
|
|
||||||
|
int getActiveScanRpcHandlerCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -239,7 +239,19 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
|
||||||
.addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
|
.addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
|
||||||
NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
|
NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
|
||||||
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
|
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
|
||||||
NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
|
NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches())
|
||||||
|
.addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC),
|
||||||
|
wrapper.getWriteQueueLength())
|
||||||
|
.addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC),
|
||||||
|
wrapper.getReadQueueLength())
|
||||||
|
.addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC),
|
||||||
|
wrapper.getScanQueueLength())
|
||||||
|
.addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC),
|
||||||
|
wrapper.getActiveWriteRpcHandlerCount())
|
||||||
|
.addGauge(Interns.info(NUM_ACTIVE_READ_HANDLER_NAME, NUM_ACTIVE_READ_HANDLER_DESC),
|
||||||
|
wrapper.getActiveReadRpcHandlerCount())
|
||||||
|
.addGauge(Interns.info(NUM_ACTIVE_SCAN_HANDLER_NAME, NUM_ACTIVE_SCAN_HANDLER_DESC),
|
||||||
|
wrapper.getActiveScanRpcHandlerCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
metricsRegistry.snapshot(mrb, all);
|
metricsRegistry.snapshot(mrb, all);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Deque;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
@ -56,8 +57,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Handler getHandler(String name, double handlerFailureThreshhold,
|
protected Handler getHandler(String name, double handlerFailureThreshhold,
|
||||||
BlockingQueue<CallRunner> q) {
|
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
|
||||||
return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
|
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
|
||||||
|
fastPathHandlerStack);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -83,8 +85,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
||||||
private CallRunner loadedCallRunner;
|
private CallRunner loadedCallRunner;
|
||||||
|
|
||||||
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
|
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
|
||||||
|
final AtomicInteger activeHandlerCount,
|
||||||
final Deque<FastPathHandler> fastPathHandlerStack) {
|
final Deque<FastPathHandler> fastPathHandlerStack) {
|
||||||
super(name, handlerFailureThreshhold, q);
|
super(name, handlerFailureThreshhold, q, activeHandlerCount);
|
||||||
this.fastPathHandlerStack = fastPathHandlerStack;
|
this.fastPathHandlerStack = fastPathHandlerStack;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,4 +118,34 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
public long getNumLifoModeSwitches() {
|
public long getNumLifoModeSwitches() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveWriteRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveReadRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveScanRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,4 +94,52 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
}
|
}
|
||||||
return server.getScheduler().getNumLifoModeSwitches();
|
return server.getScheduler().getNumLifoModeSwitches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getWriteQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getReadQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getScanQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveWriteRpcHandlerCount() {
|
||||||
|
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getActiveWriteRpcHandlerCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveReadRpcHandlerCount() {
|
||||||
|
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getActiveReadRpcHandlerCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveScanRpcHandlerCount() {
|
||||||
|
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getActiveScanRpcHandlerCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,12 +19,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.lang.ArrayUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -39,7 +36,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -67,6 +63,10 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
private final int numReadQueues;
|
private final int numReadQueues;
|
||||||
private final int numScanQueues;
|
private final int numScanQueues;
|
||||||
|
|
||||||
|
private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
|
||||||
|
|
||||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
||||||
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
||||||
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
|
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
|
||||||
|
@ -117,11 +117,13 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void startHandlers(final int port) {
|
protected void startHandlers(final int port) {
|
||||||
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
|
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
|
||||||
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
|
activeWriteHandlerCount);
|
||||||
|
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
|
||||||
|
activeReadHandlerCount);
|
||||||
if (numScanQueues > 0) {
|
if (numScanQueues > 0) {
|
||||||
startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
|
startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
|
||||||
numScanQueues, port);
|
numScanQueues, port, activeScanHandlerCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,6 +146,55 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
return queue.offer(callTask);
|
return queue.offer(callTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
int length = 0;
|
||||||
|
for (int i = 0; i < numWriteQueues; i++) {
|
||||||
|
length += queues.get(i).size();
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
int length = 0;
|
||||||
|
for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
|
||||||
|
length += queues.get(i).size();
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
int length = 0;
|
||||||
|
for (int i = numWriteQueues + numReadQueues;
|
||||||
|
i < (numWriteQueues + numReadQueues + numScanQueues); i++) {
|
||||||
|
length += queues.get(i).size();
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveHandlerCount() {
|
||||||
|
return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
|
||||||
|
+ activeScanHandlerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveWriteHandlerCount() {
|
||||||
|
return activeWriteHandlerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveReadHandlerCount() {
|
||||||
|
return activeReadHandlerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveScanHandlerCount() {
|
||||||
|
return activeScanHandlerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
||||||
// TODO: Is there a better way to do this?
|
// TODO: Is there a better way to do this?
|
||||||
if (param instanceof MultiRequest) {
|
if (param instanceof MultiRequest) {
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
@ -175,19 +174,6 @@ public abstract class RpcExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getActiveHandlerCount() {
|
|
||||||
return activeHandlerCount.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the length of the pending queue */
|
|
||||||
public int getQueueLength() {
|
|
||||||
int length = 0;
|
|
||||||
for (final BlockingQueue<CallRunner> queue: queues) {
|
|
||||||
length += queue.size();
|
|
||||||
}
|
|
||||||
return length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Add the request to the executor queue */
|
/** Add the request to the executor queue */
|
||||||
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
|
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
|
||||||
|
|
||||||
|
@ -198,15 +184,15 @@ public abstract class RpcExecutor {
|
||||||
|
|
||||||
protected void startHandlers(final int port) {
|
protected void startHandlers(final int port) {
|
||||||
List<BlockingQueue<CallRunner>> callQueues = getQueues();
|
List<BlockingQueue<CallRunner>> callQueues = getQueues();
|
||||||
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
|
startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override if providing alternate Handler implementation.
|
* Override if providing alternate Handler implementation.
|
||||||
*/
|
*/
|
||||||
protected Handler getHandler(final String name, final double handlerFailureThreshhold,
|
protected Handler getHandler(final String name, final double handlerFailureThreshhold,
|
||||||
final BlockingQueue<CallRunner> q) {
|
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
|
||||||
return new Handler(name, handlerFailureThreshhold, q);
|
return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -214,7 +200,7 @@ public abstract class RpcExecutor {
|
||||||
*/
|
*/
|
||||||
protected void startHandlers(final String nameSuffix, final int numHandlers,
|
protected void startHandlers(final String nameSuffix, final int numHandlers,
|
||||||
final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
|
final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
|
||||||
final int port) {
|
final int port, final AtomicInteger activeHandlerCount) {
|
||||||
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
|
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
|
||||||
double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
|
double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
|
||||||
HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
|
HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
|
||||||
|
@ -223,7 +209,8 @@ public abstract class RpcExecutor {
|
||||||
final int index = qindex + (i % qsize);
|
final int index = qindex + (i % qsize);
|
||||||
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
|
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
|
||||||
+ ",port=" + port;
|
+ ",port=" + port;
|
||||||
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
|
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
|
||||||
|
activeHandlerCount);
|
||||||
handler.start();
|
handler.start();
|
||||||
LOG.debug("Started " + name);
|
LOG.debug("Started " + name);
|
||||||
handlers.add(handler);
|
handlers.add(handler);
|
||||||
|
@ -241,12 +228,16 @@ public abstract class RpcExecutor {
|
||||||
|
|
||||||
final double handlerFailureThreshhold;
|
final double handlerFailureThreshhold;
|
||||||
|
|
||||||
|
// metrics (shared with other handlers)
|
||||||
|
final AtomicInteger activeHandlerCount;
|
||||||
|
|
||||||
Handler(final String name, final double handlerFailureThreshhold,
|
Handler(final String name, final double handlerFailureThreshhold,
|
||||||
final BlockingQueue<CallRunner> q) {
|
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
|
||||||
super(name);
|
super(name);
|
||||||
setDaemon(true);
|
setDaemon(true);
|
||||||
this.q = q;
|
this.q = q;
|
||||||
this.handlerFailureThreshhold = handlerFailureThreshhold;
|
this.handlerFailureThreshhold = handlerFailureThreshhold;
|
||||||
|
this.activeHandlerCount = activeHandlerCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -282,7 +273,7 @@ public abstract class RpcExecutor {
|
||||||
MonitoredRPCHandler status = RpcServer.getStatus();
|
MonitoredRPCHandler status = RpcServer.getStatus();
|
||||||
cr.setStatus(status);
|
cr.setStatus(status);
|
||||||
try {
|
try {
|
||||||
activeHandlerCount.incrementAndGet();
|
this.activeHandlerCount.incrementAndGet();
|
||||||
cr.run();
|
cr.run();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (e instanceof Error) {
|
if (e instanceof Error) {
|
||||||
|
@ -305,7 +296,7 @@ public abstract class RpcExecutor {
|
||||||
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
|
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
activeHandlerCount.decrementAndGet();
|
this.activeHandlerCount.decrementAndGet();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -400,6 +391,43 @@ public abstract class RpcExecutor {
|
||||||
return numLifoModeSwitches.get();
|
return numLifoModeSwitches.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getActiveHandlerCount() {
|
||||||
|
return activeHandlerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getActiveWriteHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getActiveReadHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getActiveScanHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the length of the pending queue */
|
||||||
|
public int getQueueLength() {
|
||||||
|
int length = 0;
|
||||||
|
for (final BlockingQueue<CallRunner> queue: queues) {
|
||||||
|
length += queue.size();
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return this.name;
|
return this.name;
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,4 +89,22 @@ public abstract class RpcScheduler {
|
||||||
* in the period of overloade we serve last requests first); returns 0 otherwise.
|
* in the period of overloade we serve last requests first); returns 0 otherwise.
|
||||||
*/
|
*/
|
||||||
public abstract long getNumLifoModeSwitches();
|
public abstract long getNumLifoModeSwitches();
|
||||||
|
|
||||||
|
/** Retrieves length of the write queue for metrics when use RWQueueRpcExecutor. */
|
||||||
|
public abstract int getWriteQueueLength();
|
||||||
|
|
||||||
|
/** Retrieves length of the read queue for metrics when use RWQueueRpcExecutor. */
|
||||||
|
public abstract int getReadQueueLength();
|
||||||
|
|
||||||
|
/** Retrieves length of the scan queue for metrics when use RWQueueRpcExecutor. */
|
||||||
|
public abstract int getScanQueueLength();
|
||||||
|
|
||||||
|
/** Retrieves the number of active write rpc handler when use RWQueueRpcExecutor. */
|
||||||
|
public abstract int getActiveWriteRpcHandlerCount();
|
||||||
|
|
||||||
|
/** Retrieves the number of active write rpc handler when use RWQueueRpcExecutor. */
|
||||||
|
public abstract int getActiveReadRpcHandlerCount();
|
||||||
|
|
||||||
|
/** Retrieves the number of active write rpc handler when use RWQueueRpcExecutor. */
|
||||||
|
public abstract int getActiveScanRpcHandlerCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -31,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
|
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
|
||||||
|
@ -205,5 +199,35 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
public long getNumLifoModeSwitches() {
|
public long getNumLifoModeSwitches() {
|
||||||
return callExecutor.getNumLifoModeSwitches();
|
return callExecutor.getNumLifoModeSwitches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
return callExecutor.getWriteQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
return callExecutor.getReadQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
return callExecutor.getScanQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveWriteRpcHandlerCount() {
|
||||||
|
return callExecutor.getActiveWriteHandlerCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveReadRpcHandlerCount() {
|
||||||
|
return callExecutor.getActiveReadHandlerCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveScanRpcHandlerCount() {
|
||||||
|
return callExecutor.getActiveScanHandlerCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,4 +73,34 @@ public class DelegatingRpcScheduler extends RpcScheduler {
|
||||||
public long getNumLifoModeSwitches() {
|
public long getNumLifoModeSwitches() {
|
||||||
return delegate.getNumLifoModeSwitches();
|
return delegate.getNumLifoModeSwitches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveWriteRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveReadRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveScanRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -58,4 +58,34 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
|
||||||
public long getNumLifoModeSwitches() {
|
public long getNumLifoModeSwitches() {
|
||||||
return 5;
|
return 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteQueueLength() {
|
||||||
|
return 50;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadQueueLength() {
|
||||||
|
return 50;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getScanQueueLength() {
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveWriteRpcHandlerCount() {
|
||||||
|
return 50;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveReadRpcHandlerCount() {
|
||||||
|
return 50;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveScanRpcHandlerCount() {
|
||||||
|
return 6;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,6 @@ public class TestRpcMetrics {
|
||||||
MetricsHBaseServer rsMetrics = new MetricsHBaseServer("HRegionServer", new MetricsHBaseServerWrapperStub());
|
MetricsHBaseServer rsMetrics = new MetricsHBaseServer("HRegionServer", new MetricsHBaseServerWrapperStub());
|
||||||
MetricsHBaseServerSource rsSource = rsMetrics.getMetricsSource();
|
MetricsHBaseServerSource rsSource = rsMetrics.getMetricsSource();
|
||||||
|
|
||||||
|
|
||||||
assertEquals("master", masterSource.getMetricsContext());
|
assertEquals("master", masterSource.getMetricsContext());
|
||||||
assertEquals("regionserver", rsSource.getMetricsContext());
|
assertEquals("regionserver", rsSource.getMetricsContext());
|
||||||
|
|
||||||
|
@ -71,6 +70,12 @@ public class TestRpcMetrics {
|
||||||
HELPER.assertGauge("numCallsInPriorityQueue", 104, serverSource);
|
HELPER.assertGauge("numCallsInPriorityQueue", 104, serverSource);
|
||||||
HELPER.assertGauge("numOpenConnections", 105, serverSource);
|
HELPER.assertGauge("numOpenConnections", 105, serverSource);
|
||||||
HELPER.assertGauge("numActiveHandler", 106, serverSource);
|
HELPER.assertGauge("numActiveHandler", 106, serverSource);
|
||||||
|
HELPER.assertGauge("numActiveWriteHandler", 50, serverSource);
|
||||||
|
HELPER.assertGauge("numActiveReadHandler", 50, serverSource);
|
||||||
|
HELPER.assertGauge("numActiveScanHandler", 6, serverSource);
|
||||||
|
HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource);
|
||||||
|
HELPER.assertGauge("numCallsInReadQueue", 50, serverSource);
|
||||||
|
HELPER.assertGauge("numCallsInScanQueue", 2, serverSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue