HBASE-10561 Forward port: HBASE-10212 New rpc metric: number of active handler
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1594117 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0b5b0faeb5
commit
5be2766ef4
|
@ -25,4 +25,5 @@ public interface MetricsHBaseServerWrapper {
|
||||||
int getReplicationQueueLength();
|
int getReplicationQueueLength();
|
||||||
int getPriorityQueueLength();
|
int getPriorityQueueLength();
|
||||||
int getNumOpenConnections();
|
int getNumOpenConnections();
|
||||||
|
int getActiveRpcHandlerCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,4 +89,9 @@ public class FifoRpcScheduler implements RpcScheduler {
|
||||||
public int getReplicationQueueLength() {
|
public int getReplicationQueueLength() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveRpcHandlerCount() {
|
||||||
|
return executor.getActiveCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,8 +37,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getGeneralQueueLength() {
|
public int getGeneralQueueLength() {
|
||||||
if (this.server == null
|
if (this.server == null || this.server.getScheduler() == null) {
|
||||||
|| this.server.getScheduler() == null) {
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return server.getScheduler().getGeneralQueueLength();
|
return server.getScheduler().getGeneralQueueLength();
|
||||||
|
@ -46,8 +45,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getReplicationQueueLength() {
|
public int getReplicationQueueLength() {
|
||||||
if (this.server == null
|
if (this.server == null || this.server.getScheduler() == null) {
|
||||||
|| this.server.getScheduler() == null) {
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return server.getScheduler().getReplicationQueueLength();
|
return server.getScheduler().getReplicationQueueLength();
|
||||||
|
@ -55,8 +53,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getPriorityQueueLength() {
|
public int getPriorityQueueLength() {
|
||||||
if (this.server == null
|
if (this.server == null || this.server.getScheduler() == null) {
|
||||||
|| this.server.getScheduler() == null) {
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return server.getScheduler().getPriorityQueueLength();
|
return server.getScheduler().getPriorityQueueLength();
|
||||||
|
@ -69,4 +66,12 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
||||||
}
|
}
|
||||||
return server.connectionList.size();
|
return server.connectionList.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveRpcHandlerCount() {
|
||||||
|
if (this.server == null || this.server.getScheduler() == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return server.getScheduler().getActiveRpcHandlerCount();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,4 +67,7 @@ public interface RpcScheduler {
|
||||||
|
|
||||||
/** Retrieves length of the replication queue for metrics. */
|
/** Retrieves length of the replication queue for metrics. */
|
||||||
int getReplicationQueueLength();
|
int getReplicationQueueLength();
|
||||||
|
|
||||||
|
/** Retrieves the number of active handler. */
|
||||||
|
int getActiveRpcHandlerCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -90,7 +91,7 @@ public class SimpleRpcScheduler implements RpcScheduler {
|
||||||
final BlockingQueue<CallRunner> replicationQueue;
|
final BlockingQueue<CallRunner> replicationQueue;
|
||||||
private volatile boolean running = false;
|
private volatile boolean running = false;
|
||||||
private final List<Thread> handlers = Lists.newArrayList();
|
private final List<Thread> handlers = Lists.newArrayList();
|
||||||
|
private AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||||
/** What level a high priority call is at. */
|
/** What level a high priority call is at. */
|
||||||
private final int highPriorityLevel;
|
private final int highPriorityLevel;
|
||||||
|
|
||||||
|
@ -204,13 +205,23 @@ public class SimpleRpcScheduler implements RpcScheduler {
|
||||||
return replicationQueue == null ? 0 : replicationQueue.size();
|
return replicationQueue == null ? 0 : replicationQueue.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveRpcHandlerCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
|
private void consumerLoop(BlockingQueue<CallRunner> myQueue) {
|
||||||
boolean interrupted = false;
|
boolean interrupted = false;
|
||||||
try {
|
try {
|
||||||
while (running) {
|
while (running) {
|
||||||
try {
|
try {
|
||||||
CallRunner task = myQueue.take();
|
CallRunner task = myQueue.take();
|
||||||
task.run();
|
try {
|
||||||
|
activeHandlerCount.incrementAndGet();
|
||||||
|
task.run();
|
||||||
|
} finally {
|
||||||
|
activeHandlerCount.decrementAndGet();
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
interrupted = true;
|
interrupted = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,4 +43,9 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
|
||||||
public int getNumOpenConnections() {
|
public int getNumOpenConnections() {
|
||||||
return 105;
|
return 105;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveRpcHandlerCount() {
|
||||||
|
return 100;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue