HBASE-21754 ReportRegionStateTransitionRequest should be executed in priority executor

This commit is contained in:
Allan Yang 2019-01-23 20:55:00 +08:00
parent dfad304ddb
commit b5619a2a26
16 changed files with 244 additions and 27 deletions

View File

@ -1048,6 +1048,11 @@ public final class HConstants {
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
// Meta Transition handlers to deal with meta ReportRegionStateTransitionRequest. Meta transition
// should be dealt with in a separate handler in case blocking other region's transition.
public static final String MASTER_META_TRANSITION_HANDLER_COUNT =
"hbase.master.meta.transition.handler.count";
public static final int MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT = 1;
/** Conf key for enabling meta replication */
public static final String USE_META_REPLICAS = "hbase.meta.replicas.use";
@ -1105,7 +1110,7 @@ public final class HConstants {
* by different set of handlers. For example, HIGH_QOS tagged methods are
* handled by high priority handlers.
*/
// normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS
// normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS < meta_QOS
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
@ -1114,6 +1119,8 @@ public final class HConstants {
public static final int ADMIN_QOS = 100;
public static final int HIGH_QOS = 200;
public static final int SYSTEMTABLE_QOS = HIGH_QOS;
public static final int META_QOS = 300;
/** Directory under /hbase where archived hfiles are stored */
public static final String HFILE_ARCHIVE_DIRECTORY = "archive";

View File

@ -60,10 +60,12 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
"parsed requests waiting in scheduler to be executed";
String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC =
"Number of calls in the replication call queue waiting to be run";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
String METAPRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
String WRITE_QUEUE_DESC = "Number of calls in the write call queue; " +
"parsed requests waiting in scheduler to be executed";

View File

@ -31,6 +31,8 @@ public interface MetricsHBaseServerWrapper {
int getPriorityQueueLength();
int getMetaPriorityQueueLength();
int getNumOpenConnections();
int getActiveRpcHandlerCount();
@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper {
int getActiveReplicationRpcHandlerCount();
int getActiveMetaPriorityRpcHandlerCount();
long getNumGeneralCallsDropped();
long getNumLifoModeSwitches();

View File

@ -152,6 +152,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
REPLICATION_QUEUE_DESC), wrapper.getReplicationQueueLength())
.addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
wrapper.getPriorityQueueLength())
.addGauge(Interns.info(METAPRIORITY_QUEUE_NAME, METAPRIORITY_QUEUE_DESC),
wrapper.getMetaPriorityQueueLength())
.addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections())
.addGauge(Interns.info(NUM_ACTIVE_HANDLER_NAME,

View File

@ -154,6 +154,11 @@ public class FifoRpcScheduler extends RpcScheduler {
return 0;
}
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return 0;
}
@Override
public long getNumGeneralCallsDropped() {
return 0;
@ -194,6 +199,11 @@ public class FifoRpcScheduler extends RpcScheduler {
return 0;
}
@Override
public int getMetaPriorityQueueLength() {
return 0;
}
@Override
public CallQueueInfo getCallQueueInfo() {
String queueName = "Fifo Queue";

View File

@ -67,6 +67,14 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
return server.getScheduler().getPriorityQueueLength();
}
@Override
public int getMetaPriorityQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getMetaPriorityQueueLength();
}
@Override
public int getNumOpenConnections() {
if (!isServerStarted()) {
@ -99,6 +107,14 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
return server.getScheduler().getActivePriorityRpcHandlerCount();
}
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getActiveMetaPriorityRpcHandlerCount();
}
@Override
public int getActiveReplicationRpcHandlerCount() {
if (!isServerStarted() || this.server.getScheduler() == null) {

View File

@ -76,6 +76,9 @@ public abstract class RpcScheduler {
/** Retrieves length of the priority queue for metrics. */
public abstract int getPriorityQueueLength();
/** Retrieves length of the meta priority queue for metrics. */
public abstract int getMetaPriorityQueueLength();
/** Retrieves length of the replication queue for metrics. */
public abstract int getReplicationQueueLength();
@ -88,6 +91,9 @@ public abstract class RpcScheduler {
/** Retrieves the number of active priority handler. */
public abstract int getActivePriorityRpcHandlerCount();
/** Retrieves the number of active meta priority handler. */
public abstract int getActiveMetaPriorityRpcHandlerCount();
/** Retrieves the number of active replication handler. */
public abstract int getActiveReplicationRpcHandlerCount();

View File

@ -41,6 +41,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
private final RpcExecutor priorityExecutor;
private final RpcExecutor replicationExecutor;
/**
* This executor is only for meta transition
*/
private final RpcExecutor metaTransitionExecutor;
/** What level a high priority call is at. */
private final int highPriorityLevel;
@ -59,6 +64,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
int metaTransitionHandler,
PriorityFunction priority,
Abortable server,
int highPriorityLevel) {
@ -103,18 +109,17 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority,
conf, abortable)
: null;
this.metaTransitionExecutor = metaTransitionHandler > 0 ?
new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler,
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
abortable) :
null;
}
public SimpleRpcScheduler(
Configuration conf,
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
int highPriorityLevel) {
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
null, highPriorityLevel);
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, 0, priority, null,
highPriorityLevel);
}
/**
@ -130,6 +135,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (replicationExecutor != null) {
replicationExecutor.resizeQueues(conf);
}
if (metaTransitionExecutor != null) {
metaTransitionExecutor.resizeQueues(conf);
}
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
@ -146,15 +154,31 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
@Override
public void start() {
callExecutor.start(port);
if (priorityExecutor != null) priorityExecutor.start(port);
if (replicationExecutor != null) replicationExecutor.start(port);
if (priorityExecutor != null) {
priorityExecutor.start(port);
}
if (replicationExecutor != null) {
replicationExecutor.start(port);
}
if (metaTransitionExecutor != null) {
metaTransitionExecutor.start(port);
}
}
@Override
public void stop() {
callExecutor.stop();
if (priorityExecutor != null) priorityExecutor.stop();
if (replicationExecutor != null) replicationExecutor.stop();
if (priorityExecutor != null) {
priorityExecutor.stop();
}
if (replicationExecutor != null) {
replicationExecutor.stop();
}
if (metaTransitionExecutor != null) {
metaTransitionExecutor.stop();
}
}
@Override
@ -165,7 +189,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (level == HConstants.PRIORITY_UNSET) {
level = HConstants.NORMAL_QOS;
}
if (priorityExecutor != null && level > highPriorityLevel) {
if (metaTransitionExecutor != null && level == HConstants.META_QOS) {
return metaTransitionExecutor.dispatch(callTask);
} else if (priorityExecutor != null && level > highPriorityLevel) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
return replicationExecutor.dispatch(callTask);
@ -174,6 +200,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
}
}
@Override
public int getMetaPriorityQueueLength() {
return metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getQueueLength();
}
@Override
public int getGeneralQueueLength() {
return callExecutor.getQueueLength();
@ -192,7 +223,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
@Override
public int getActiveRpcHandlerCount() {
return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
+ getActiveReplicationRpcHandlerCount();
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
}
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return (metaTransitionExecutor == null ? 0 : metaTransitionExecutor.getActiveHandlerCount());
}
@Override
@ -256,24 +292,31 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
CallQueueInfo callQueueInfo = new CallQueueInfo();
if(null!=callExecutor) {
if (null != callExecutor) {
queueName = "Call Queue";
callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
}
if(null!=priorityExecutor) {
if (null != priorityExecutor) {
queueName = "Priority Queue";
callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
}
if(null!=replicationExecutor) {
if (null != replicationExecutor) {
queueName = "Replication Queue";
callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
}
if (null != metaTransitionExecutor) {
queueName = "Meta Transition Queue";
callQueueInfo.setCallMethodCount(queueName,
metaTransitionExecutor.getCallQueueCountsSummary());
callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
}
return callQueueInfo;
}

View File

@ -613,7 +613,8 @@ public class SimpleRpcServer extends RpcServer {
"; connections=" + size() +
", queued calls size (bytes)=" + callQueueSizeInBytes.sum() +
", general queued calls=" + scheduler.getGeneralQueueLength() +
", priority queued calls=" + scheduler.getPriorityQueueLength());
", priority queued calls=" + scheduler.getPriorityQueueLength() +
", meta priority queued calls=" + scheduler.getMetaPriorityQueueLength());
}
return connection;
}

View File

@ -43,6 +43,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* processing of the request to online meta. To accomplish this this priority function makes sure
* that all requests to transition meta are handled in different threads from other report region
* in transition calls.
* After HBASE-21754, ReportRegionStateTransitionRequest for meta region will be assigned a META_QOS
* , a separate executor called metaTransitionExecutor will execute it. Other transition request
* will be executed in priorityExecutor to prevent being mixed with normal requests
*/
@InterfaceAudience.Private
public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction {
@ -78,13 +81,13 @@ public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPr
if (rst.getRegionInfoList() != null) {
for (HBaseProtos.RegionInfo info : rst.getRegionInfoList()) {
TableName tn = ProtobufUtil.toTableName(info.getTableName());
if (tn.isSystemTable()) {
return HConstants.SYSTEMTABLE_QOS;
if (TableName.META_TABLE_NAME.equals(tn)) {
return HConstants.META_QOS;
}
}
}
}
return HConstants.NORMAL_QOS;
return HConstants.HIGH_QOS;
}
// Handle the rest of the different reasons to change priority.

View File

@ -49,6 +49,8 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
priority,
server,
HConstants.QOS_THRESHOLD);

View File

@ -79,6 +79,16 @@ public class DelegatingRpcScheduler extends RpcScheduler {
return delegate.dispatch(task);
}
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return delegate.getActiveMetaPriorityRpcHandlerCount();
}
@Override
public int getMetaPriorityQueueLength() {
return delegate.getMetaPriorityQueueLength();
}
@Override
public long getNumGeneralCallsDropped() {
return delegate.getNumGeneralCallsDropped();

View File

@ -108,4 +108,14 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
public long getNettyDmUsage() {
return 100L;
}
@Override
public int getMetaPriorityQueueLength() {
return 1;
}
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return 1;
}
}

View File

@ -95,7 +95,7 @@ public class TestRpcHandlerException {
PriorityFunction qosFunction = mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, 0, qosFunction, abortable, 0);
RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, scheduler);

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ MasterTests.class, LargeTests.class })
public class TestMasterHandlerFullWhenTransitRegion {
private static Logger LOG = LoggerFactory
.getLogger(TestMasterHandlerFullWhenTransitRegion.class.getName());
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterHandlerFullWhenTransitRegion.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final String TABLENAME = "table";
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
DelayOpenCP.class.getName());
//set handler number to 1.
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
UTIL.startMiniCluster(2);
UTIL.createTable(TableName.valueOf(TABLENAME), "fa");
}
@Test(timeout = 30000)
public void test() throws Exception {
RegionInfo regionInfo = UTIL.getAdmin().getRegions(TableName.valueOf(TABLENAME)).get(0);
//See HBASE-21754
//There is Only one handler, if ReportRegionStateTransitionRequest executes in the same kind
// of thread with moveRegion, it will lock each other. Making the move operation can not finish.
UTIL.getAdmin().move(regionInfo.getEncodedNameAsBytes(), null);
LOG.info("Region move complete");
}
/**
* Make open region very slow
*/
public static class DelayOpenCP implements RegionCoprocessor, RegionObserver {
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
try {
if (!c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
LOG.info("begin to sleep");
Thread.sleep(10000);
LOG.info("finish sleep");
}
} catch (Throwable t) {
}
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
}
}

View File

@ -96,9 +96,9 @@ public class TestMasterQosFunction extends QosTestHelper {
.addTransition(normalTransition).build();
final String reportFuncName = "ReportRegionStateTransition";
checkMethod(conf, reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction,
checkMethod(conf, reportFuncName, HConstants.META_QOS, qosFunction,
metaTransitionRequest);
checkMethod(conf, reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest);
checkMethod(conf, reportFuncName, HConstants.HIGH_QOS, qosFunction, normalTransitionRequest);
}
@Test