From e2fcde2d6f8f57cc28524a855e49c2d65e40b4ba Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 8 Aug 2018 15:15:36 +0800 Subject: [PATCH] HBASE-20965 Separate region server report requests to new handlers Signed-off-by: Guanghao Zhang --- .../hadoop/hbase/ipc/FifoRpcScheduler.java | 40 +++-- .../hbase/ipc/MasterFifoRpcScheduler.java | 125 +++++++++++++ .../hbase/master/MasterRpcServices.java | 10 ++ .../MasterFifoRpcSchedulerFactory.java | 44 +++++ .../hbase/regionserver/RSRpcServices.java | 20 ++- .../hbase/ipc/TestMasterFifoRpcScheduler.java | 168 ++++++++++++++++++ 6 files changed, 391 insertions(+), 16 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index bd8bdce7b48..833b02d4247 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,17 +40,15 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; @InterfaceAudience.Private public class FifoRpcScheduler extends RpcScheduler { private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); - private final int handlerCount; - private final int maxQueueLength; - private final AtomicInteger queueSize = new AtomicInteger(0); - private ThreadPoolExecutor executor; + protected final int handlerCount; + protected final int maxQueueLength; + protected final AtomicInteger queueSize = new AtomicInteger(0); + protected ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" + - handlerCount + "; maxQueueLength=" + maxQueueLength); } @Override @@ -59,6 +58,8 @@ public class FifoRpcScheduler extends RpcScheduler { @Override public void start() { + LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}", + this.getClass().getSimpleName(), handlerCount, maxQueueLength); this.executor = new ThreadPoolExecutor( handlerCount, handlerCount, @@ -94,6 +95,11 @@ public class FifoRpcScheduler extends RpcScheduler { @Override public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + return executeRpcCall(executor, queueSize, task); + } + + protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, + final CallRunner task) { // Executors provide no offer, so make our own. int queued = queueSize.getAndIncrement(); if (maxQueueLength > 0 && queued >= maxQueueLength) { @@ -199,15 +205,19 @@ public class FifoRpcScheduler extends RpcScheduler { callQueueInfo.setCallMethodCount(queueName, methodCount); callQueueInfo.setCallMethodSize(queueName, methodSize); + updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); - for (Runnable r:executor.getQueue()) { + return callQueueInfo; + } + + protected void updateMethodCountAndSizeByQueue(BlockingQueue queue, + HashMap methodCount, HashMap methodSize) { + for (Runnable r : queue) { FifoCallRunner mcr = (FifoCallRunner) r; RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); - String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { + String method = getCallMethod(mcr.getCallRunner()); + if (StringUtil.isNullOrEmpty(method)) { method = "Unknown"; } @@ -216,7 +226,13 @@ public class FifoRpcScheduler extends RpcScheduler { methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); } + } - return callQueueInfo; + protected String getCallMethod(final CallRunner task) { + RpcCall call = task.getRpcCall(); + if (call != null && call.getMethod() != null) { + return call.getMethod().getName(); + } + return null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java new file mode 100644 index 00000000000..b596c40e750 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterFifoRpcScheduler.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A special {@code }RpcScheduler} only used for master. This scheduler separates RegionServerReport + * requests to independent handlers to avoid these requests block other requests. To use this + * scheduler, please set "hbase.master.rpc.scheduler.factory.class" to + * "org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler". + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterFifoRpcScheduler extends FifoRpcScheduler { + private static final Logger LOG = LoggerFactory.getLogger(MasterFifoRpcScheduler.class); + + /** + * Set RSReport requests handlers count when masters use MasterFifoRpcScheduler. The default value + * is half of "hbase.regionserver.handler.count" value, but at least 1. The other handlers count + * is "hbase.regionserver.handler.count" value minus RSReport handlers count, but at least 1 too. + */ + public static final String MASTER_SERVER_REPORT_HANDLER_COUNT = + "hbase.master.server.report.handler.count"; + private static final String REGION_SERVER_REPORT = "RegionServerReport"; + private final int rsReportHandlerCount; + private final int rsRsreportMaxQueueLength; + private final AtomicInteger rsReportQueueSize = new AtomicInteger(0); + private ThreadPoolExecutor rsReportExecutor; + + public MasterFifoRpcScheduler(Configuration conf, int callHandlerCount, + int rsReportHandlerCount) { + super(conf, callHandlerCount); + this.rsReportHandlerCount = rsReportHandlerCount; + this.rsRsreportMaxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, + rsReportHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + } + + @Override + public void start() { + LOG.info( + "Using {} as call queue; handlerCount={}; maxQueueLength={}; rsReportHandlerCount={}; " + + "rsReportMaxQueueLength={}", + this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount, + rsRsreportMaxQueueLength); + this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, + new ArrayBlockingQueue(maxQueueLength), + new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, + TimeUnit.SECONDS, new ArrayBlockingQueue(rsRsreportMaxQueueLength), + new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Override + public void stop() { + this.executor.shutdown(); + this.rsReportExecutor.shutdown(); + } + + @Override + public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + String method = getCallMethod(task); + if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) { + return executeRpcCall(rsReportExecutor, rsReportQueueSize, task); + } else { + return executeRpcCall(executor, queueSize, task); + } + } + + @Override + public int getGeneralQueueLength() { + return executor.getQueue().size() + rsReportExecutor.getQueue().size(); + } + + @Override + public int getActiveRpcHandlerCount() { + return executor.getActiveCount() + rsReportExecutor.getActiveCount(); + } + + @Override + public CallQueueInfo getCallQueueInfo() { + String queueName = "Master Fifo Queue"; + + HashMap methodCount = new HashMap<>(); + HashMap methodSize = new HashMap<>(); + + CallQueueInfo callQueueInfo = new CallQueueInfo(); + callQueueInfo.setCallMethodCount(queueName, methodCount); + callQueueInfo.setCallMethodSize(queueName, methodSize); + + updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); + updateMethodCountAndSizeByQueue(rsReportExecutor.getQueue(), methodCount, methodSize); + + return callQueueInfo; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index a4d9ff84f0a..9ebbd3c236f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -335,6 +335,16 @@ public class MasterRpcServices extends RSRpcServices master = m; } + @Override + protected Class getRpcSchedulerFactoryClass() { + Configuration conf = getConfiguration(); + if (conf != null) { + return conf.getClass(MASTER_RPC_SCHEDULER_FACTORY_CLASS, super.getRpcSchedulerFactoryClass()); + } else { + return super.getRpcSchedulerFactoryClass(); + } + } + @Override protected RpcServerInterface createRpcServer(Server server, Configuration conf, RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java new file mode 100644 index 00000000000..da2309a1fb8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MasterFifoRpcSchedulerFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Factory to use when you want to use the {@link MasterFifoRpcScheduler} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int totalHandlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + int rsReportHandlerCount = Math.max(1, conf + .getInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, totalHandlerCount / 2)); + int callHandlerCount = Math.max(1, totalHandlerCount - rsReportHandlerCount); + return new MasterFifoRpcScheduler(conf, callHandlerCount, rsReportHandlerCount); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e292ce15beb..cb97d351377 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerFactory; @@ -257,6 +258,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = "hbase.region.server.rpc.scheduler.factory.class"; + /** RPC scheduler to use for the master. */ + public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = + "hbase.master.rpc.scheduler.factory.class"; + /** * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This * configuration exists to prevent the scenario where a time limit is specified to be so @@ -1203,10 +1208,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); RpcSchedulerFactory rpcSchedulerFactory; try { - Class cls = rs.conf.getClass( - REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - SimpleRpcSchedulerFactory.class); - rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class) + rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) .getDeclaredConstructor().newInstance(); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { @@ -1283,6 +1285,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + protected Class getRpcSchedulerFactoryClass() { + return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + SimpleRpcSchedulerFactory.class); + } + @Override public void onConfigurationChange(Configuration newConf) { if (rpcServer instanceof ConfigurationObserver) { @@ -3700,4 +3707,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new ServiceException(e); } } + + @VisibleForTesting + public RpcScheduler getRpcScheduler() { + return rpcServer.getScheduler(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java new file mode 100644 index 00000000000..13d09cf70e8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMasterFifoRpcScheduler.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; + +@Category({ RPCTests.class, LargeTests.class }) +public class TestMasterFifoRpcScheduler { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class); + + private static final String REGION_SERVER_REPORT = "RegionServerReport"; + private static final String OTHER = "Other"; + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS, + "org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory"); + conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5); + conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testMasterRpcScheduler() { + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + MasterRpcServices masterRpcServices = master.getMasterRpcServices(); + RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler(); + Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler); + } + + @Test + public void testCallQueueInfo() throws Exception { + Configuration conf = HBaseConfiguration.create(); + AtomicInteger callExecutionCount = new AtomicInteger(0); + + RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1); + scheduler.start(); + + int totalCallMethods = 30; + int unableToDispatch = 0; + + for (int i = totalCallMethods; i > 0; i--) { + CallRunner task = createMockTask(callExecutionCount, i < 20); + if (!scheduler.dispatch(task)) { + unableToDispatch++; + } + Thread.sleep(10); + } + + CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo(); + int executionCount = callExecutionCount.get(); + + String expectedQueueName = "Master Fifo Queue"; + assertEquals(1, callQueueInfo.getCallQueueNames().size()); + + long callQueueSize = 0; + for (String queueName : callQueueInfo.getCallQueueNames()) { + assertEquals(expectedQueueName, queueName); + Set methodNames = callQueueInfo.getCalledMethodNames(queueName); + if (methodNames.size() == 2) { + assertTrue(methodNames.contains(REGION_SERVER_REPORT)); + assertTrue(methodNames.contains(OTHER)); + } + for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) { + callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName); + } + } + + assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount); + scheduler.stop(); + } + + private CallRunner createMockTask(AtomicInteger callExecutionCount, + boolean isRegionServerReportTask) { + CallRunner task = mock(CallRunner.class); + ServerCall call = mock(ServerCall.class); + when(task.getRpcCall()).thenReturn(call); + when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder() + .setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build()); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + callExecutionCount.incrementAndGet(); + Thread.sleep(1000); + return null; + } + }).when(task).run(); + + return task; + } + + private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler { + + public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount, + int rsReportHandlerCount) { + super(conf, callHandlerCount, rsReportHandlerCount); + } + + /** + * Override this method because we can't mock a Descriptors.MethodDescriptor + */ + @Override + protected String getCallMethod(final CallRunner task) { + RpcCall call = task.getRpcCall(); + if (call.getHeader() != null) { + return call.getHeader().getMethodName(); + } + return null; + } + } +}