Revert "HBASE-20965 Separate region server report requests to new handlers"
This reverts commit 48d387413f
.
This commit is contained in:
parent
48d387413f
commit
d921262d38
|
@ -1022,8 +1022,6 @@ public final class HConstants {
|
||||||
|
|
||||||
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
|
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
|
||||||
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
|
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
|
||||||
public static final String REGION_SERVER_REPORT_HANDLER_COUNT =
|
|
||||||
"hbase.regionserver.report.handler.count";
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
|
* REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -40,10 +39,10 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FifoRpcScheduler extends RpcScheduler {
|
public class FifoRpcScheduler extends RpcScheduler {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
|
private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
|
||||||
protected final int handlerCount;
|
private final int handlerCount;
|
||||||
protected final int maxQueueLength;
|
private final int maxQueueLength;
|
||||||
protected final AtomicInteger queueSize = new AtomicInteger(0);
|
private final AtomicInteger queueSize = new AtomicInteger(0);
|
||||||
protected ThreadPoolExecutor executor;
|
private ThreadPoolExecutor executor;
|
||||||
|
|
||||||
public FifoRpcScheduler(Configuration conf, int handlerCount) {
|
public FifoRpcScheduler(Configuration conf, int handlerCount) {
|
||||||
this.handlerCount = handlerCount;
|
this.handlerCount = handlerCount;
|
||||||
|
@ -95,11 +94,6 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
|
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
|
||||||
return executeRpcCall(executor, queueSize, task);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
|
|
||||||
final CallRunner task) {
|
|
||||||
// Executors provide no offer, so make our own.
|
// Executors provide no offer, so make our own.
|
||||||
int queued = queueSize.getAndIncrement();
|
int queued = queueSize.getAndIncrement();
|
||||||
if (maxQueueLength > 0 && queued >= maxQueueLength) {
|
if (maxQueueLength > 0 && queued >= maxQueueLength) {
|
||||||
|
@ -205,19 +199,15 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
callQueueInfo.setCallMethodCount(queueName, methodCount);
|
callQueueInfo.setCallMethodCount(queueName, methodCount);
|
||||||
callQueueInfo.setCallMethodSize(queueName, methodSize);
|
callQueueInfo.setCallMethodSize(queueName, methodSize);
|
||||||
|
|
||||||
updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
|
|
||||||
|
|
||||||
return callQueueInfo;
|
for (Runnable r:executor.getQueue()) {
|
||||||
}
|
|
||||||
|
|
||||||
protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
|
|
||||||
HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
|
|
||||||
for (Runnable r : queue) {
|
|
||||||
FifoCallRunner mcr = (FifoCallRunner) r;
|
FifoCallRunner mcr = (FifoCallRunner) r;
|
||||||
RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
|
RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
|
||||||
|
|
||||||
String method = getCallMethod(mcr.getCallRunner());
|
String method;
|
||||||
if (StringUtil.isNullOrEmpty(method)) {
|
|
||||||
|
if (null==rpcCall.getMethod() ||
|
||||||
|
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
|
||||||
method = "Unknown";
|
method = "Unknown";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,13 +216,7 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
|
methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
|
||||||
methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
|
methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
protected String getCallMethod(final CallRunner task) {
|
return callQueueInfo;
|
||||||
RpcCall call = task.getRpcCall();
|
|
||||||
if (call != null && call.getMethod() != null) {
|
|
||||||
return call.getMethod().getName();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,108 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Special rpc scheduler only used for master.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class MasterFifoRpcScheduler extends FifoRpcScheduler {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MasterFifoRpcScheduler.class);
|
|
||||||
|
|
||||||
private static final String REGION_SERVER_REPORT = "RegionServerReport";
|
|
||||||
private final int rsReportHandlerCount;
|
|
||||||
private final int rsRsreportMaxQueueLength;
|
|
||||||
private final AtomicInteger rsReportQueueSize = new AtomicInteger(0);
|
|
||||||
private ThreadPoolExecutor rsReportExecutor;
|
|
||||||
|
|
||||||
public MasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
|
|
||||||
int rsReportHandlerCount) {
|
|
||||||
super(conf, callHandlerCount);
|
|
||||||
this.rsReportHandlerCount = rsReportHandlerCount;
|
|
||||||
this.rsRsreportMaxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
|
|
||||||
rsReportHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
|
|
||||||
new ArrayBlockingQueue<Runnable>(maxQueueLength),
|
|
||||||
new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"),
|
|
||||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
|
||||||
this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
|
|
||||||
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength),
|
|
||||||
new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"),
|
|
||||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() {
|
|
||||||
this.executor.shutdown();
|
|
||||||
this.rsReportExecutor.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
|
|
||||||
String method = getCallMethod(task);
|
|
||||||
if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) {
|
|
||||||
return executeRpcCall(rsReportExecutor, rsReportQueueSize, task);
|
|
||||||
} else {
|
|
||||||
return executeRpcCall(executor, queueSize, task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getGeneralQueueLength() {
|
|
||||||
return executor.getQueue().size() + rsReportExecutor.getQueue().size();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getActiveRpcHandlerCount() {
|
|
||||||
return executor.getActiveCount() + rsReportExecutor.getActiveCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public CallQueueInfo getCallQueueInfo() {
|
|
||||||
String queueName = "Master Fifo Queue";
|
|
||||||
|
|
||||||
HashMap<String, Long> methodCount = new HashMap<>();
|
|
||||||
HashMap<String, Long> methodSize = new HashMap<>();
|
|
||||||
|
|
||||||
CallQueueInfo callQueueInfo = new CallQueueInfo();
|
|
||||||
callQueueInfo.setCallMethodCount(queueName, methodCount);
|
|
||||||
callQueueInfo.setCallMethodSize(queueName, methodSize);
|
|
||||||
|
|
||||||
updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
|
|
||||||
updateMethodCountAndSizeByQueue(rsReportExecutor.getQueue(), methodCount, methodSize);
|
|
||||||
|
|
||||||
return callQueueInfo;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -335,16 +335,6 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
master = m;
|
master = m;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Class<?> getRpcSchedulerFactoryClass() {
|
|
||||||
Configuration conf = getConfiguration();
|
|
||||||
if (conf != null) {
|
|
||||||
return conf.getClass(MASTER_RPC_SCHEDULER_FACTORY_CLASS, super.getRpcSchedulerFactoryClass());
|
|
||||||
} else {
|
|
||||||
return super.getRpcSchedulerFactoryClass();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
|
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
|
||||||
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
|
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
|
||||||
|
|
|
@ -1,46 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.ipc.MasterFifoRpcScheduler;
|
|
||||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Factory to use when you want to use the {@link MasterFifoRpcScheduler}
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public class MasterFifoRpcSchedulerFactory extends FifoRpcSchedulerFactory {
|
|
||||||
@Override
|
|
||||||
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
|
|
||||||
int totalHandlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
|
||||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
|
||||||
int rsReportHandlerCount = conf.getInt(HConstants.REGION_SERVER_REPORT_HANDLER_COUNT, 0);
|
|
||||||
if (rsReportHandlerCount == 0) {
|
|
||||||
rsReportHandlerCount = Math.max(1, totalHandlerCount / 2);
|
|
||||||
}
|
|
||||||
int callHandlerCount = Math.max(1, totalHandlerCount - rsReportHandlerCount);
|
|
||||||
return new MasterFifoRpcScheduler(conf, callHandlerCount, rsReportHandlerCount);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -94,7 +94,6 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||||
import org.apache.hadoop.hbase.ipc.QosPriority;
|
import org.apache.hadoop.hbase.ipc.QosPriority;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcCallContext;
|
import org.apache.hadoop.hbase.ipc.RpcCallContext;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcCallback;
|
import org.apache.hadoop.hbase.ipc.RpcCallback;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
|
||||||
|
@ -258,10 +257,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
|
public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
|
||||||
"hbase.region.server.rpc.scheduler.factory.class";
|
"hbase.region.server.rpc.scheduler.factory.class";
|
||||||
|
|
||||||
/** RPC scheduler to use for the master. */
|
|
||||||
public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
|
|
||||||
"hbase.master.rpc.scheduler.factory.class";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
|
* Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
|
||||||
* configuration exists to prevent the scenario where a time limit is specified to be so
|
* configuration exists to prevent the scenario where a time limit is specified to be so
|
||||||
|
@ -1208,7 +1203,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
|
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
|
||||||
RpcSchedulerFactory rpcSchedulerFactory;
|
RpcSchedulerFactory rpcSchedulerFactory;
|
||||||
try {
|
try {
|
||||||
rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
|
Class<?> cls = rs.conf.getClass(
|
||||||
|
REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
|
||||||
|
SimpleRpcSchedulerFactory.class);
|
||||||
|
rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class)
|
||||||
.getDeclaredConstructor().newInstance();
|
.getDeclaredConstructor().newInstance();
|
||||||
} catch (NoSuchMethodException | InvocationTargetException |
|
} catch (NoSuchMethodException | InvocationTargetException |
|
||||||
InstantiationException | IllegalAccessException e) {
|
InstantiationException | IllegalAccessException e) {
|
||||||
|
@ -1285,11 +1283,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Class<?> getRpcSchedulerFactoryClass() {
|
|
||||||
return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
|
|
||||||
SimpleRpcSchedulerFactory.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration newConf) {
|
public void onConfigurationChange(Configuration newConf) {
|
||||||
if (rpcServer instanceof ConfigurationObserver) {
|
if (rpcServer instanceof ConfigurationObserver) {
|
||||||
|
@ -3707,9 +3700,4 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public RpcScheduler getRpcScheduler() {
|
|
||||||
return rpcServer.getScheduler();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,168 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
|
||||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
|
||||||
|
|
||||||
@Category({ RPCTests.class, LargeTests.class })
|
|
||||||
public class TestMasterFifoRpcScheduler {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestMasterFifoRpcScheduler.class);
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestMasterFifoRpcScheduler.class);
|
|
||||||
|
|
||||||
private static final String REGION_SERVER_REPORT = "RegionServerReport";
|
|
||||||
private static final String OTHER = "Other";
|
|
||||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupBeforeClass() throws Exception {
|
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
|
||||||
conf.set(RSRpcServices.MASTER_RPC_SCHEDULER_FACTORY_CLASS,
|
|
||||||
"org.apache.hadoop.hbase.regionserver.MasterFifoRpcSchedulerFactory");
|
|
||||||
conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
|
|
||||||
conf.setInt(HConstants.REGION_SERVER_REPORT_HANDLER_COUNT, 2);
|
|
||||||
TEST_UTIL.startMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMasterRpcScheduler() {
|
|
||||||
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
|
||||||
MasterRpcServices masterRpcServices = master.getMasterRpcServices();
|
|
||||||
RpcScheduler masterRpcScheduler = masterRpcServices.getRpcScheduler();
|
|
||||||
Assert.assertTrue(masterRpcScheduler instanceof MasterFifoRpcScheduler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCallQueueInfo() throws Exception {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
AtomicInteger callExecutionCount = new AtomicInteger(0);
|
|
||||||
|
|
||||||
RpcScheduler scheduler = new MockMasterFifoRpcScheduler(conf, 2, 1);
|
|
||||||
scheduler.start();
|
|
||||||
|
|
||||||
int totalCallMethods = 30;
|
|
||||||
int unableToDispatch = 0;
|
|
||||||
|
|
||||||
for (int i = totalCallMethods; i > 0; i--) {
|
|
||||||
CallRunner task = createMockTask(callExecutionCount, i < 20);
|
|
||||||
if (!scheduler.dispatch(task)) {
|
|
||||||
unableToDispatch++;
|
|
||||||
}
|
|
||||||
Thread.sleep(10);
|
|
||||||
}
|
|
||||||
|
|
||||||
CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
|
|
||||||
int executionCount = callExecutionCount.get();
|
|
||||||
|
|
||||||
String expectedQueueName = "Master Fifo Queue";
|
|
||||||
assertEquals(1, callQueueInfo.getCallQueueNames().size());
|
|
||||||
|
|
||||||
long callQueueSize = 0;
|
|
||||||
for (String queueName : callQueueInfo.getCallQueueNames()) {
|
|
||||||
assertEquals(expectedQueueName, queueName);
|
|
||||||
Set<String> methodNames = callQueueInfo.getCalledMethodNames(queueName);
|
|
||||||
if (methodNames.size() == 2) {
|
|
||||||
assertTrue(methodNames.contains(REGION_SERVER_REPORT));
|
|
||||||
assertTrue(methodNames.contains(OTHER));
|
|
||||||
}
|
|
||||||
for (String methodName : callQueueInfo.getCalledMethodNames(queueName)) {
|
|
||||||
callQueueSize += callQueueInfo.getCallMethodCount(queueName, methodName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
|
|
||||||
scheduler.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private CallRunner createMockTask(AtomicInteger callExecutionCount,
|
|
||||||
boolean isRegionServerReportTask) {
|
|
||||||
CallRunner task = mock(CallRunner.class);
|
|
||||||
ServerCall call = mock(ServerCall.class);
|
|
||||||
when(task.getRpcCall()).thenReturn(call);
|
|
||||||
when(call.getHeader()).thenReturn(RPCProtos.RequestHeader.newBuilder()
|
|
||||||
.setMethodName(isRegionServerReportTask ? REGION_SERVER_REPORT : OTHER).build());
|
|
||||||
|
|
||||||
doAnswer(new Answer<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
callExecutionCount.incrementAndGet();
|
|
||||||
Thread.sleep(1000);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}).when(task).run();
|
|
||||||
|
|
||||||
return task;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class MockMasterFifoRpcScheduler extends MasterFifoRpcScheduler {
|
|
||||||
|
|
||||||
public MockMasterFifoRpcScheduler(Configuration conf, int callHandlerCount,
|
|
||||||
int rsReportHandlerCount) {
|
|
||||||
super(conf, callHandlerCount, rsReportHandlerCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Override this method because we can't mock a Descriptors.MethodDescriptor
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected String getCallMethod(final CallRunner task) {
|
|
||||||
RpcCall call = task.getRpcCall();
|
|
||||||
if (call.getHeader() != null) {
|
|
||||||
return call.getHeader().getMethodName();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue