HBASE-27144 Add special rpc handlers for bulkload operations (#4558)

Co-authored-by: SiCheng-Zheng <zhengsicheng@jd.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
SiCheng-Zheng 2022-07-17 21:21:58 +08:00 committed by GitHub
parent 70a2ee1716
commit ff8eb59709
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 125 additions and 4 deletions

View File

@ -1063,6 +1063,9 @@ public final class HConstants {
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
/**
 * Configuration key for the number of RPC handlers dedicated to bulkload calls. SimpleRpcScheduler
 * reads this value in its constructor and only creates a separate bulkload executor when the
 * configured count is greater than zero.
 */
public static final String REGION_SERVER_BULKLOAD_HANDLER_COUNT =
"hbase.regionserver.bulkload.handler.count";
/**
 * Default number of dedicated bulkload handlers. With the default of 0 no bulkload executor is
 * created, so bulkload calls are dispatched like any other call.
 */
public static final int DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT = 0;
// Meta Transition handlers to deal with meta ReportRegionStateTransitionRequest. Meta transition
// should be dealt with in a separate handler in case blocking other region's transition.
public static final String MASTER_META_TRANSITION_HANDLER_COUNT =
@ -1139,6 +1142,7 @@ public final class HConstants {
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
/**
 * Priority level returned by RSAnnotationReadingPriorityFunction for a BulkLoadHFileRequest whose
 * header carries no explicit priority. SimpleRpcScheduler dispatches calls at this level to the
 * bulkload executor when one has been configured.
 */
public static final int BULKLOAD_QOS = 4;
/**
* @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and
* region replication has its own 'replay' method.

View File

@ -56,6 +56,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC = "Number of calls in the replication call queue waiting to be run";
// Gauge name and description for the length of the bulkload call queue.
String BULKLOAD_QUEUE_NAME = "numCallsInBulkLoadQueue";
String BULKLOAD_QUEUE_DESC = "Number of calls in the bulkload call queue waiting to be run";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
// Kept distinct from PRIORITY_QUEUE_DESC so the two gauges can be told apart by description.
String METAPRIORITY_QUEUE_DESC =
  "Number of calls in the meta priority call queue waiting to be run";
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
@ -77,6 +79,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NUM_ACTIVE_PRIORITY_HANDLER_DESC = "Number of active priority rpc handlers.";
String NUM_ACTIVE_REPLICATION_HANDLER_NAME = "numActiveReplicationHandler";
String NUM_ACTIVE_REPLICATION_HANDLER_DESC = "Number of active replication rpc handlers.";
// Gauge name and description for the number of active bulkload rpc handlers; registered by
// MetricsHBaseServerSourceImpl with the value of wrapper.getActiveBulkLoadRpcHandlerCount().
String NUM_ACTIVE_BULKLOAD_HANDLER_NAME = "numActiveBulkLoadHandler";
String NUM_ACTIVE_BULKLOAD_HANDLER_DESC = "Number of active bulkload rpc handlers.";
String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers.";
String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";

View File

@ -143,6 +143,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
wrapper.getGeneralQueueLength())
.addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC),
wrapper.getReplicationQueueLength())
.addGauge(Interns.info(BULKLOAD_QUEUE_NAME, BULKLOAD_QUEUE_DESC),
wrapper.getBulkLoadQueueLength())
.addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
wrapper.getPriorityQueueLength())
.addGauge(Interns.info(METAPRIORITY_QUEUE_NAME, METAPRIORITY_QUEUE_DESC),
@ -163,6 +165,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME, NUM_LIFO_MODE_SWITCHES_DESC),
wrapper.getNumLifoModeSwitches())
.addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC), wrapper.getWriteQueueLength())
.addGauge(Interns.info(NUM_ACTIVE_BULKLOAD_HANDLER_NAME, NUM_ACTIVE_BULKLOAD_HANDLER_DESC),
wrapper.getActiveBulkLoadRpcHandlerCount())
.addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC), wrapper.getReadQueueLength())
.addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC), wrapper.getScanQueueLength())
.addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC),

View File

@ -27,6 +27,8 @@ public interface MetricsHBaseServerWrapper {
int getReplicationQueueLength();
/**
 * Returns the number of calls currently waiting in the bulkload call queue; published as the
 * bulkload queue length gauge.
 */
int getBulkLoadQueueLength();
int getPriorityQueueLength();
int getMetaPriorityQueueLength();
@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper {
int getActiveReplicationRpcHandlerCount();
/**
 * Returns the number of bulkload rpc handlers that are currently active; published as the active
 * bulkload handler gauge.
 */
int getActiveBulkLoadRpcHandlerCount();
int getActiveMetaPriorityRpcHandlerCount();
long getNumGeneralCallsDropped();

View File

@ -130,6 +130,11 @@ public class FifoRpcScheduler extends RpcScheduler {
return 0;
}
/** Always reports 0: this scheduler exposes no separate bulkload queue length. */
@Override
public int getBulkLoadQueueLength() {
return 0;
}
@Override
public int getActiveRpcHandlerCount() {
return executor.getActiveCount();
@ -150,6 +155,11 @@ public class FifoRpcScheduler extends RpcScheduler {
return 0;
}
/** Always reports 0: this scheduler exposes no separate bulkload handler count. */
@Override
public int getActiveBulkLoadRpcHandlerCount() {
return 0;
}
@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return 0;

View File

@ -57,6 +57,14 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
return server.getScheduler().getReplicationQueueLength();
}
/**
 * Reports the scheduler's bulkload queue length, or 0 while the server has not started or has no
 * scheduler.
 */
@Override
public int getBulkLoadQueueLength() {
  if (isServerStarted() && this.server.getScheduler() != null) {
    return server.getScheduler().getBulkLoadQueueLength();
  }
  return 0;
}
@Override
public int getPriorityQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
@ -121,6 +129,14 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
return server.getScheduler().getActiveReplicationRpcHandlerCount();
}
/**
 * Reports the scheduler's active bulkload handler count, or 0 while the server has not started or
 * has no scheduler.
 */
@Override
public int getActiveBulkLoadRpcHandlerCount() {
  if (isServerStarted() && this.server.getScheduler() != null) {
    return server.getScheduler().getActiveBulkLoadRpcHandlerCount();
  }
  return 0;
}
@Override
public long getNumGeneralCallsDropped() {
if (!isServerStarted() || this.server.getScheduler() == null) {

View File

@ -35,6 +35,8 @@ public abstract class RpcScheduler {
"hbase.ipc.server.priority.max.callqueue.length";
public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.replication.max.callqueue.length";
/**
 * Configuration key for the maximum length of the bulkload call queue. When unset,
 * SimpleRpcScheduler falls back to the bulkload handler count multiplied by
 * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER.
 */
public static final String IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.bulkload.max.callqueue.length";
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
public static abstract class Context {
@ -78,6 +80,9 @@ public abstract class RpcScheduler {
/** Retrieves length of the replication queue for metrics. */
public abstract int getReplicationQueueLength();
/**
 * Retrieves the length of the bulkload queue for metrics.
 * @return number of calls queued for bulkload handling; implementations without a dedicated
 *         bulkload queue return 0
 */
public abstract int getBulkLoadQueueLength();
/** Retrieves the total number of active handler. */
public abstract int getActiveRpcHandlerCount();
@ -93,6 +98,9 @@ public abstract class RpcScheduler {
/** Retrieves the number of active replication handler. */
public abstract int getActiveReplicationRpcHandlerCount();
/**
 * Retrieves the number of active bulkload handlers for metrics.
 * @return number of bulkload handlers currently active; implementations without dedicated
 *         bulkload handlers return 0
 */
public abstract int getActiveBulkLoadRpcHandlerCount();
/**
* If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped from
* general queue because RPC executor is under high load; returns 0 otherwise.

View File

@ -49,6 +49,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
*/
private final RpcExecutor metaTransitionExecutor;
/**
 * Executor for calls whose priority level equals HConstants.BULKLOAD_QOS. It is null when the
 * configured bulkload handler count is not greater than zero, so every use is null-checked.
 */
private final RpcExecutor bulkloadExecutor;
/** What level a high priority call is at. */
private final int highPriorityLevel;
@ -63,7 +65,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
Abortable server, int highPriorityLevel) {
int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
@ -71,6 +74,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
int maxReplicationQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
@ -122,6 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
abortable)
: null;
this.bulkloadExecutor = bulkLoadHandlerCount > 0
? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount,
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf,
abortable)
: null;
}
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
@ -173,6 +183,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (metaTransitionExecutor != null) {
metaTransitionExecutor.start(port);
}
if (bulkloadExecutor != null) {
bulkloadExecutor.start(port);
}
}
@ -188,6 +201,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (metaTransitionExecutor != null) {
metaTransitionExecutor.stop();
}
if (bulkloadExecutor != null) {
bulkloadExecutor.stop();
}
}
@ -208,6 +224,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
return replicationExecutor.dispatch(callTask);
} else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
return bulkloadExecutor.dispatch(callTask);
} else {
return callExecutor.dispatch(callTask);
}
@ -233,10 +251,16 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
}
/** Length of the bulkload executor's queue, or 0 when no bulkload executor is configured. */
@Override
public int getBulkLoadQueueLength() {
  if (bulkloadExecutor == null) {
    return 0;
  }
  return bulkloadExecutor.getQueueLength();
}
@Override
public int getActiveRpcHandlerCount() {
return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount()
+ getActiveBulkLoadRpcHandlerCount();
}
@Override
@ -259,6 +283,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
}
/** Active handler count of the bulkload executor, or 0 when no bulkload executor is configured. */
@Override
public int getActiveBulkLoadRpcHandlerCount() {
  if (bulkloadExecutor == null) {
    return 0;
  }
  return bulkloadExecutor.getActiveHandlerCount();
}
@Override
public long getNumGeneralCallsDropped() {
return callExecutor.getNumGeneralCallsDropped();
@ -330,6 +359,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
}
if (null != bulkloadExecutor) {
queueName = "BulkLoad Queue";
callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary());
callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary());
}
return callQueueInfo;
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegi
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
@ -96,6 +97,10 @@ class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunct
if (header.hasPriority()) {
return header.getPriority();
}
if (param instanceof BulkLoadHFileRequest) {
return HConstants.BULKLOAD_QOS;
}
String cls = param.getClass().getName();
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
RegionSpecifier regionSpecifier = null;

