HBASE-20965 Separate region server report requests to new handlers

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
meiyi 2018-08-08 15:15:36 +08:00 committed by Guanghao Zhang
parent 3f5033f88e
commit e2fcde2d6f
6 changed files with 391 additions and 16 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,17 +40,15 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
@InterfaceAudience.Private
public class FifoRpcScheduler extends RpcScheduler {
private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
private final int handlerCount;
private final int maxQueueLength;
private final AtomicInteger queueSize = new AtomicInteger(0);
private ThreadPoolExecutor executor;
protected final int handlerCount;
protected final int maxQueueLength;
protected final AtomicInteger queueSize = new AtomicInteger(0);
protected ThreadPoolExecutor executor;
public FifoRpcScheduler(Configuration conf, int handlerCount) {
this.handlerCount = handlerCount;
this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" +
handlerCount + "; maxQueueLength=" + maxQueueLength);
}
@Override
@ -59,6 +58,8 @@ public class FifoRpcScheduler extends RpcScheduler {
@Override
public void start() {
LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
this.getClass().getSimpleName(), handlerCount, maxQueueLength);
this.executor = new ThreadPoolExecutor(
handlerCount,
handlerCount,
@ -94,6 +95,11 @@ public class FifoRpcScheduler extends RpcScheduler {
@Override
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
return executeRpcCall(executor, queueSize, task);
}
protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
final CallRunner task) {
// Executors provide no offer, so make our own.
int queued = queueSize.getAndIncrement();
if (maxQueueLength > 0 && queued >= maxQueueLength) {
@ -199,15 +205,19 @@ public class FifoRpcScheduler extends RpcScheduler {
callQueueInfo.setCallMethodCount(queueName, methodCount);
callQueueInfo.setCallMethodSize(queueName, methodSize);
updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
for (Runnable r:executor.getQueue()) {
return callQueueInfo;
}
protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
for (Runnable r : queue) {
FifoCallRunner mcr = (FifoCallRunner) r;
RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
String method;
if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
String method = getCallMethod(mcr.getCallRunner());
if (StringUtil.isNullOrEmpty(method)) {
method = "Unknown";
}
@ -216,7 +226,13 @@ public class FifoRpcScheduler extends RpcScheduler {
methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
}
}
return callQueueInfo;
protected String getCallMethod(final CallRunner task) {
RpcCall call = task.getRpcCall();
if (call != null && call.getMethod() != null) {
return call.getMethod().getName();
}
return null;
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A special {@code }RpcScheduler} only used for master. This scheduler separates RegionServerReport
* requests to independent handlers to avoid these requests block other requests. To use this
* scheduler, please set "hbase.master.rpc.scheduler.factory.class" to
* "org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler".
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterFifoRpcScheduler extends FifoRpcScheduler {
private static final Logger LOG = LoggerFactory.getLogger(MasterFifoRpcScheduler.class);
/**
* Set RSReport requests handlers count when masters use MasterFifoRpcScheduler. The default value
* is half of "hbase.regionserver.handler.count" value, but at least 1. The other handlers count
* is "hbase.regionserver.handler.count" value minus RSReport handlers count, but at least 1 too.
*/
public static final String MASTER_SERVER_REPORT_HANDLER_COUNT =
"hbase.master.server.report.handler.count";
private static final String REGION_SERVER_REPORT = "RegionServerReport";
private final int rsReportHandlerCount;
private final int rsRsreportMaxQueueLength;
private final AtomicInteger rsReportQueueSize = new AtomicInteger(0);
private ThreadPoolExecutor rsReportExecutor;
public MasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
int rsReportHandlerCount) {
super(conf, callHandlerCount);
this.rsReportHandlerCount = rsReportHandlerCount;
this.rsRsreportMaxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
rsReportHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
}
@Override
public void start() {
LOG.info(
"Using {} as call queue; handlerCount={}; maxQueueLength={}; rsReportHandlerCount={}; "
+ "rsReportMaxQueueLength={}",
this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount,
rsRsreportMaxQueueLength);
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxQueueLength),
new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"),
new ThreadPoolExecutor.CallerRunsPolicy());
this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength),
new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void stop() {
this.executor.shutdown();
this.rsReportExecutor.shutdown();
}
@Override
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
String method = getCallMethod(task);
if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) {
return executeRpcCall(rsReportExecutor, rsReportQueueSize, task);
} else {
return executeRpcCall(executor, queueSize, task);
}
}
@Override
public int getGeneralQueueLength() {
return executor.getQueue().size() + rsReportExecutor.getQueue().size();
}
@Override
public int getActiveRpcHandlerCount() {
return executor.getActiveCount() + rsReportExecutor.getActiveCount();
}
@Override
public CallQueueInfo getCallQueueInfo() {
String queueName = "Master Fifo Queue";
HashMap<String, Long> methodCount = new HashMap<>();
HashMap<String, Long> methodSize = new HashMap<>();
CallQueueInfo callQueueInfo = new CallQueueInfo();
callQueueInfo.setCallMethodCount(queueName, methodCount);
callQueueInfo.setCallMethodSize(queueName, methodSize);
updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
updateMethodCountAndSizeByQueue(rsReportExecutor.getQueue(), methodCount, methodSize);
return callQueueInfo;
}
}

