diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cf43c8b814c..9c617bbe7f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -29,7 +29,6 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -49,7 +48,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.servlet.ServletException; @@ -117,11 +115,8 @@ import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner; import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore; import org.apache.hadoop.hbase.master.janitor.CatalogJanitor; import org.apache.hadoop.hbase.master.locking.LockManager; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; -import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; -import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory; +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager; import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; @@ -202,7 +197,6 @@ import org.apache.hadoop.hbase.security.UserProvider; import 
org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HFileArchiveUtil; @@ -233,7 +227,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Service; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server; import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector; import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder; @@ -337,9 +330,6 @@ public class HMaster extends HRegionServer implements MasterServices { // Tracker for split and merge state private SplitOrMergeTracker splitOrMergeTracker; - // Tracker for region normalizer state - private RegionNormalizerTracker regionNormalizerTracker; - private ClusterSchemaService clusterSchemaService; public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = @@ -406,11 +396,8 @@ public class HMaster extends HRegionServer implements MasterServices { private final LockManager lockManager = new LockManager(this); private RSGroupBasedLoadBalancer balancer; - // a lock to prevent concurrent normalization actions. 
- private final ReentrantLock normalizationInProgressLock = new ReentrantLock(); - private RegionNormalizer normalizer; private BalancerChore balancerChore; - private RegionNormalizerChore normalizerChore; + private RegionNormalizerManager regionNormalizerManager; private ClusterStatusChore clusterStatusChore; private ClusterStatusPublisher clusterStatusPublisherChore = null; private SnapshotCleanerChore snapshotCleanerChore = null; @@ -464,9 +451,6 @@ public class HMaster extends HRegionServer implements MasterServices { // handle table states private TableStateManager tableStateManager; - private long splitPlanCount; - private long mergePlanCount; - /** jetty server for master to redirect requests to regionserver infoServer */ private Server masterJettyServer; @@ -788,26 +772,19 @@ public class HMaster extends HRegionServer implements MasterServices { } /** - *

* Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it * should have already been initialized along with {@link ServerManager}. - *

- *

- * Will be overridden in tests. - *

*/ - @VisibleForTesting - protected void initializeZKBasedSystemTrackers() - throws IOException, InterruptedException, KeeperException, ReplicationException { + private void initializeZKBasedSystemTrackers() + throws IOException, KeeperException, ReplicationException { this.balancer = new RSGroupBasedLoadBalancer(); this.balancer.setConf(conf); this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); this.loadBalancerTracker.start(); - this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf); - this.normalizer.setMasterServices(this); - this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this); - this.regionNormalizerTracker.start(); + this.regionNormalizerManager = + RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this); + this.regionNormalizerManager.start(); this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this); this.splitOrMergeTracker.start(); @@ -900,10 +877,10 @@ public class HMaster extends HRegionServer implements MasterServices { * * *
  • If this is a new deploy, schedule an InitMetaProcedure to initialize meta
  • - *
  • Start necessary service threads - balancer, catalog janior, executor services, and also the - * procedure executor, etc. Notice that the balancer must be created first as assignment manager - * may use it when assigning regions.
  • - *
  • Wait for meta to be initialized if necesssary, start table state manager.
  • + *
  • Start necessary service threads - balancer, catalog janitor, executor services, and also + * the procedure executor, etc. Notice that the balancer must be created first as assignment + * manager may use it when assigning regions.
  • + *
  • Wait for meta to be initialized if necessary, start table state manager.
  • *
  • Wait for enough region servers to check in
  • *
  • Let assignment manager load data from meta and construct region states
  • *
  • Start all other things such as chore services, etc
  • @@ -1116,8 +1093,7 @@ public class HMaster extends HRegionServer implements MasterServices { getChoreService().scheduleChore(clusterStatusChore); this.balancerChore = new BalancerChore(this); getChoreService().scheduleChore(balancerChore); - this.normalizerChore = new RegionNormalizerChore(this); - getChoreService().scheduleChore(normalizerChore); + getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore()); this.catalogJanitorChore = new CatalogJanitor(this); getChoreService().scheduleChore(catalogJanitorChore); this.hbckChore = new HbckChore(this); @@ -1533,6 +1509,9 @@ public class HMaster extends HRegionServer implements MasterServices { // example. stopProcedureExecutor(); + if (regionNormalizerManager != null) { + regionNormalizerManager.stop(); + } if (this.quotaManager != null) { this.quotaManager.stop(); } @@ -1651,7 +1630,7 @@ public class HMaster extends HRegionServer implements MasterServices { choreService.cancelChore(this.mobFileCleanerChore); choreService.cancelChore(this.mobFileCompactionChore); choreService.cancelChore(this.balancerChore); - choreService.cancelChore(this.normalizerChore); + choreService.cancelChore(getRegionNormalizerManager().getRegionNormalizerChore()); choreService.cancelChore(this.clusterStatusChore); choreService.cancelChore(this.catalogJanitorChore); choreService.cancelChore(this.clusterStatusPublisherChore); @@ -1751,7 +1730,9 @@ public class HMaster extends HRegionServer implements MasterServices { * @param action the name of the action under consideration, for logging. * @return {@code true} when the caller should exit early, {@code false} otherwise. */ - private boolean skipRegionManagementAction(final String action) { + @Override + public boolean skipRegionManagementAction(final String action) { + // Note: this method could be a `default` method on MasterServices were it not for the logging. 
if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run {}.", action); return true; @@ -1896,24 +1877,16 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public RegionNormalizer getRegionNormalizer() { - return this.normalizer; + public RegionNormalizerManager getRegionNormalizerManager() { + return regionNormalizerManager; } - public boolean normalizeRegions() throws IOException { - return normalizeRegions(new NormalizeTableFilterParams.Builder().build()); - } - - /** - * Perform normalization of cluster. - * - * @return true if an existing normalization was already in progress, or if a new normalization - * was performed successfully; false otherwise (specifically, if HMaster finished initializing - * or normalization is globally disabled). - */ - public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException { - final long startTime = EnvironmentEdgeManager.currentTime(); - if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) { + @Override + public boolean normalizeRegions( + final NormalizeTableFilterParams ntfp, + final boolean isHighPriority + ) throws IOException { + if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) { LOG.debug("Region normalization is disabled, don't run region normalizer."); return false; } @@ -1924,70 +1897,17 @@ public class HMaster extends HRegionServer implements MasterServices { return false; } - if (!normalizationInProgressLock.tryLock()) { - // Don't run the normalizer concurrently - LOG.info("Normalization already in progress. 
Skipping request."); - return true; - } - - int affectedTables = 0; - try { - final Set matchingTables = getTableDescriptors(new LinkedList<>(), - ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false) - .stream() - .map(TableDescriptor::getTableName) - .collect(Collectors.toSet()); - final Set allEnabledTables = - tableStateManager.getTablesInStates(TableState.State.ENABLED); - final List targetTables = - new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables)); - Collections.shuffle(targetTables); - - final List submittedPlanProcIds = new ArrayList<>(); - for (TableName table : targetTables) { - if (table.isSystemTable()) { - continue; - } - final TableDescriptor tblDesc = getTableDescriptors().get(table); - if (tblDesc != null && !tblDesc.isNormalizationEnabled()) { - LOG.debug( - "Skipping table {} because normalization is disabled in its table properties.", table); - continue; - } - - // make one last check that the cluster isn't shutting down before proceeding. - if (skipRegionManagementAction("region normalizer")) { - return false; - } - - final List plans = normalizer.computePlansForTable(table); - if (CollectionUtils.isEmpty(plans)) { - LOG.debug("No normalization required for table {}.", table); - continue; - } - - affectedTables++; - // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to - // submit task , so there's no artificial rate- - // limiting of merge/split requests due to this serial loop. - for (NormalizationPlan plan : plans) { - long procId = plan.submit(this); - submittedPlanProcIds.add(procId); - if (plan.getType() == PlanType.SPLIT) { - splitPlanCount++; - } else if (plan.getType() == PlanType.MERGE) { - mergePlanCount++; - } - } - } - final long endTime = EnvironmentEdgeManager.currentTime(); - LOG.info("Normalizer ran successfully in {}. 
Submitted {} plans, affecting {} tables.", - Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables); - LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds); - } finally { - normalizationInProgressLock.unlock(); - } - return true; + final Set matchingTables = getTableDescriptors(new LinkedList<>(), + ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false) + .stream() + .map(TableDescriptor::getTableName) + .collect(Collectors.toSet()); + final Set allEnabledTables = + tableStateManager.getTablesInStates(TableState.State.ENABLED); + final List targetTables = + new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables)); + Collections.shuffle(targetTables); + return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority); } /** @@ -3003,20 +2923,6 @@ public class HMaster extends HRegionServer implements MasterServices { return regionStates.getAverageLoad(); } - /* - * @return the count of region split plans executed - */ - public long getSplitPlanCount() { - return splitPlanCount; - } - - /* - * @return the count of region merge plans executed - */ - public long getMergePlanCount() { - return mergePlanCount; - } - @Override public boolean registerService(Service instance) { /* @@ -3511,8 +3417,7 @@ public class HMaster extends HRegionServer implements MasterServices { */ public boolean isNormalizerOn() { return !isInMaintenanceMode() - && regionNormalizerTracker != null - && regionNormalizerTracker.isNormalizerOn(); + && getRegionNormalizerManager().isNormalizerOn(); } /** @@ -3540,13 +3445,6 @@ public class HMaster extends HRegionServer implements MasterServices { LoadBalancerFactory.getDefaultLoadBalancerClass().getName()); } - /** - * @return RegionNormalizerTracker instance - */ - public RegionNormalizerTracker getRegionNormalizerTracker() { - return regionNormalizerTracker; - } - public SplitOrMergeTracker getSplitOrMergeTracker() { return splitOrMergeTracker; } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 37fc58985e7..d4dbc8d55dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -1913,9 +1913,7 @@ public class MasterRpcServices extends RSRpcServices implements master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType); } } - } catch (IOException e) { - throw new ServiceException(e); - } catch (KeeperException e) { + } catch (IOException | KeeperException e) { throw new ServiceException(e); } return response.build(); @@ -1940,7 +1938,8 @@ public class MasterRpcServices extends RSRpcServices implements .namespace(request.hasNamespace() ? request.getNamespace() : null) .build(); return NormalizeResponse.newBuilder() - .setNormalizerRan(master.normalizeRegions(ntfp)) + // all API requests are considered priority requests. + .setNormalizerRan(master.normalizeRegions(ntfp, true)) .build(); } catch (IOException ex) { throw new ServiceException(ex); @@ -1953,20 +1952,27 @@ public class MasterRpcServices extends RSRpcServices implements rpcPreCheck("setNormalizerRunning"); // Sets normalizer on/off flag in ZK. - boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn(); - boolean newValue = request.getOn(); - try { - master.getRegionNormalizerTracker().setNormalizerOn(newValue); - } catch (KeeperException ke) { - LOG.warn("Error flipping normalizer switch", ke); - } + // TODO: this method is totally broken in terms of atomicity of actions and values read. + // 1. The contract has this RPC returning the previous value. There isn't a ZKUtil method + // that lets us retrieve the previous value as part of setting a new value, so we simply + // perform a read before issuing the update. 
Thus we have a data race opportunity, between + // when the `prevValue` is read and whatever is actually overwritten. + // 2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of the catch clause can + // itself fail in the event that the znode already exists. Thus, another data race, between + // when the initial `setData` call is notified of the absence of the target znode and the + // subsequent `createAndWatch`, with another client creating said node. + // That said, there's supposed to be only one active master and thus there's supposed to be + // only one process with the authority to modify the value. + final boolean prevValue = master.getRegionNormalizerManager().isNormalizerOn(); + final boolean newValue = request.getOn(); + master.getRegionNormalizerManager().setNormalizerOn(newValue); LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue); return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build(); } @Override public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller, - IsNormalizerEnabledRequest request) throws ServiceException { + IsNormalizerEnabledRequest request) { IsNormalizerEnabledResponse.Builder response = IsNormalizerEnabledResponse.newBuilder(); response.setEnabled(master.isNormalizerOn()); return response.build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 908d21270c6..384785d738f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.MasterSwitchType; +import org.apache.hadoop.hbase.client.NormalizeTableFilterParams; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -34,7 +35,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.janitor.CatalogJanitor; import org.apache.hadoop.hbase.master.locking.LockManager; -import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; @@ -54,7 +55,6 @@ import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.protobuf.Service; @@ -122,9 +122,9 @@ public interface MasterServices extends Server { MasterQuotaManager getMasterQuotaManager(); /** - * @return Master's instance of {@link RegionNormalizer} + * @return Master's instance of {@link RegionNormalizerManager} */ - RegionNormalizer getRegionNormalizer(); + RegionNormalizerManager getRegionNormalizerManager(); /** * @return Master's instance of {@link CatalogJanitor} 
@@ -354,6 +354,13 @@ public interface MasterServices extends Server { */ boolean isInMaintenanceMode(); + /** + * Checks master state before initiating action over region topology. + * @param action the name of the action under consideration, for logging. + * @return {@code true} when the caller should exit early, {@code false} otherwise. + */ + boolean skipRegionManagementAction(final String action); + /** * Abort a procedure. * @param procId ID of the procedure @@ -553,4 +560,14 @@ public interface MasterServices extends Server { * @return The state of the load balancer, or false if the load balancer isn't defined. */ boolean isBalancerOn(); + + /** + * Perform normalization of cluster. + * @param ntfp Selection criteria for identifying which tables to normalize. + * @param isHighPriority {@code true} when these requested tables should skip to the front of + * the queue. + * @return {@code true} when the request was submitted, {@code false} otherwise. + */ + boolean normalizeRegions( + final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java index 9d4550c5eb0..aeaae929209 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java @@ -55,12 +55,12 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper { @Override public long getSplitPlanCount() { - return master.getSplitPlanCount(); + return master.getRegionNormalizerManager().getSplitPlanCount(); } @Override public long getMergePlanCount() { - return master.getMergePlanCount(); + return master.getRegionNormalizerManager().getMergePlanCount(); } @Override diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index f1b3329b25c..5e06a44912b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -59,9 +59,7 @@ import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @@ -534,8 +532,10 @@ public class MergeTableRegionsProcedure try { env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion); } catch (QuotaExceededException e) { - env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion, - NormalizationPlan.PlanType.MERGE); + // TODO: why is this here? 
merge requests can be submitted by actors other than the normalizer + env.getMasterServices() + .getRegionNormalizerManager() + .planSkipped(NormalizationPlan.PlanType.MERGE); throw e; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index d0413360e6d..0eb7667d7da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -71,13 +71,11 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @@ -181,9 +179,10 @@ public class SplitTableRegionProcedure private void checkSplittable(final MasterProcedureEnv env, final RegionInfo regionToSplit, final byte[] splitRow) throws IOException { // Ask the remote RS if this region is splittable. 
- // If we get an IOE, report it along w/ the failure so can see why we are not splittable at this time. + // If we get an IOE, report it along w/ the failure so can see why we are not splittable at + // this time. if(regionToSplit.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { - throw new IllegalArgumentException ("Can't invoke split on non-default regions directly"); + throw new IllegalArgumentException("Can't invoke split on non-default regions directly"); } RegionStateNode node = env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion()); @@ -570,8 +569,10 @@ public class SplitTableRegionProcedure try { env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion()); } catch (QuotaExceededException e) { - env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(), - NormalizationPlan.PlanType.SPLIT); + // TODO: why is this here? split requests can be submitted by actors other than the normalizer + env.getMasterServices() + .getRegionNormalizerManager() + .planSkipped(NormalizationPlan.PlanType.SPLIT); throw e; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java index 17e313047d7..677b9ec8052 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java @@ -18,41 +18,35 @@ */ package org.apache.hadoop.hbase.master.normalizer; -import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** - * Normalization plan to merge regions (smallest region in the table with its smallest neighbor). + * Normalization plan to merge adjacent regions. As with any call to + * {@link MasterServices#mergeRegions(RegionInfo[], boolean, long, long)} + * with {@code forcible=false}, Region order and adjacency are important. It's the caller's + * responsibility to ensure the provided parameters are ordered according to the + * {@code mergeRegions} method requirements. */ @InterfaceAudience.Private -public class MergeNormalizationPlan implements NormalizationPlan { +final class MergeNormalizationPlan implements NormalizationPlan { - private final RegionInfo firstRegion; - private final RegionInfo secondRegion; + private final List normalizationTargets; - public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) { - this.firstRegion = firstRegion; - this.secondRegion = secondRegion; - } - - /** - * {@inheritDoc} - */ - @Override - public long submit(MasterServices masterServices) throws IOException { - // Do not use force=true as corner cases can happen, non adjacent regions, - // merge with a merged child region with no GC done yet, it is going to - // cause all different issues.
- return masterServices - .mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE, - HConstants.NO_NONCE); + private MergeNormalizationPlan(List normalizationTargets) { + Preconditions.checkNotNull(normalizationTargets); + Preconditions.checkState(normalizationTargets.size() >= 2, + "normalizationTargets.size() must be >= 2 but was %s", normalizationTargets.size()); + this.normalizationTargets = Collections.unmodifiableList(normalizationTargets); } @Override @@ -60,19 +54,14 @@ public class MergeNormalizationPlan implements NormalizationPlan { return PlanType.MERGE; } - RegionInfo getFirstRegion() { - return firstRegion; - } - - RegionInfo getSecondRegion() { - return secondRegion; + public List getNormalizationTargets() { + return normalizationTargets; } @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .append("firstRegion", firstRegion) - .append("secondRegion", secondRegion) + .append("normalizationTargets", normalizationTargets) .toString(); } @@ -89,16 +78,31 @@ public class MergeNormalizationPlan implements NormalizationPlan { MergeNormalizationPlan that = (MergeNormalizationPlan) o; return new EqualsBuilder() - .append(firstRegion, that.firstRegion) - .append(secondRegion, that.secondRegion) + .append(normalizationTargets, that.normalizationTargets) .isEquals(); } @Override public int hashCode() { return new HashCodeBuilder(17, 37) - .append(firstRegion) - .append(secondRegion) + .append(normalizationTargets) .toHashCode(); } + + /** + * A helper for constructing instances of {@link MergeNormalizationPlan}. 
+ */ + static class Builder { + + private final List normalizationTargets = new LinkedList<>(); + + public Builder addTarget(final RegionInfo regionInfo, final long regionSizeMb) { + normalizationTargets.add(new NormalizationTarget(regionInfo, regionSizeMb)); + return this; + } + + public MergeNormalizationPlan build() { + return new MergeNormalizationPlan(normalizationTargets); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java index cd13f69e764..3bfae14e0b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,12 +17,12 @@ */ package org.apache.hadoop.hbase.master.normalizer; -import org.apache.hadoop.hbase.master.MasterServices; import org.apache.yetus.audience.InterfaceAudience; -import java.io.IOException; /** - * Interface for normalization plan. + * A {@link NormalizationPlan} describes some modification to region split points as identified + * by an instance of {@link RegionNormalizer}. It is a POJO describing what action needs taken + * and the regions it targets. */ @InterfaceAudience.Private public interface NormalizationPlan { @@ -33,15 +32,6 @@ public interface NormalizationPlan { NONE } - /** - * Submits normalization plan on cluster (does actual splitting/merging work) and - * returns proc Id to caller. 
- * @param masterServices instance of {@link MasterServices} - * @return Proc Id for the submitted task - * @throws IOException If plan submission to Admin fails - */ - long submit(MasterServices masterServices) throws IOException; - /** * @return the type of this plan */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java new file mode 100644 index 00000000000..9e4b3f42640 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationTarget.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A POJO that carries details about a region selected for normalization through the pipeline.
+ */
+@InterfaceAudience.Private
+class NormalizationTarget {
+  private final RegionInfo regionInfo;
+  private final long regionSizeMb;
+
+  /**
+   * @param regionInfo the region this target refers to.
+   * @param regionSizeMb the size recorded for that region (megabytes, going by the name).
+   */
+  NormalizationTarget(final RegionInfo regionInfo, final long regionSizeMb) {
+    this.regionInfo = regionInfo;
+    this.regionSizeMb = regionSizeMb;
+  }
+
+  /**
+   * @return the region this target refers to, exactly as passed to the constructor.
+   */
+  public RegionInfo getRegionInfo() {
+    return regionInfo;
+  }
+
+  /**
+   * @return the region size value passed to the constructor.
+   */
+  public long getRegionSizeMb() {
+    return regionSizeMb;
+  }
+
+  /**
+   * Two targets are equal when they are of the same class and carry an equal size and an
+   * equal {@link RegionInfo}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (o.getClass() != getClass()) {
+      return false;
+    }
+
+    final NormalizationTarget other = (NormalizationTarget) o;
+    // Size first, region second: the same comparison order as before.
+    final EqualsBuilder comparison = new EqualsBuilder();
+    comparison.append(regionSizeMb, other.regionSizeMb);
+    comparison.append(regionInfo, other.regionInfo);
+    return comparison.isEquals();
+  }
+
+  /**
+   * Hash over the same two fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    final HashCodeBuilder hash = new HashCodeBuilder(17, 37);
+    hash.append(regionInfo);
+    hash.append(regionSizeMb);
+    return hash.toHashCode();
+  }
+
+  /**
+   * Short-prefix rendering of both fields, intended for log messages.
+   */
+  @Override
+  public String toString() {
+    final ToStringBuilder text = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    text.append("regionInfo", regionInfo);
+    text.append("regionSizeMb", regionSizeMb);
+    return text.toString();
+  }
+}
 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java index 672171d1caf..6f939daeda9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java @@ -20,13 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer; import java.util.List; import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; import
org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; /** * Performs "normalization" of regions of a table, making sure that suboptimal @@ -39,8 +35,7 @@ import org.apache.yetus.audience.InterfaceStability; * "split/merge storms". */ @InterfaceAudience.Private -@InterfaceStability.Evolving -public interface RegionNormalizer extends Configurable { +interface RegionNormalizer extends Configurable { /** * Set the master service. Must be called before first call to * {@link #computePlansForTable(TableName)}. @@ -55,20 +50,5 @@ public interface RegionNormalizer extends Configurable { * @return A list of the normalization actions to perform, or an empty list * if there's nothing to do. */ - List computePlansForTable(TableName table) - throws HBaseIOException; - - /** - * Notification for the case where plan couldn't be executed due to constraint violation, such as - * namespace quota - * @param hri the region which is involved in the plan - * @param type type of plan - */ - void planSkipped(RegionInfo hri, PlanType type); - - /** - * @param type type of plan for which skipped count is to be returned - * @return the count of plans of specified type which were skipped - */ - long getSkippedCount(NormalizationPlan.PlanType type); + List computePlansForTable(TableName table); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java index 19d2dc7a3ba..d56acc2a935 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerChore.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,34 +17,35 @@ */ package org.apache.hadoop.hbase.master.normalizer; +import java.io.IOException; import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.client.NormalizeTableFilterParams; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.master.HMaster; - -import java.io.IOException; /** - * Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#normalizeRegions()} - * when needed. + * Chore that will periodically call + * {@link HMaster#normalizeRegions(NormalizeTableFilterParams, boolean)}. */ @InterfaceAudience.Private -public class RegionNormalizerChore extends ScheduledChore { +class RegionNormalizerChore extends ScheduledChore { private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerChore.class); - private final HMaster master; + private final MasterServices master; - public RegionNormalizerChore(HMaster master) { + public RegionNormalizerChore(MasterServices master) { super(master.getServerName() + "-RegionNormalizerChore", master, - master.getConfiguration().getInt("hbase.normalizer.period", 300000)); + master.getConfiguration().getInt("hbase.normalizer.period", 300_000)); this.master = master; } @Override protected void chore() { try { - master.normalizeRegions(); + master.normalizeRegions(new NormalizeTableFilterParams.Builder().build(), false); } catch (IOException e) { LOG.error("Failed to normalize regions.", e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java index 06774c97a81..92d16648fcd 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +19,12 @@ package org.apache.hadoop.hbase.master.normalizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; /** * Factory to create instance of {@link RegionNormalizer} as configured. @@ -32,13 +35,30 @@ public final class RegionNormalizerFactory { private RegionNormalizerFactory() { } + public static RegionNormalizerManager createNormalizerManager( + final Configuration conf, + final ZKWatcher zkWatcher, + final HMaster master // TODO: consolidate this down to MasterServices + ) { + final RegionNormalizer regionNormalizer = getRegionNormalizer(conf); + regionNormalizer.setMasterServices(master); + final RegionNormalizerTracker tracker = new RegionNormalizerTracker(zkWatcher, master); + final RegionNormalizerChore chore = + master.isInMaintenanceMode() ? null : new RegionNormalizerChore(master); + final RegionNormalizerWorkQueue workQueue = + master.isInMaintenanceMode() ? null : new RegionNormalizerWorkQueue<>(); + final RegionNormalizerWorker worker = master.isInMaintenanceMode() + ? 
null + : new RegionNormalizerWorker(conf, master, regionNormalizer, workQueue); + return new RegionNormalizerManager(tracker, chore, workQueue, worker); + } + /** * Create a region normalizer from the given conf. * @param conf configuration * @return {@link RegionNormalizer} implementation */ - public static RegionNormalizer getRegionNormalizer(Configuration conf) { - + private static RegionNormalizer getRegionNormalizer(Configuration conf) { // Create instance of Region Normalizer Class balancerKlass = conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java new file mode 100644 index 00000000000..e818519d651 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class encapsulates the details of the {@link RegionNormalizer} subsystem. It holds the
+ * {@link RegionNormalizerTracker} behind the on/off switch, the periodic chore, the queue of
+ * tables waiting to be normalized, and the worker that drains that queue on a dedicated daemon
+ * thread.
+ * <p>
+ * The chore, the queue and the worker may each be {@code null}; every method here copes with
+ * that by doing nothing or by reporting zero / {@code false}. {@link #start()} and
+ * {@link #stop()} are serialized on a private lock.
+ */
+@InterfaceAudience.Private
+public class RegionNormalizerManager {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
+
+  private final RegionNormalizerTracker regionNormalizerTracker;
+  private final RegionNormalizerChore regionNormalizerChore;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RegionNormalizerWorker worker;
+  private final ExecutorService pool;
+
+  private final Object startStopLock = new Object();
+  private boolean started = false;
+  private boolean stopped = false;
+
+  /**
+   * @param regionNormalizerTracker tracker for the normalizer on/off switch; required.
+   * @param regionNormalizerChore chore that periodically requests normalization, or {@code null}.
+   * @param workQueue queue of tables awaiting normalization, or {@code null}.
+   * @param worker consumer of {@code workQueue}, or {@code null}.
+   */
+  public RegionNormalizerManager(
+    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
+    @Nullable final RegionNormalizerChore regionNormalizerChore,
+    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
+    @Nullable final RegionNormalizerWorker worker
+  ) {
+    this.regionNormalizerTracker = regionNormalizerTracker;
+    this.regionNormalizerChore = regionNormalizerChore;
+    this.workQueue = workQueue;
+    this.worker = worker;
+    // One daemon thread is enough: at most a single worker is ever submitted.
+    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("normalizer-worker-%d")
+      .setUncaughtExceptionHandler(
+        (thread, throwable) ->
+          LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
+      .build());
+  }
+
+  /**
+   * Start the tracker and, when a worker was supplied, hand the worker to the thread pool.
+   * Calling this again after the first successful call does nothing.
+   */
+  public void start() {
+    synchronized (startStopLock) {
+      if (started) {
+        return;
+      }
+      regionNormalizerTracker.start();
+      if (worker != null) {
+        // worker will be null when master is in maintenance mode.
+        pool.submit(worker);
+      }
+      started = true;
+    }
+  }
+
+  /**
+   * Shut down the worker thread and stop the tracker. Calling this again after the first
+   * successful call does nothing.
+   * @throws IllegalStateException when {@link #start()} has never been called.
+   */
+  public void stop() {
+    synchronized (startStopLock) {
+      if (!started) {
+        throw new IllegalStateException("calling `stop` without first calling `start`.");
+      }
+      if (stopped) {
+        return;
+      }
+      pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
+      regionNormalizerTracker.stop();
+      stopped = true;
+    }
+  }
+
+  /**
+   * @return the chore handed to the constructor; {@code null} when none was supplied, so
+   *   callers must check before scheduling it.
+   */
+  public ScheduledChore getRegionNormalizerChore() {
+    return regionNormalizerChore;
+  }
+
+  /**
+   * Return {@code true} if region normalizer is on, {@code false} otherwise
+   */
+  public boolean isNormalizerOn() {
+    return regionNormalizerTracker.isNormalizerOn();
+  }
+
+  /**
+   * Set region normalizer on/off. A {@link KeeperException} raised by the tracker is logged at
+   * WARN and not rethrown, so the caller is not told when the switch failed to flip.
+   * @param normalizerOn whether normalizer should be on or off
+   */
+  public void setNormalizerOn(boolean normalizerOn) {
+    try {
+      regionNormalizerTracker.setNormalizerOn(normalizerOn);
+    } catch (KeeperException e) {
+      LOG.warn("Error flipping normalizer switch", e);
+    }
+  }
+
+  /**
+   * Call-back for the case where plan couldn't be executed due to constraint violation,
+   * such as namespace quota. Does nothing when there is no worker.
+   * @param type type of plan that was skipped.
+   */
+  public void planSkipped(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    if (worker != null) {
+      worker.planSkipped(type);
+    }
+  }
+
+  /**
+   * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
+   * @param type type of plan for which skipped count is to be returned
+   * @return the worker's skipped count, or {@code 0} when there is no worker.
+   */
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    return worker == null ? 0 : worker.getSkippedCount(type);
+  }
+
+  /**
+   * Return the number of times a {@link SplitNormalizationPlan} has been submitted, or
+   * {@code 0} when there is no worker.
+   */
+  public long getSplitPlanCount() {
+    return worker == null ? 0 : worker.getSplitPlanCount();
+  }
+
+  /**
+   * Return the number of times a {@link MergeNormalizationPlan} has been submitted, or
+   * {@code 0} when there is no worker.
+   */
+  public long getMergePlanCount() {
+    return worker == null ? 0 : worker.getMergePlanCount();
+  }
+
+  /**
+   * Submit tables for normalization.
+   * @param tables a list of tables to submit.
+   * @param isHighPriority {@code true} when these requested tables should skip to the front of
+   *   the queue.
+   * @return {@code true} when work was queued, {@code false} otherwise (no work queue exists).
+   */
+  public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
+    if (workQueue == null) {
+      return false;
+    }
+    if (isHighPriority) {
+      workQueue.putAllFirst(tables);
+    } else {
+      workQueue.putAll(tables);
+    }
+    return true;
+  }
+}
 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java new file mode 100644 index 00000000000..5ebb4f9ad08 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is + * an ordered collection class that has the following properties: + *
<ul> + *   <li>Guarantees uniqueness of elements, as a {@link Set}.</li> + *   <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li> + *   <li>Work is retrieved on a FIFO policy.</li> + *   <li>Work retrieval blocks the calling thread until new work is available, as a + *     {@link BlockingQueue}.</li> + *   <li>Allows a producer to insert an item at the head of the queue, if desired.</li> + * </ul>
    + * Assumes low-frequency and low-parallelism concurrent access, so protects state using a + * simplistic synchronization strategy. + */ +@InterfaceAudience.Private +class RegionNormalizerWorkQueue { + + /** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */ + private LinkedHashSet delegate; + + // the locking structure used here follows the example found in LinkedBlockingQueue. The + // difference is that our locks guard access to `delegate` rather than the head node. + + /** Lock held by take, poll, etc */ + private final ReentrantLock takeLock; + + /** Wait queue for waiting takes */ + private final Condition notEmpty; + + /** Lock held by put, offer, etc */ + private final ReentrantLock putLock; + + RegionNormalizerWorkQueue() { + delegate = new LinkedHashSet<>(); + takeLock = new ReentrantLock(); + notEmpty = takeLock.newCondition(); + putLock = new ReentrantLock(); + } + + /** + * Signals a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void signalNotEmpty() { + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /** + * Locks to prevent both puts and takes. + */ + private void fullyLock() { + putLock.lock(); + takeLock.lock(); + } + + /** + * Unlocks to allow both puts and takes. + */ + private void fullyUnlock() { + takeLock.unlock(); + putLock.unlock(); + } + + /** + * Inserts the specified element at the tail of the queue, if it's not already present. + * + * @param e the element to add + */ + public void put(E e) { + if (e == null) { + throw new NullPointerException(); + } + + putLock.lock(); + try { + delegate.add(e); + } finally { + putLock.unlock(); + } + + if (!delegate.isEmpty()) { + signalNotEmpty(); + } + } + + /** + * Inserts the specified element at the head of the queue. 
+ * + * @param e the element to add + */ + public void putFirst(E e) { + if (e == null) { + throw new NullPointerException(); + } + putAllFirst(Collections.singleton(e)); + } + + /** + * Inserts the specified elements at the tail of the queue. Any elements already present in + * the queue are ignored. + * + * @param c the elements to add + */ + public void putAll(Collection c) { + if (c == null) { + throw new NullPointerException(); + } + + putLock.lock(); + try { + delegate.addAll(c); + } finally { + putLock.unlock(); + } + + if (!delegate.isEmpty()) { + signalNotEmpty(); + } + } + + /** + * Inserts the specified elements at the head of the queue. + * + * @param c the elements to add + */ + public void putAllFirst(Collection c) { + if (c == null) { + throw new NullPointerException(); + } + + fullyLock(); + try { + final LinkedHashSet copy = new LinkedHashSet<>(c.size() + delegate.size()); + copy.addAll(c); + copy.addAll(delegate); + delegate = copy; + } finally { + fullyUnlock(); + } + + if (!delegate.isEmpty()) { + signalNotEmpty(); + } + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element becomes available. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting + */ + public E take() throws InterruptedException { + E x; + takeLock.lockInterruptibly(); + try { + while (delegate.isEmpty()) { + notEmpty.await(); + } + final Iterator iter = delegate.iterator(); + x = iter.next(); + iter.remove(); + if (!delegate.isEmpty()) { + notEmpty.signal(); + } + } finally { + takeLock.unlock(); + } + return x; + } + + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public void clear() { + putLock.lock(); + try { + delegate.clear(); + } finally { + putLock.unlock(); + } + } + + /** + * Returns the number of elements in this queue. 
+ * + * @return the number of elements in this queue + */ + public int size() { + takeLock.lock(); + try { + return delegate.size(); + } finally { + takeLock.unlock(); + } + } + + @Override + public String toString() { + takeLock.lock(); + try { + return delegate.toString(); + } finally { + takeLock.unlock(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java new file mode 100644 index 00000000000..30f9fc25364 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.normalizer; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + +/** + * Consumes normalization request targets ({@link TableName}s) off the + * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer}, + * and executes the resulting {@link NormalizationPlan}s. + */ +@InterfaceAudience.Private +class RegionNormalizerWorker implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class); + + static final String RATE_LIMIT_BYTES_PER_SEC_KEY = + "hbase.normalizer.throughput.max_bytes_per_sec"; + private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec + + private final MasterServices masterServices; + private final RegionNormalizer regionNormalizer; + private final RegionNormalizerWorkQueue workQueue; + private final RateLimiter rateLimiter; + + private final long[] skippedCount; + private long splitPlanCount; + private long mergePlanCount; + + RegionNormalizerWorker( + final Configuration configuration, + final MasterServices masterServices, + final RegionNormalizer regionNormalizer, + final RegionNormalizerWorkQueue workQueue + ) { + this.masterServices = masterServices; + this.regionNormalizer = regionNormalizer; + this.workQueue = workQueue; + this.skippedCount = new 
long[NormalizationPlan.PlanType.values().length]; + this.splitPlanCount = 0; + this.mergePlanCount = 0; + this.rateLimiter = loadRateLimiter(configuration); + } + + private static RateLimiter loadRateLimiter(final Configuration configuration) { + long rateLimitBytes = + configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES); + long rateLimitMbs = rateLimitBytes / 1_000_000L; + if (rateLimitMbs <= 0) { + LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.", + RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes); + rateLimitBytes = RATE_UNLIMITED_BYTES; + rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L; + } + LOG.info("Normalizer rate limit set to {}", + rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec"); + return RateLimiter.create(rateLimitMbs); + } + + /** + * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType) + */ + void planSkipped(NormalizationPlan.PlanType type) { + synchronized (skippedCount) { + // updates come here via procedure threads, so synchronize access to this counter. + skippedCount[type.ordinal()]++; + } + } + + /** + * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType) + */ + long getSkippedCount(NormalizationPlan.PlanType type) { + return skippedCount[type.ordinal()]; + } + + /** + * @see RegionNormalizerManager#getSplitPlanCount() + */ + long getSplitPlanCount() { + return splitPlanCount; + } + + /** + * @see RegionNormalizerManager#getMergePlanCount() + */ + long getMergePlanCount() { + return mergePlanCount; + } + + @Override + public void run() { + while (true) { + if (Thread.interrupted()) { + LOG.debug("interrupt detected. terminating."); + break; + } + final TableName tableName; + try { + tableName = workQueue.take(); + } catch (InterruptedException e) { + LOG.debug("interrupt detected. 
terminating."); + break; + } + + final List plans = calculatePlans(tableName); + submitPlans(plans); + } + } + + private List calculatePlans(final TableName tableName) { + if (masterServices.skipRegionManagementAction("region normalizer")) { + return Collections.emptyList(); + } + + try { + final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName); + if (tblDesc != null && !tblDesc.isNormalizationEnabled()) { + LOG.debug("Skipping table {} because normalization is disabled in its table properties.", + tableName); + return Collections.emptyList(); + } + } catch (IOException e) { + LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e); + return Collections.emptyList(); + } + + final List plans = regionNormalizer.computePlansForTable(tableName); + if (CollectionUtils.isEmpty(plans)) { + LOG.debug("No normalization required for table {}.", tableName); + return Collections.emptyList(); + } + return plans; + } + + private void submitPlans(final List plans) { + // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit + // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop. + for (NormalizationPlan plan : plans) { + switch (plan.getType()) { + case MERGE: { + submitMergePlan((MergeNormalizationPlan) plan); + break; + } + case SPLIT: { + submitSplitPlan((SplitNormalizationPlan) plan); + break; + } + case NONE: + LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan); + planSkipped(plan.getType()); + break; + default: + LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan); + planSkipped(plan.getType()); + break; + } + } + } + + /** + * Interacts with {@link MasterServices} in order to execute a plan. 
+ */ + private void submitMergePlan(final MergeNormalizationPlan plan) { + final int totalSizeMb; + try { + final long totalSizeMbLong = plan.getNormalizationTargets() + .stream() + .mapToLong(NormalizationTarget::getRegionSizeMb) + .reduce(0, Math::addExact); + totalSizeMb = Math.toIntExact(totalSizeMbLong); + } catch (ArithmeticException e) { + LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan); + planSkipped(plan.getType()); + return; + } + + final RegionInfo[] infos = plan.getNormalizationTargets() + .stream() + .map(NormalizationTarget::getRegionInfo) + .toArray(RegionInfo[]::new); + final long pid; + try { + pid = masterServices.mergeRegions( + infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE); + } catch (IOException e) { + LOG.info("failed to submit plan {}.", plan, e); + planSkipped(plan.getType()); + return; + } + mergePlanCount++; + LOG.info("Submitted {} resulting in pid {}", plan, pid); + final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb))); + LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs)); + } + + /** + * Interacts with {@link MasterServices} in order to execute a plan. + */ + private void submitSplitPlan(final SplitNormalizationPlan plan) { + final int totalSizeMb; + try { + totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb()); + } catch (ArithmeticException e) { + LOG.debug("Split request size overflows rate limiter data type. 
{}", plan); + planSkipped(plan.getType()); + return; + } + final RegionInfo info = plan.getSplitTarget().getRegionInfo(); + final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb))); + LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs)); + + final long pid; + try { + pid = masterServices.splitRegion( + info, null, HConstants.NO_NONCE, HConstants.NO_NONCE); + } catch (IOException e) { + LOG.info("failed to submit plan {}.", plan, e); + planSkipped(plan.getType()); + return; + } + splitPlanCount++; + LOG.info("Submitted {} resulting in pid {}", plan, pid); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java index a904e17f7b0..a641a0aa25b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -54,29 +53,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti *
  • Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1 * are kindly requested to merge.
  • * - *

    - * The following parameters are configurable: - *

      - *
    1. Whether to split a region as part of normalization. Configuration: - * {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.
    2. - *
    3. Whether to merge a region as part of normalization. Configuration: - * {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.
    4. - *
    5. The minimum number of regions in a table to consider it for merge normalization. - * Configuration: {@value #MIN_REGION_COUNT_KEY}, default: - * {@value #DEFAULT_MIN_REGION_COUNT}.
    6. - *
    7. The minimum age for a region to be considered for a merge, in days. Configuration: - * {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default: - * {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.
    8. - *
    9. The minimum size for a region to be considered for a merge, in whole MBs. Configuration: - * {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default: - * {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.
    10. - *
    - *

    - * To see detailed logging of the application of these configuration values, set the log level for - * this class to `TRACE`. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class SimpleRegionNormalizer implements RegionNormalizer { +class SimpleRegionNormalizer implements RegionNormalizer { private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class); static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled"; @@ -92,7 +71,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer { static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb"; static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1; - private final long[] skippedCount; private Configuration conf; private MasterServices masterServices; private boolean splitEnabled; @@ -102,7 +80,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer { private int mergeMinRegionSizeMb; public SimpleRegionNormalizer() { - skippedCount = new long[NormalizationPlan.PlanType.values().length]; splitEnabled = DEFAULT_SPLIT_ENABLED; mergeEnabled = DEFAULT_MERGE_ENABLED; minRegionCount = DEFAULT_MIN_REGION_COUNT; @@ -203,16 +180,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer { this.masterServices = masterServices; } - @Override - public void planSkipped(final RegionInfo hri, final PlanType type) { - skippedCount[type.ordinal()]++; - } - - @Override - public long getSkippedCount(NormalizationPlan.PlanType type) { - return skippedCount[type.ordinal()]; - } - @Override public List computePlansForTable(final TableName table) { if (table == null) { @@ -371,7 +338,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer { final long nextSizeMb = getRegionSizeMB(next); // always merge away empty regions when they present themselves. 
if (currentSizeMb == 0 || nextSizeMb == 0 || currentSizeMb + nextSizeMb < avgRegionSizeMb) { - plans.add(new MergeNormalizationPlan(current, next)); + final MergeNormalizationPlan plan = new MergeNormalizationPlan.Builder() + .addTarget(current, currentSizeMb) + .addTarget(next, nextSizeMb) + .build(); + plans.add(plan); candidateIdx++; } } @@ -408,11 +379,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer { if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) { continue; } - final long regionSize = getRegionSizeMB(hri); - if (regionSize > 2 * avgRegionSize) { + final long regionSizeMb = getRegionSizeMB(hri); + if (regionSizeMb > 2 * avgRegionSize) { LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting", - ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize); - plans.add(new SplitNormalizationPlan(hri)); + ctx.getTableName(), hri.getRegionNameAsString(), regionSizeMb, avgRegionSize); + plans.add(new SplitNormalizationPlan(hri, regionSizeMb)); } } return plans; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java index 7c634fbf248..ffe68cc9f62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java @@ -18,32 +18,23 @@ */ package org.apache.hadoop.hbase.master.normalizer; -import java.io.IOException; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.master.MasterServices; 
import org.apache.yetus.audience.InterfaceAudience; /** - * Normalization plan to split region. + * Normalization plan to split a region. */ @InterfaceAudience.Private -public class SplitNormalizationPlan implements NormalizationPlan { +final class SplitNormalizationPlan implements NormalizationPlan { - private final RegionInfo regionInfo; + private final NormalizationTarget splitTarget; - public SplitNormalizationPlan(RegionInfo regionInfo) { - this.regionInfo = regionInfo; - } - - @Override - public long submit(MasterServices masterServices) throws IOException { - return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE, - HConstants.NO_NONCE); + SplitNormalizationPlan(final RegionInfo splitTarget, final long splitTargetSizeMb) { + this.splitTarget = new NormalizationTarget(splitTarget, splitTargetSizeMb); } @Override @@ -51,14 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan { return PlanType.SPLIT; } - public RegionInfo getRegionInfo() { - return regionInfo; + public NormalizationTarget getSplitTarget() { + return splitTarget; } @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .append("regionInfo", regionInfo) + .append("splitTarget", splitTarget) .toString(); } @@ -75,13 +66,13 @@ public class SplitNormalizationPlan implements NormalizationPlan { SplitNormalizationPlan that = (SplitNormalizationPlan) o; return new EqualsBuilder() - .append(regionInfo, that.regionInfo) + .append(splitTarget, that.splitTarget) .isEquals(); } @Override public int hashCode() { return new HashCodeBuilder(17, 37) - .append(regionInfo) + .append(splitTarget) .toHashCode(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java new file mode 100644 index 00000000000..e3180347dc3 --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/package-info.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The Region Normalizer subsystem is responsible for coaxing all the regions in a table toward + * a "normal" size, according to their storefile size. It does this by splitting regions that + * are significantly larger than the norm, and merging regions that are significantly smaller than + * the norm. + *

    + * The public interface to the Region Normalizer subsystem is limited to the following classes: + *
      + *
    • + * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory} provides an + * entry point for creating an instance of the + * {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager}. + *
    • + *
    • + * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager} encapsulates + * the whole Region Normalizer subsystem. You'll find one of these hanging off of the + * {@link org.apache.hadoop.hbase.master.HMaster}, which uses it to delegate API calls. There + * is usually only a single instance of this class. + *
    • + *
    • + * Various configuration points that share the common prefix of {@code hbase.normalizer}. + *
        + *
      • Whether to split a region as part of normalization. Configuration: + * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#SPLIT_ENABLED_KEY}, + * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_SPLIT_ENABLED}. + *
      • + *
      • Whether to merge a region as part of normalization. Configuration: + * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_ENABLED_KEY}, + * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_ENABLED}. + *
      • + *
      • The minimum number of regions in a table to consider it for merge normalization. + * Configuration: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MIN_REGION_COUNT_KEY}, + * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MIN_REGION_COUNT}. + *
      • + *
      • The minimum age for a region to be considered for a merge, in days. Configuration: + * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_AGE_DAYS_KEY}, + * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_AGE_DAYS}. + *
      • + *
      • The minimum size for a region to be considered for a merge, in whole MBs. Configuration: + * {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_SIZE_MB_KEY}, + * default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_SIZE_MB}. + *
      • + *
      • The limit on total throughput of the Region Normalizer's actions, configured in bytes per second and enforced in whole MBs per second. Configuration: + * {@value org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker#RATE_LIMIT_BYTES_PER_SEC_KEY}, + * default: unlimited. + *
      • + *
      + *

      + * To see detailed logging of the application of these configuration values, set the log + * level for this package to `TRACE`. + *

      + *
    • + *
    + * The Region Normalizer subsystem is composed of a handful of related classes: + *
      + *
    • + * The {@link org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker} provides a system by + * which the Normalizer can be disabled at runtime. It currently does this by managing a znode, + * but this is an implementation detail. + *
    • + *
    • + * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorkQueue} is a + * {@link java.util.Set}-like {@link java.util.Queue} that permits a single copy of a given + * work item to exist in the queue at one time. It also provides a facility for a producer to + * add an item to the front of the line. Consumers are blocked waiting for new work. + *
    • + *
    • + * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore} wakes up + * periodically and schedules new normalization work, adding targets to the queue. + *
    • + *
    • + * The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker} runs in a + * daemon thread, grabbing work off the queue as it becomes available. + *
    • + *
    • + * The {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} implements the + * logic for calculating target region sizes and emitting a list of corresponding + * {@link org.apache.hadoop.hbase.master.normalizer.NormalizationPlan} objects. + *
    • + *
    + */ +package org.apache.hadoop.hbase.master.normalizer; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 7c65005de55..3f3e80960bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master; import static org.mockito.Mockito.mock; - import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.MasterSwitchType; +import org.apache.hadoop.hbase.client.NormalizeTableFilterParams; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.janitor.CatalogJanitor; import org.apache.hadoop.hbase.master.locking.LockManager; -import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; +import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager; @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; import 
org.apache.hadoop.hbase.zookeeper.ZKWatcher; - import org.apache.hbase.thirdparty.com.google.protobuf.Service; public class MockNoopMasterServices implements MasterServices { @@ -109,11 +108,6 @@ public class MockNoopMasterServices implements MasterServices { return null; } - @Override - public RegionNormalizer getRegionNormalizer() { - return null; - } - @Override public CatalogJanitor getCatalogJanitor() { return null; @@ -139,6 +133,10 @@ public class MockNoopMasterServices implements MasterServices { return null; } + @Override public RegionNormalizerManager getRegionNormalizerManager() { + return null; + } + @Override public ProcedureExecutor getMasterProcedureExecutor() { return null; @@ -341,6 +339,10 @@ public class MockNoopMasterServices implements MasterServices { return false; } + @Override public boolean skipRegionManagementAction(String action) { + return false; + } + @Override public long getLastMajorCompactionTimestamp(TableName table) throws IOException { return 0; @@ -507,4 +509,9 @@ public class MockNoopMasterServices implements MasterServices { public boolean isBalancerOn() { return false; } + + @Override + public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) { + return false; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java index 5aec49bdb11..87a7e680ff8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterChoreScheduled.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master; import java.lang.reflect.Field; - import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ScheduledChore; @@ -30,7 +29,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner; import org.apache.hadoop.hbase.master.janitor.CatalogJanitor; -import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.AfterClass; @@ -66,7 +64,7 @@ public class TestMasterChoreScheduled { } @Test - public void testDefaultScheduledChores() throws Exception { + public void testDefaultScheduledChores() { // test if logCleaner chore is scheduled by default in HMaster init TestChoreField logCleanerTestChoreField = new TestChoreField<>(); LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner"); @@ -96,10 +94,10 @@ public class TestMasterChoreScheduled { balancerChoreTestChoreField.testIfChoreScheduled(balancerChore); // test if normalizerChore chore is scheduled by default in HMaster init - TestChoreField regionNormalizerChoreTestChoreField = + ScheduledChore regionNormalizerChore = hMaster.getRegionNormalizerManager() + .getRegionNormalizerChore(); + TestChoreField regionNormalizerChoreTestChoreField = new TestChoreField<>(); - RegionNormalizerChore regionNormalizerChore = regionNormalizerChoreTestChoreField - .getChoreObj("normalizerChore"); regionNormalizerChoreTestChoreField.testIfChoreScheduled(regionNormalizerChore); // test if catalogJanitorChore chore is scheduled by default in HMaster init @@ -114,22 +112,27 @@ public class TestMasterChoreScheduled { 
hbckChoreTestChoreField.testIfChoreScheduled(hbckChore); } - + /** + * Reflect into the {@link HMaster} instance and find by field name a specified instance + * of {@link ScheduledChore}. + */ private static class TestChoreField { - private E getChoreObj(String fieldName) throws NoSuchFieldException, - IllegalAccessException { - Field masterField = HMaster.class.getDeclaredField(fieldName); - masterField.setAccessible(true); - E choreFieldVal = (E) masterField.get(hMaster); - return choreFieldVal; + @SuppressWarnings("unchecked") + private E getChoreObj(String fieldName) { + try { + Field masterField = HMaster.class.getDeclaredField(fieldName); + masterField.setAccessible(true); + return (E) masterField.get(hMaster); + } catch (Exception e) { + throw new AssertionError( + "Unable to retrieve field '" + fieldName + "' from HMaster instance.", e); + } } private void testIfChoreScheduled(E choreObj) { Assert.assertNotNull(choreObj); Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(choreObj)); } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java index ff88be1ef20..6ac68b30048 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java @@ -72,8 +72,10 @@ public class TestMasterMetricsWrapper { public void testInfo() { HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); MetricsMasterWrapperImpl info = new MetricsMasterWrapperImpl(master); - assertEquals(master.getSplitPlanCount(), info.getSplitPlanCount(), 0); - assertEquals(master.getMergePlanCount(), info.getMergePlanCount(), 0); + assertEquals( + master.getRegionNormalizerManager().getSplitPlanCount(), info.getSplitPlanCount(), 0); + assertEquals( + master.getRegionNormalizerManager().getMergePlanCount(), 
info.getMergePlanCount(), 0); assertEquals(master.getAverageLoad(), info.getAverageLoad(), 0); assertEquals(master.getClusterId(), info.getClusterId()); assertEquals(master.getMasterActiveTime(), info.getActiveTime()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java new file mode 100644 index 00000000000..7e6c74910ed --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorkQueue.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.normalizer; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring. 
+ */ +@Category({ MasterTests.class, SmallTests.class}) +public class TestRegionNormalizerWorkQueue { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class); + + @Rule + public TestName testName = new TestName(); + + @Test + public void testElementUniquenessAndFIFO() throws Exception { + final RegionNormalizerWorkQueue queue = new RegionNormalizerWorkQueue<>(); + final List content = new LinkedList<>(); + IntStream.of(4, 3, 2, 1, 4, 3, 2, 1) + .boxed() + .forEach(queue::put); + assertEquals(4, queue.size()); + while (queue.size() > 0) { + content.add(queue.take()); + } + assertThat(content, contains(4, 3, 2, 1)); + + queue.clear(); + queue.putAll(Arrays.asList(4, 3, 2, 1)); + queue.putAll(Arrays.asList(4, 5)); + assertEquals(5, queue.size()); + content.clear(); + while (queue.size() > 0) { + content.add(queue.take()); + } + assertThat(content, contains(4, 3, 2, 1, 5)); + } + + @Test + public void testPriorityAndFIFO() throws Exception { + final RegionNormalizerWorkQueue queue = new RegionNormalizerWorkQueue<>(); + final List content = new LinkedList<>(); + queue.putAll(Arrays.asList(4, 3, 2, 1)); + assertEquals(4, queue.size()); + queue.putFirst(0); + assertEquals(5, queue.size()); + drainTo(queue, content); + assertThat("putFirst items should jump the queue, preserving existing order", + content, contains(0, 4, 3, 2, 1)); + + queue.clear(); + content.clear(); + queue.putAll(Arrays.asList(4, 3, 2, 1)); + queue.putFirst(1); + assertEquals(4, queue.size()); + drainTo(queue, content); + assertThat("existing items re-added with putFirst should jump the queue", + content, contains(1, 4, 3, 2)); + + queue.clear(); + content.clear(); + queue.putAll(Arrays.asList(4, 3, 2, 1)); + queue.putAllFirst(Arrays.asList(2, 3)); + assertEquals(4, queue.size()); + drainTo(queue, content); + assertThat( + "existing items re-added with putAllFirst jump the queue AND honor changes in priority", + content, 
contains(2, 3, 4, 1)); + } + + private enum Action { + PUT, + PUT_FIRST, + PUT_ALL, + PUT_ALL_FIRST, + } + + /** + * Test that the uniqueness constraint is honored in the face of concurrent modification. + */ + @Test + public void testConcurrentPut() throws Exception { + final RegionNormalizerWorkQueue queue = new RegionNormalizerWorkQueue<>(); + final int maxValue = 100; + final Runnable producer = () -> { + final Random rand = ThreadLocalRandom.current(); + for (int i = 0; i < 1_000; i++) { + final Action action = Action.values()[rand.nextInt(Action.values().length)]; + switch (action) { + case PUT: { + final int val = rand.nextInt(maxValue); + queue.put(val); + break; + } + case PUT_FIRST: { + final int val = rand.nextInt(maxValue); + queue.putFirst(val); + break; + } + case PUT_ALL: { + final List vals = rand.ints(5, 0, maxValue) + .boxed() + .collect(Collectors.toList()); + queue.putAll(vals); + break; + } + case PUT_ALL_FIRST: { + final List vals = rand.ints(5, 0, maxValue) + .boxed() + .collect(Collectors.toList()); + queue.putAllFirst(vals); + break; + } + default: + fail("Unrecognized action " + action); + } + } + }; + + final int numThreads = 5; + final CompletableFuture[] futures = IntStream.range(0, numThreads) + .mapToObj(val -> CompletableFuture.runAsync(producer)) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(futures).join(); + + final List content = new ArrayList<>(queue.size()); + drainTo(queue, content); + assertThat("at most `maxValue` items should be present.", + content.size(), lessThanOrEqualTo(maxValue)); + assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size()); + } + + /** + * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The + * producing thread places new entries onto the queue following a known schedule. The consuming + * thread collects a time measurement between calls to {@code take}. 
Finally, the test makes + * coarse-grained assertions of the consumer's observations based on the producer's schedule. + */ + @Test + public void testTake() throws Exception { + final RegionNormalizerWorkQueue queue = new RegionNormalizerWorkQueue<>(); + final ConcurrentLinkedQueue takeTimes = new ConcurrentLinkedQueue<>(); + final AtomicBoolean finished = new AtomicBoolean(false); + final Runnable consumer = () -> { + try { + while (!finished.get()) { + queue.take(); + takeTimes.add(System.nanoTime()); + } + } catch (InterruptedException e) { + fail("interrupted."); + } + }; + + CompletableFuture worker = CompletableFuture.runAsync(consumer); + final long testStart = System.nanoTime(); + for (int i = 0; i < 5; i++) { + Thread.sleep(10); + queue.put(i); + } + + // set finished = true and pipe one more value in case the thread needs an extra pass through + // the loop. + finished.set(true); + queue.put(1); + worker.get(1, TimeUnit.SECONDS); + + final Iterator times = takeTimes.iterator(); + assertTrue("should have timing information for at least 2 calls to take.", + takeTimes.size() >= 5); + for (int i = 0; i < 5; i++) { + assertThat( + "Observations collected in takeTimes should increase by roughly 10ms every interval", + times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10))); + } + } + + private static void drainTo(final RegionNormalizerWorkQueue queue, Collection dest) + throws InterruptedException { + assertThat(queue.size(), greaterThan(0)); + while (queue.size() > 0) { + dest.add(queue.take()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java new file mode 100644 index 00000000000..e3a29b85406 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import 
org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.StringDescription; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of + * interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue}) and + * its callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work + * queue is simple enough to use directly; for {@link MasterServices}, use a mock because, as of + * now, the worker only invokes 4 methods.
+ */ +@Category({ MasterTests.class, SmallTests.class}) +public class TestRegionNormalizerWorker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class); + + @Rule + public TestName testName = new TestName(); + @Rule + public TableNameTestRule tableName = new TableNameTestRule(); + + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MasterServices masterServices; + @Mock + private RegionNormalizer regionNormalizer; + + private HBaseCommonTestingUtility testingUtility; + private RegionNormalizerWorkQueue queue; + private ExecutorService workerPool; + + private final AtomicReference workerThreadThrowable = new AtomicReference<>(); + + @Before + public void before() throws Exception { + MockitoAnnotations.initMocks(this); + when(masterServices.skipRegionManagementAction(any())).thenReturn(false); + testingUtility = new HBaseCommonTestingUtility(); + queue = new RegionNormalizerWorkQueue<>(); + workerThreadThrowable.set(null); + + final String threadNameFmt = + TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d"; + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(threadNameFmt) + .setDaemon(true) + .setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e)) + .build(); + workerPool = Executors.newSingleThreadExecutor(threadFactory); + } + + @After + public void after() throws Exception { + workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` + assertTrue("timeout waiting for worker thread to terminate", + workerPool.awaitTermination(30, TimeUnit.SECONDS)); + final Throwable workerThrowable = workerThreadThrowable.get(); + assertThat("worker thread threw unexpected exception", workerThrowable, nullValue()); + } + + @Test + public void testMergeCounter() throws Exception { + final TableName tn = 
tableName.getTableName(); + final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn) + .setNormalizationEnabled(true) + .build(); + when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); + when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())) + .thenReturn(1L); + when(regionNormalizer.computePlansForTable(tn)) + .thenReturn(singletonList(new MergeNormalizationPlan.Builder() + .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10) + .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20) + .build())); + + final RegionNormalizerWorker worker = new RegionNormalizerWorker( + testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); + final long beforeMergePlanCount = worker.getMergePlanCount(); + workerPool.submit(worker); + queue.put(tn); + + assertThatEventually("executing work should see plan count increase", + worker::getMergePlanCount, greaterThan(beforeMergePlanCount)); + } + + @Test + public void testSplitCounter() throws Exception { + final TableName tn = tableName.getTableName(); + final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn) + .setNormalizationEnabled(true) + .build(); + when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); + when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())) + .thenReturn(1L); + when(regionNormalizer.computePlansForTable(tn)) + .thenReturn(singletonList( + new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10))); + + final RegionNormalizerWorker worker = new RegionNormalizerWorker( + testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); + final long beforeSplitPlanCount = worker.getSplitPlanCount(); + workerPool.submit(worker); + queue.put(tn); + + assertThatEventually("executing work should see plan count increase", + worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount)); + } + + /** + * Assert that a rate limit is honored, at least in a 
rough way. Maintainers should manually + * inspect the log messages emitted by the worker thread to confirm that expected behavior. + */ + @Test + public void testRateLimit() throws Exception { + final TableName tn = tableName.getTableName(); + final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn) + .setNormalizationEnabled(true) + .build(); + final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build(); + final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build(); + final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build(); + when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor); + when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())) + .thenReturn(1L); + when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())) + .thenReturn(1L); + when(regionNormalizer.computePlansForTable(tn)) + .thenReturn(Arrays.asList( + new SplitNormalizationPlan(splitRegionInfo, 2), + new MergeNormalizationPlan.Builder() + .addTarget(mergeRegionInfo1, 1) + .addTarget(mergeRegionInfo2, 2) + .build(), + new SplitNormalizationPlan(splitRegionInfo, 1))); + + final Configuration conf = testingUtility.getConfiguration(); + conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m"); + final RegionNormalizerWorker worker = new RegionNormalizerWorker( + testingUtility.getConfiguration(), masterServices, regionNormalizer, queue); + workerPool.submit(worker); + final long startTime = System.nanoTime(); + queue.put(tn); + + assertThatEventually("executing work should see split plan count increase", + worker::getSplitPlanCount, comparesEqualTo(2L)); + assertThatEventually("executing work should see merge plan count increase", + worker::getMergePlanCount, comparesEqualTo(1L)); + + final long endTime = System.nanoTime(); + assertThat("rate limited normalizer should have taken at least 5 seconds", + Duration.ofNanos(endTime - startTime), 
greaterThanOrEqualTo(Duration.ofSeconds(5))); + } + + /** + * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} + * until the matcher succeeds or the timeout period of 30 seconds is exhausted. + */ + private void assertThatEventually( + final String reason, + final Supplier actualSupplier, + final Matcher matcher + ) throws Exception { + testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30), + new Waiter.ExplainingPredicate() { + private T lastValue = null; + + @Override + public String explainFailure() { + final Description description = new StringDescription() + .appendText(reason) + .appendText("\nExpected: ") + .appendDescriptionOf(matcher) + .appendText("\n but: "); + matcher.describeMismatch(lastValue, description); + return description.toString(); + } + + @Override public boolean evaluate() { + lastValue = actualSupplier.get(); + return matcher.matches(lastValue); + } + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java index 89da907eeb0..f263cbc4fdf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java @@ -175,8 +175,12 @@ public class TestSimpleRegionNormalizer { createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16); setupMocksForNormalizer(regionSizes, regionInfos); - assertThat(normalizer.computePlansForTable(tableName), contains( - new MergeNormalizationPlan(regionInfos.get(1), regionInfos.get(2)))); + assertThat( + normalizer.computePlansForTable(tableName), + contains(new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(1), 5) + .addTarget(regionInfos.get(2), 5) + .build())); } // Test for situation illustrated in HBASE-14867 @@ -188,9 +192,12 @@ public class 
TestSimpleRegionNormalizer { createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700); setupMocksForNormalizer(regionSizes, regionInfos); - assertThat(normalizer.computePlansForTable(tableName), contains( - new MergeNormalizationPlan(regionInfos.get(4), regionInfos.get(5)) - )); + assertThat( + normalizer.computePlansForTable(tableName), + contains(new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(4), 2700) + .addTarget(regionInfos.get(5), 2700) + .build())); } @Test @@ -214,7 +221,7 @@ public class TestSimpleRegionNormalizer { setupMocksForNormalizer(regionSizes, regionInfos); assertThat(normalizer.computePlansForTable(tableName), contains( - new SplitNormalizationPlan(regionInfos.get(3)))); + new SplitNormalizationPlan(regionInfos.get(3), 30))); } @Test @@ -229,18 +236,26 @@ public class TestSimpleRegionNormalizer { when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize()) .thenReturn(20L); assertThat(normalizer.computePlansForTable(tableName), contains( - new SplitNormalizationPlan(regionInfos.get(2)), - new SplitNormalizationPlan(regionInfos.get(3)), - new SplitNormalizationPlan(regionInfos.get(4)), - new SplitNormalizationPlan(regionInfos.get(5)) + new SplitNormalizationPlan(regionInfos.get(2), 60), + new SplitNormalizationPlan(regionInfos.get(3), 80), + new SplitNormalizationPlan(regionInfos.get(4), 100), + new SplitNormalizationPlan(regionInfos.get(5), 120) )); // test when target region size is 200 when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize()) .thenReturn(200L); - assertThat(normalizer.computePlansForTable(tableName), contains( - new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)), - new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3)))); + assertThat( + normalizer.computePlansForTable(tableName), + contains( + new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(0), 20) + .addTarget(regionInfos.get(1), 40) + .build(), + 
new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(2), 60) + .addTarget(regionInfos.get(3), 80) + .build())); } @Test @@ -255,14 +270,18 @@ public class TestSimpleRegionNormalizer { when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount()) .thenReturn(8); assertThat(normalizer.computePlansForTable(tableName), contains( - new SplitNormalizationPlan(regionInfos.get(2)), - new SplitNormalizationPlan(regionInfos.get(3)))); + new SplitNormalizationPlan(regionInfos.get(2), 60), + new SplitNormalizationPlan(regionInfos.get(3), 80))); // test when target region count is 3 when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount()) .thenReturn(3); - assertThat(normalizer.computePlansForTable(tableName), contains( - new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)))); + assertThat( + normalizer.computePlansForTable(tableName), + contains(new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(0), 20) + .addTarget(regionInfos.get(1), 40) + .build())); } @Test @@ -312,14 +331,17 @@ public class TestSimpleRegionNormalizer { List plans = normalizer.computePlansForTable(tableName); assertThat(plans, contains( - new SplitNormalizationPlan(regionInfos.get(2)), - new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)))); + new SplitNormalizationPlan(regionInfos.get(2), 10), + new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(0), 1) + .addTarget(regionInfos.get(1), 1) + .build())); // have to call setupMocks again because we don't have dynamic config update on normalizer. 
conf.setInt(MIN_REGION_COUNT_KEY, 4); setupMocksForNormalizer(regionSizes, regionInfos); assertThat(normalizer.computePlansForTable(tableName), contains( - new SplitNormalizationPlan(regionInfos.get(2)))); + new SplitNormalizationPlan(regionInfos.get(2), 10))); } @Test @@ -356,8 +378,12 @@ public class TestSimpleRegionNormalizer { assertFalse(normalizer.isSplitEnabled()); assertEquals(1, normalizer.getMergeMinRegionSizeMb()); - assertThat(normalizer.computePlansForTable(tableName), contains( - new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)))); + assertThat( + normalizer.computePlansForTable(tableName), + contains(new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(0), 1) + .addTarget(regionInfos.get(1), 2) + .build())); conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3); setupMocksForNormalizer(regionSizes, regionInfos); @@ -378,9 +404,18 @@ public class TestSimpleRegionNormalizer { assertFalse(normalizer.isSplitEnabled()); assertEquals(0, normalizer.getMergeMinRegionSizeMb()); assertThat(normalizer.computePlansForTable(tableName), contains( - new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)), - new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3)), - new MergeNormalizationPlan(regionInfos.get(5), regionInfos.get(6)))); + new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(0), 0) + .addTarget(regionInfos.get(1), 1) + .build(), + new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(2), 10) + .addTarget(regionInfos.get(3), 0) + .build(), + new MergeNormalizationPlan.Builder() + .addTarget(regionInfos.get(5), 10) + .addTarget(regionInfos.get(6), 0) + .build())); } // This test is to make sure that normalizer is only going to merge adjacent regions. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java index 173adf49db2..f5feb59ca32 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizerOnCluster.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.Collections; import java.util.Comparator; @@ -161,6 +160,7 @@ public class TestSimpleRegionNormalizerOnCluster { tn2 + " should not have split.", tn2RegionCount, getRegionCount(tn2)); + LOG.debug("waiting for t3 to settle..."); waitForTableRegionCount(tn3, tn3RegionCount); } finally { dropIfExists(tn1); @@ -187,7 +187,7 @@ public class TestSimpleRegionNormalizerOnCluster { : TableName.valueOf(name.getMethodName()); final int currentRegionCount = createTableBegsSplit(tableName, true, false); - final long existingSkippedSplitCount = master.getRegionNormalizer() + final long existingSkippedSplitCount = master.getRegionNormalizerManager() .getSkippedCount(PlanType.SPLIT); assertFalse(admin.normalizerSwitch(true).get()); assertTrue(admin.normalize().get()); @@ -332,7 +332,8 @@ public class TestSimpleRegionNormalizerOnCluster { return "waiting to observe split attempt and skipped."; } @Override public boolean evaluate() { - final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT); + final long skippedSplitCount = master.getRegionNormalizerManager() + .getSkippedCount(PlanType.SPLIT); return skippedSplitCount > existingSkippedSplitCount; } });