View File

@ -44,6 +44,11 @@ public class DelegatingRpcScheduler extends RpcScheduler {
return delegate.getReplicationQueueLength();
}
/** Forwards to the delegate scheduler's bulkload queue length. */
@Override
public int getBulkLoadQueueLength() {
return delegate.getBulkLoadQueueLength();
}
@Override
public int getPriorityQueueLength() {
return delegate.getPriorityQueueLength();
@ -74,6 +79,11 @@ public class DelegatingRpcScheduler extends RpcScheduler {
return delegate.getActiveReplicationRpcHandlerCount();
}
/** Forwards to the delegate scheduler's active bulkload handler count. */
@Override
public int getActiveBulkLoadRpcHandlerCount() {
return delegate.getActiveBulkLoadRpcHandlerCount();
}
@Override
public boolean dispatch(CallRunner task) {
return delegate.dispatch(task);

View File

@ -33,6 +33,11 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper
return 103;
}
/** Stub: returns the fixed value 109 as the bulkload queue length. */
@Override
public int getBulkLoadQueueLength() {
return 109;
}
@Override
public int getPriorityQueueLength() {
return 104;
@ -63,6 +68,11 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper
return 203;
}
/** Stub: returns the fixed value 204 as the active bulkload handler count. */
@Override
public int getActiveBulkLoadRpcHandlerCount() {
return 204;
}
@Override
public long getNumGeneralCallsDropped() {
return 3;

View File

@ -480,12 +480,21 @@ public class TestSimpleRpcScheduler {
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);
CallRunner bulkLoadCallTask = mock(CallRunner.class);
ServerCall bulkLoadCall = mock(ServerCall.class);
bulkLoadCall.param = ScanRequest.newBuilder().build();
RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build();
when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);
ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);
// There are 3 queues: [puts], [gets], [scans]
// There are 3 queues: [puts], [gets], [scans], [bulkload]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
@ -496,7 +505,9 @@ public class TestSimpleRpcScheduler {
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(bulkLoadCallTask);
scheduler.dispatch(bulkLoadCallTask);
scheduler.dispatch(bulkLoadCallTask);
while (work.size() < 6) {
Thread.sleep(100);
}