HBASE-16290 Dump summary of callQueue content; can help debugging

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
Authored by Sreeram Venkatasubramanian on 2017-10-24 14:03:32 +08:00; committed by Chia-Ping Tsai
parent a4585e6ebc
commit 6ceb4a4f41
9 changed files with 481 additions and 1 deletion

File: CallQueueInfo.java

@@ -0,0 +1,81 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import org.apache.yetus.audience.InterfaceAudience;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

@InterfaceAudience.Private
public class CallQueueInfo {
  private final Map<String, Map<String, Long>> callQueueMethodCountsSummary;
  private final Map<String, Map<String, Long>> callQueueMethodSizeSummary;

  CallQueueInfo() {
    callQueueMethodCountsSummary = new HashMap<>();
    callQueueMethodSizeSummary = new HashMap<>();
  }

  public Set<String> getCallQueueNames() {
    return callQueueMethodCountsSummary.keySet();
  }

  public Set<String> getCalledMethodNames(String callQueueName) {
    return callQueueMethodCountsSummary.get(callQueueName).keySet();
  }

  public long getCallMethodCount(String callQueueName, String methodName) {
    long methodCount;
    Map<String, Long> methodCountMap = callQueueMethodCountsSummary.getOrDefault(callQueueName, null);
    if (null != methodCountMap) {
      methodCount = methodCountMap.getOrDefault(methodName, 0L);
    } else {
      methodCount = 0L;
    }
    return methodCount;
  }

  void setCallMethodCount(String callQueueName, Map<String, Long> methodCountMap) {
    callQueueMethodCountsSummary.put(callQueueName, methodCountMap);
  }

  public long getCallMethodSize(String callQueueName, String methodName) {
    long methodSize;
    Map<String, Long> methodSizeMap = callQueueMethodSizeSummary.getOrDefault(callQueueName, null);
    if (null != methodSizeMap) {
      methodSize = methodSizeMap.getOrDefault(methodName, 0L);
    } else {
      methodSize = 0L;
    }
    return methodSize;
  }

  void setCallMethodSize(String callQueueName, Map<String, Long> methodSizeMap) {
    callQueueMethodSizeSummary.put(callQueueName, methodSizeMap);
  }
}
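
Editor's note: a minimal usage sketch of the class above (not part of the patch); the queue name, method name, and numbers are made up for illustration:

  // Hypothetical illustration only -- not part of this commit.
  // Assumes code living in org.apache.hadoop.hbase.ipc, so the package-private setters are visible.
  Map<String, Long> counts = new HashMap<>();
  Map<String, Long> sizes = new HashMap<>();
  counts.put("Get", 3L);   // three Get calls waiting
  sizes.put("Get", 600L);  // totalling 600 bytes

  CallQueueInfo info = new CallQueueInfo();
  info.setCallMethodCount("Call Queue", counts);  // setters normally invoked by the scheduler
  info.setCallMethodSize("Call Queue", sizes);

  for (String queue : info.getCallQueueNames()) {
    for (String method : info.getCalledMethodNames(queue)) {
      System.out.println(queue + "/" + method
          + ": count=" + info.getCallMethodCount(queue, method)
          + ", bytes=" + info.getCallMethodSize(queue, method));
    }
  }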

File: FifoRpcScheduler.java

@@ -21,12 +21,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;

/**
 * A very simple {@code RpcScheduler} that serves incoming requests in order.
@@ -70,6 +72,24 @@ public class FifoRpcScheduler extends RpcScheduler {
    this.executor.shutdown();
  }

  private static class FifoCallRunner implements Runnable {
    private final CallRunner callRunner;

    FifoCallRunner(CallRunner cr) {
      this.callRunner = cr;
    }

    CallRunner getCallRunner() {
      return callRunner;
    }

    @Override
    public void run() {
      callRunner.run();
    }
  }

  @Override
  public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
    // Executors provide no offer, so make our own.
@@ -78,7 +98,8 @@ public class FifoRpcScheduler extends RpcScheduler {
      queueSize.decrementAndGet();
      return false;
    }
-    executor.submit(new Runnable() {
+    executor.execute(new FifoCallRunner(task) {
      @Override
      public void run() {
        task.setStatus(RpcServer.getStatus());
@@ -86,6 +107,7 @@ public class FifoRpcScheduler extends RpcScheduler {
        queueSize.decrementAndGet();
      }
    });

    return true;
  }
@@ -148,4 +170,37 @@ public class FifoRpcScheduler extends RpcScheduler {
  public int getActiveScanRpcHandlerCount() {
    return 0;
  }

  @Override
  public CallQueueInfo getCallQueueInfo() {
    String queueName = "Fifo Queue";

    HashMap<String, Long> methodCount = new HashMap<>();
    HashMap<String, Long> methodSize = new HashMap<>();

    CallQueueInfo callQueueInfo = new CallQueueInfo();
    callQueueInfo.setCallMethodCount(queueName, methodCount);
    callQueueInfo.setCallMethodSize(queueName, methodSize);

    for (Runnable r : executor.getQueue()) {
      FifoCallRunner mcr = (FifoCallRunner) r;
      RpcCall rpcCall = mcr.getCallRunner().getRpcCall();

      String method;
      if (null == rpcCall.getMethod() ||
          StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
        method = "Unknown";
      }

      long size = rpcCall.getSize();

      methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
      methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
    }

    return callQueueInfo;
  }
}

File: RpcExecutor.java

@@ -27,12 +27,15 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.Map;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
@@ -151,6 +154,50 @@ public abstract class RpcExecutor {
    return Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor));
  }

  public Map<String, Long> getCallQueueCountsSummary() {
    HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();

    for (BlockingQueue<CallRunner> queue : queues) {
      for (CallRunner cr : queue) {
        RpcCall rpcCall = cr.getRpcCall();

        String method;
        if (null == rpcCall.getMethod() ||
            StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
          method = "Unknown";
        }

        callQueueMethodTotalCount.put(method, 1 + callQueueMethodTotalCount.getOrDefault(method, 0L));
      }
    }

    return callQueueMethodTotalCount;
  }

  public Map<String, Long> getCallQueueSizeSummary() {
    HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();

    for (BlockingQueue<CallRunner> queue : queues) {
      for (CallRunner cr : queue) {
        RpcCall rpcCall = cr.getRpcCall();

        String method;
        if (null == rpcCall.getMethod() ||
            StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
          method = "Unknown";
        }

        long size = rpcCall.getSize();

        callQueueMethodTotalSize.put(method, size + callQueueMethodTotalSize.getOrDefault(method, 0L));
      }
    }

    return callQueueMethodTotalSize;
  }

  protected void initializeQueues(final int numQueues) {
    if (queueInitArgs.length > 0) {
      currentQueueLimit = (int) queueInitArgs[0];
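
Editor's note, as a worked example of the two summary methods added above (not part of the patch, hypothetical numbers): if an executor's queues currently hold three Get calls of 100, 200 and 300 bytes plus one Scan call of 50 bytes, getCallQueueCountsSummary() returns {Get=3, Scan=1} and getCallQueueSizeSummary() returns {Get=600, Scan=50}; calls whose method cannot be resolved are aggregated under the key "Unknown".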

File: RpcScheduler.java

@@ -65,6 +65,9 @@ public abstract class RpcScheduler {
   */
  public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException;

  /** Get call queue information. */
  public abstract CallQueueInfo getCallQueueInfo();

  /** Retrieves length of the general queue for metrics. */
  public abstract int getGeneralQueueLength();

File: SimpleRpcScheduler.java

@@ -229,5 +229,33 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
  public int getActiveScanRpcHandlerCount() {
    return callExecutor.getActiveScanHandlerCount();
  }

  @Override
  public CallQueueInfo getCallQueueInfo() {
    String queueName;

    CallQueueInfo callQueueInfo = new CallQueueInfo();

    if (null != callExecutor) {
      queueName = "Call Queue";
      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
    }

    if (null != priorityExecutor) {
      queueName = "Priority Queue";
      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
    }

    if (null != replicationExecutor) {
      queueName = "Replication Queue";
      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
    }

    return callQueueInfo;
  }
}

File: RSDumpServlet.java

@@ -26,6 +26,7 @@ import java.util.Date;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.hbase.ipc.CallQueueInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.monitoring.LogMonitoring;
@@ -100,6 +101,10 @@ public class RSDumpServlet extends StateDumpServlet {
        dumpQueue(hrs, out);
      }

      out.println("\n\nCall Queue Summary:");
      out.println(LINE);
      dumpCallQueues(hrs, out);

      out.flush();
    }
  }
@@ -136,4 +141,32 @@ public class RSDumpServlet extends StateDumpServlet {
      out.println(hrs.cacheFlusher.dumpQueue());
    }
  }

  public static void dumpCallQueues(HRegionServer hrs, PrintWriter out) {
    CallQueueInfo callQueueInfo = hrs.rpcServices.rpcServer.getScheduler().getCallQueueInfo();

    for (String queueName : callQueueInfo.getCallQueueNames()) {
      out.println("\nQueue Name: " + queueName);

      long totalCallCount = 0L, totalCallSize = 0L;
      for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
        long thisMethodCount, thisMethodSize;
        thisMethodCount = callQueueInfo.getCallMethodCount(queueName, methodName);
        thisMethodSize = callQueueInfo.getCallMethodSize(queueName, methodName);

        out.println("Method in call: " + methodName);
        out.println("Total call count for method: " + thisMethodCount);
        out.println("Total call size for method (bytes): " + thisMethodSize);

        totalCallCount += thisMethodCount;
        totalCallSize += thisMethodSize;
      }

      out.println("Total call count for queue: " + totalCallCount);
      out.println("Total call size for queue (bytes): " + totalCallSize);
    }
  }
}
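
Editor's note: the dump written by dumpCallQueues() above has roughly the following shape (not part of the patch; queue and method names depend on what is queued when the dump is taken, and the numbers below are made up):

  Queue Name: Call Queue
  Method in call: Get
  Total call count for method: 12
  Total call size for method (bytes): 8340
  Method in call: Scan
  Total call count for method: 2
  Total call size for method (bytes): 1020
  Total call count for queue: 14
  Total call size for queue (bytes): 9360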

File: DelegatingRpcScheduler.java

@@ -103,4 +103,9 @@ public class DelegatingRpcScheduler extends RpcScheduler {
  public int getActiveScanRpcHandlerCount() {
    return 0;
  }

  @Override
  public CallQueueInfo getCallQueueInfo() {
    return delegate.getCallQueueInfo();
  }
}

File: TestFifoRpcScheduler.java

@@ -0,0 +1,160 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Category({RPCTests.class, LargeTests.class})
public class TestFifoRpcScheduler {

  @Rule
  public final TestRule timeout =
      CategoryBasedTimeout.builder().withTimeout(this.getClass()).
          withLookingForStuckThread(true).build();

  private static final Log LOG = LogFactory.getLog(TestFifoRpcScheduler.class);
  private AtomicInteger callExecutionCount;

  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
    @Override
    public InetSocketAddress getListenerAddress() {
      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
    }
  };

  private Configuration conf;

  @Before
  public void setUp() {
    conf = HBaseConfiguration.create();
    callExecutionCount = new AtomicInteger(0);
  }

  private ThreadPoolExecutor disableHandlers(RpcScheduler scheduler) {
    ThreadPoolExecutor rpcExecutor = null;
    try {
      Field ExecutorField = scheduler.getClass().getDeclaredField("executor");
      ExecutorField.setAccessible(true);

      scheduler.start();
      rpcExecutor = (ThreadPoolExecutor) ExecutorField.get(scheduler);

      rpcExecutor.setMaximumPoolSize(1);
      rpcExecutor.allowCoreThreadTimeOut(true);
      rpcExecutor.setCorePoolSize(0);
      rpcExecutor.setKeepAliveTime(1, TimeUnit.MICROSECONDS);

      // Wait for 2 seconds, so that idle threads will die
      Thread.sleep(2000);
    } catch (NoSuchFieldException e) {
      LOG.error("No such field exception: " + e);
    } catch (IllegalAccessException e) {
      LOG.error("Illegal access exception: " + e);
    } catch (InterruptedException e) {
      LOG.error("Interrupted exception: " + e);
    }
    return rpcExecutor;
  }

  @Test
  public void testCallQueueInfo() throws IOException, InterruptedException {
    ThreadPoolExecutor rpcExecutor;
    RpcScheduler scheduler = new FifoRpcScheduler(conf, 1);
    scheduler.init(CONTEXT);

    // Set number of handlers to a minimum value
    disableHandlers(scheduler);

    int totalCallMethods = 30;
    int unableToDispatch = 0;

    for (int i = totalCallMethods; i > 0; i--) {
      CallRunner task = createMockTask();
      task.setStatus(new MonitoredRPCHandlerImpl());
      if (!scheduler.dispatch(task)) {
        unableToDispatch++;
      }
      Thread.sleep(10);
    }

    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
    int executionCount = callExecutionCount.get();
    int callQueueSize = 0;

    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
        callQueueSize += callQueueInfo.getCallMethodCount(callQueueName, calledMethod);
      }
    }

    assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
    scheduler.stop();
  }

  private CallRunner createMockTask() {
    ServerCall call = mock(ServerCall.class);
    CallRunner task = mock(CallRunner.class);
    when(task.getRpcCall()).thenReturn(call);

    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        callExecutionCount.incrementAndGet();
        Thread.sleep(1000);
        return null;
      }
    }).when(task).run();

    return task;
  }
}

File: TestSimpleRpcScheduler.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
@@ -112,6 +113,73 @@ public class TestSimpleRpcScheduler {
    scheduler.stop();
  }

  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
    try {
      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
      ExecutorField.setAccessible(true);

      RpcExecutor rpcExecutor = (RpcExecutor) ExecutorField.get(scheduler);

      Field handlerCountField =
          rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
      handlerCountField.setAccessible(true);
      handlerCountField.set(rpcExecutor, 0);

      Field numCallQueuesField =
          rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
      numCallQueuesField.setAccessible(true);
      numCallQueuesField.set(rpcExecutor, 1);

      Field currentQueueLimitField =
          rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("currentQueueLimit");
      currentQueueLimitField.setAccessible(true);
      currentQueueLimitField.set(rpcExecutor, 100);
    } catch (NoSuchFieldException e) {
      LOG.error("No such field exception: " + e);
    } catch (IllegalAccessException e) {
      LOG.error("Illegal access exception: " + e);
    }
    return scheduler;
  }
  @Test
  public void testCallQueueInfo() throws IOException, InterruptedException {
    PriorityFunction qosFunction = mock(PriorityFunction.class);
    RpcScheduler scheduler = new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, 0);
    scheduler.init(CONTEXT);

    // Set the handlers to zero so that the number of requests sitting in the call queue can be tested
    scheduler = disableHandlers(scheduler);
    scheduler.start();

    int totalCallMethods = 10;
    for (int i = totalCallMethods; i > 0; i--) {
      CallRunner task = createMockTask();
      task.setStatus(new MonitoredRPCHandlerImpl());
      scheduler.dispatch(task);
    }

    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
    for (String callQueueName : callQueueInfo.getCallQueueNames()) {
      for (String calledMethod : callQueueInfo.getCalledMethodNames(callQueueName)) {
        assertEquals(totalCallMethods, callQueueInfo.getCallMethodCount(callQueueName, calledMethod));
      }
    }

    scheduler.stop();
  }
  @Test
  public void testHandlerIsolation() throws IOException, InterruptedException {