View File

@ -335,6 +335,16 @@ public class MasterRpcServices extends RSRpcServices
master = m;
}
@Override
protected Class<?> getRpcSchedulerFactoryClass() {
Configuration conf = getConfiguration();
if (conf != null) {
return conf.getClass(MASTER_RPC_SCHEDULER_FACTORY_CLASS, super.getRpcSchedulerFactoryClass());
} else {
return super.getRpcSchedulerFactoryClass();
}
}
@Override
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Factory to use when you want to use the {@link MasterFifoRpcScheduler}
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory {
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int totalHandlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
int rsReportHandlerCount = Math.max(1, conf
.getInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, totalHandlerCount / 2));
int callHandlerCount = Math.max(1, totalHandlerCount - rsReportHandlerCount);
return new MasterFifoRpcScheduler(conf, callHandlerCount, rsReportHandlerCount);
}
}

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
@ -257,6 +258,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.region.server.rpc.scheduler.factory.class";
/** RPC scheduler to use for the master. */
public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.master.rpc.scheduler.factory.class";
/**
* Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
* configuration exists to prevent the scenario where a time limit is specified to be so
@ -1203,10 +1208,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
try {
Class<?> cls = rs.conf.getClass(
REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
SimpleRpcSchedulerFactory.class);
rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class)
rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
.getDeclaredConstructor().newInstance();
} catch (NoSuchMethodException | InvocationTargetException |
InstantiationException | IllegalAccessException e) {
@ -1283,6 +1285,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
protected Class<?> getRpcSchedulerFactoryClass() {
return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
SimpleRpcSchedulerFactory.class);
}
@Override
public void onConfigurationChange(Configuration newConf) {
if (rpcServer instanceof ConfigurationObserver) {
@ -3700,4 +3707,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException(e);
}
}
@VisibleForTesting
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
}
}

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@Category({ RPCTests.class, LargeTests.class })
public class TestMasterFifoRpcScheduler {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class);
private static final String REGION_SERVER_REPORT = "RegionServerReport";
private static final String OTHER = "Other";
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
"org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
conf.setInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, 2);
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testMasterRpcScheduler() {
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
MasterRpcServices masterRpcServices = master.getMasterRpcServices();
RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
}
@Test
public void testCallQueueInfo() throws Exception {
Configuration conf = HBaseConfiguration.create();
AtomicInteger callExecutionCount = new AtomicInteger(0);
RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
scheduler.start();
int totalCallMethods = 30;
int unableToDispatch = 0;
for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask(callExecutionCount, i < 20);
if (!scheduler.dispatch(task)) {
unableToDispatch++;
}
Thread.sleep(10);
}
CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
int executionCount = callExecutionCount.get();
String expectedQueueName = "Master Fifo Queue";
assertEquals(1, callQueueInfo.getCallQueueNames().size());
long callQueueSize = 0;
for (String queueName : callQueueInfo.getCallQueueNames()) {
assertEquals(expectedQueueName, queueName);
Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
if (methodNames.size() == 2) {
assertTrue(methodNames.contains(REGION_SERVER_REPORT));
assertTrue(methodNames.contains(OTHER));
}
for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
}
}
assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
scheduler.stop();
}
private CallRunner createMockTask(AtomicInteger callExecutionCount,
boolean isRegionServerReportTask) {
CallRunner task = mock(CallRunner.class);
ServerCall call = mock(ServerCall.class);
when(task.getRpcCall()).thenReturn(call);
when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
.setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
callExecutionCount.incrementAndGet();
Thread.sleep(1000);
return null;
}
}).when(task).run();
return task;
}
private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {
public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
int rsReportHandlerCount) {
super(conf, callHandlerCount, rsReportHandlerCount);
}
/**
* Override this method because we can't mock a Descriptors.MethodDescriptor
*/
@Override
protected String getCallMethod(final CallRunner task) {
RpcCall call = task.getRpcCall();
if (call.getHeader() != null) {
return call.getHeader().getMethodName();
}
return null;
}
}
}