HBASE-25528: Dedicated merge dispatch threadpool on master

Adds "hbase.master.executor.merge.dispatch.threads" and defaults to 2.

Also adds additional logging that includes the number of split plans
and merge plans computed for each normalizer run.
This commit is contained in:
Bharath Vissapragada 2021-01-25 12:20:37 -08:00
parent b07549febb
commit 36b4698cea
No known key found for this signature in database
GPG Key ID: 18AE42A0B5A93FA7
5 changed files with 22 additions and 4 deletions

View File

@ -1605,6 +1605,13 @@ public final class HConstants {
"hbase.master.executor.serverops.threads"; "hbase.master.executor.serverops.threads";
public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5; public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
/**
* Number of threads used to dispatch merge operations to the regionservers.
*/
public static final String MASTER_MERGE_DISPATCH_THREADS =
"hbase.master.executor.merge.dispatch.threads";
public static final int MASTER_MERGE_DISPATCH_THREADS_DEFAULT = 2;
public static final String MASTER_META_SERVER_OPERATIONS_THREADS = public static final String MASTER_META_SERVER_OPERATIONS_THREADS =
"hbase.master.executor.meta.serverops.threads"; "hbase.master.executor.meta.serverops.threads";
public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5; public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5;

View File

@ -152,7 +152,7 @@ public enum EventType {
* C_M_MERGE_REGION<br> * C_M_MERGE_REGION<br>
* Client asking Master to merge regions. * Client asking Master to merge regions.
*/ */
C_M_MERGE_REGION (30, ExecutorType.MASTER_TABLE_OPERATIONS), C_M_MERGE_REGION (30, ExecutorType.MASTER_MERGE_OPERATIONS),
/** /**
* Messages originating from Client to Master.<br> * Messages originating from Client to Master.<br>
* C_M_DELETE_TABLE<br> * C_M_DELETE_TABLE<br>

View File

@ -35,6 +35,7 @@ public enum ExecutorType {
MASTER_META_SERVER_OPERATIONS (6), MASTER_META_SERVER_OPERATIONS (6),
M_LOG_REPLAY_OPS (7), M_LOG_REPLAY_OPS (7),
MASTER_SNAPSHOT_OPERATIONS (8), MASTER_SNAPSHOT_OPERATIONS (8),
MASTER_MERGE_OPERATIONS (9),
// RegionServer executor services // RegionServer executor services
RS_OPEN_REGION (20), RS_OPEN_REGION (20),

View File

@ -1324,6 +1324,9 @@ public class HMaster extends HRegionServer implements MasterServices {
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT)); HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt( this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT)); SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt(
HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT));
// We depend on there being only one instance of this executor running // We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of // at a time. To do concurrency, would need fencing of enable/disable of

View File

@ -209,14 +209,21 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
ctx.getTableRegions().size()); ctx.getTableRegions().size());
final List<NormalizationPlan> plans = new ArrayList<>(); final List<NormalizationPlan> plans = new ArrayList<>();
int splitPlansCount = 0;
if (proceedWithSplitPlanning) { if (proceedWithSplitPlanning) {
plans.addAll(computeSplitNormalizationPlans(ctx)); List<NormalizationPlan> splitPlans = computeSplitNormalizationPlans(ctx);
splitPlansCount = splitPlans.size();
plans.addAll(splitPlans);
} }
int mergePlansCount = 0;
if (proceedWithMergePlanning) { if (proceedWithMergePlanning) {
plans.addAll(computeMergeNormalizationPlans(ctx)); List<NormalizationPlan> mergePlans = computeMergeNormalizationPlans(ctx);
mergePlansCount = mergePlans.size();
plans.addAll(mergePlans);
} }
LOG.debug("Computed {} normalization plans for table {}", plans.size(), table); LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, " +
"merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
return plans; return plans;
} }