HBASE-24628 Region normalizer now respects a rate limit

Implement a rate limiter for the normalizer. Implemented in terms of
MB/sec of affacted region size (the same metrics used to make
normalization decisions). Uses Guava `RateLimiter` to perform the
resource accounting. `RateLimiter` works by blocking (uninterruptible
😖) the calling thread. Thus, the whole construction of the normalizer
subsystem needed refactoring. See the provided `package-info.java` for
an overview of this new structure.

Introduces a new configuration,
`hbase.normalizer.throughput.max_bytes_per_sec`, for specifying a
limit on the throughput of actions executed by the normalizer. Note
that while this configuration value is in bytes, the minimum honored
valued `1_000_000`. Supports values configured using the
human-readable suffixes honored by `Configuration.getLongBytes`

Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Huaxiang Sun <huaxiangsun@apache.com>
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Nick Dimiduk 2020-09-23 16:47:23 -07:00 committed by Nick Dimiduk
parent b82d8a5517
commit 78ae1f176d
25 changed files with 1634 additions and 371 deletions

View File

@ -29,7 +29,6 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -49,7 +48,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
@ -117,11 +115,8 @@ import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
@ -202,7 +197,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@ -233,7 +227,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
@ -337,9 +330,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;
// Tracker for region normalizer state
private RegionNormalizerTracker regionNormalizerTracker;
private ClusterSchemaService clusterSchemaService;
public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
@ -406,11 +396,8 @@ public class HMaster extends HRegionServer implements MasterServices {
private final LockManager lockManager = new LockManager(this);
private RSGroupBasedLoadBalancer balancer;
// a lock to prevent concurrent normalization actions.
private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
private RegionNormalizer normalizer;
private BalancerChore balancerChore;
private RegionNormalizerChore normalizerChore;
private RegionNormalizerManager regionNormalizerManager;
private ClusterStatusChore clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
private SnapshotCleanerChore snapshotCleanerChore = null;
@ -464,9 +451,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// handle table states
private TableStateManager tableStateManager;
private long splitPlanCount;
private long mergePlanCount;
/** jetty server for master to redirect requests to regionserver infoServer */
private Server masterJettyServer;
@ -788,26 +772,19 @@ public class HMaster extends HRegionServer implements MasterServices {
}
/**
* <p>
* Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
* should have already been initialized along with {@link ServerManager}.
* </p>
* <p>
* Will be overridden in tests.
* </p>
*/
@VisibleForTesting
protected void initializeZKBasedSystemTrackers()
throws IOException, InterruptedException, KeeperException, ReplicationException {
private void initializeZKBasedSystemTrackers()
throws IOException, KeeperException, ReplicationException {
this.balancer = new RSGroupBasedLoadBalancer();
this.balancer.setConf(conf);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
this.normalizer.setMasterServices(this);
this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
this.regionNormalizerTracker.start();
this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
this.regionNormalizerManager.start();
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
@ -900,10 +877,10 @@ public class HMaster extends HRegionServer implements MasterServices {
* </ol>
* </li>
* <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
* <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
* procedure executor, etc. Notice that the balancer must be created first as assignment manager
* may use it when assigning regions.</li>
* <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
* <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
* the procedure executor, etc. Notice that the balancer must be created first as assignment
* manager may use it when assigning regions.</li>
* <li>Wait for meta to be initialized if necessary, start table state manager.</li>
* <li>Wait for enough region servers to check-in</li>
* <li>Let assignment manager load data from meta and construct region states</li>
* <li>Start all other things such as chore services, etc</li>
@ -1116,8 +1093,7 @@ public class HMaster extends HRegionServer implements MasterServices {
getChoreService().scheduleChore(clusterStatusChore);
this.balancerChore = new BalancerChore(this);
getChoreService().scheduleChore(balancerChore);
this.normalizerChore = new RegionNormalizerChore(this);
getChoreService().scheduleChore(normalizerChore);
getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
this.hbckChore = new HbckChore(this);
@ -1533,6 +1509,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// example.
stopProcedureExecutor();
if (regionNormalizerManager != null) {
regionNormalizerManager.stop();
}
if (this.quotaManager != null) {
this.quotaManager.stop();
}
@ -1651,7 +1630,7 @@ public class HMaster extends HRegionServer implements MasterServices {
choreService.cancelChore(this.mobFileCleanerChore);
choreService.cancelChore(this.mobFileCompactionChore);
choreService.cancelChore(this.balancerChore);
choreService.cancelChore(this.normalizerChore);
choreService.cancelChore(getRegionNormalizerManager().getRegionNormalizerChore());
choreService.cancelChore(this.clusterStatusChore);
choreService.cancelChore(this.catalogJanitorChore);
choreService.cancelChore(this.clusterStatusPublisherChore);
@ -1751,7 +1730,9 @@ public class HMaster extends HRegionServer implements MasterServices {
* @param action the name of the action under consideration, for logging.
* @return {@code true} when the caller should exit early, {@code false} otherwise.
*/
private boolean skipRegionManagementAction(final String action) {
@Override
public boolean skipRegionManagementAction(final String action) {
// Note: this method could be `default` on MasterServices if but for logging.
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run {}.", action);
return true;
@ -1896,24 +1877,16 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public RegionNormalizer getRegionNormalizer() {
return this.normalizer;
public RegionNormalizerManager getRegionNormalizerManager() {
return regionNormalizerManager;
}
public boolean normalizeRegions() throws IOException {
return normalizeRegions(new NormalizeTableFilterParams.Builder().build());
}
/**
* Perform normalization of cluster.
*
* @return true if an existing normalization was already in progress, or if a new normalization
* was performed successfully; false otherwise (specifically, if HMaster finished initializing
* or normalization is globally disabled).
*/
public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException {
final long startTime = EnvironmentEdgeManager.currentTime();
if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
@Override
public boolean normalizeRegions(
final NormalizeTableFilterParams ntfp,
final boolean isHighPriority
) throws IOException {
if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
LOG.debug("Region normalization is disabled, don't run region normalizer.");
return false;
}
@ -1924,14 +1897,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return false;
}
if (!normalizationInProgressLock.tryLock()) {
// Don't run the normalizer concurrently
LOG.info("Normalization already in progress. Skipping request.");
return true;
}
int affectedTables = 0;
try {
final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
.stream()
@ -1942,52 +1907,7 @@ public class HMaster extends HRegionServer implements MasterServices {
final List<TableName> targetTables =
new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
Collections.shuffle(targetTables);
final List<Long> submittedPlanProcIds = new ArrayList<>();
for (TableName table : targetTables) {
if (table.isSystemTable()) {
continue;
}
final TableDescriptor tblDesc = getTableDescriptors().get(table);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug(
"Skipping table {} because normalization is disabled in its table properties.", table);
continue;
}
// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}
final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", table);
continue;
}
affectedTables++;
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
// submit task , so there's no artificial rate-
// limiting of merge/split requests due to this serial loop.
for (NormalizationPlan plan : plans) {
long procId = plan.submit(this);
submittedPlanProcIds.add(procId);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
}
}
}
final long endTime = EnvironmentEdgeManager.currentTime();
LOG.info("Normalizer ran successfully in {}. Submitted {} plans, affecting {} tables.",
Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables);
LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds);
} finally {
normalizationInProgressLock.unlock();
}
return true;
return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
}
/**
@ -3003,20 +2923,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return regionStates.getAverageLoad();
}
/*
* @return the count of region split plans executed
*/
public long getSplitPlanCount() {
return splitPlanCount;
}
/*
* @return the count of region merge plans executed
*/
public long getMergePlanCount() {
return mergePlanCount;
}
@Override
public boolean registerService(Service instance) {
/*
@ -3511,8 +3417,7 @@ public class HMaster extends HRegionServer implements MasterServices {
*/
public boolean isNormalizerOn() {
return !isInMaintenanceMode()
&& regionNormalizerTracker != null
&& regionNormalizerTracker.isNormalizerOn();
&& getRegionNormalizerManager().isNormalizerOn();
}
/**
@ -3540,13 +3445,6 @@ public class HMaster extends HRegionServer implements MasterServices {
LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
}
/**
* @return RegionNormalizerTracker instance
*/
public RegionNormalizerTracker getRegionNormalizerTracker() {
return regionNormalizerTracker;
}
public SplitOrMergeTracker getSplitOrMergeTracker() {
return splitOrMergeTracker;
}

View File

@ -1913,9 +1913,7 @@ public class MasterRpcServices extends RSRpcServices implements
master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
}
}
} catch (IOException e) {
throw new ServiceException(e);
} catch (KeeperException e) {
} catch (IOException | KeeperException e) {
throw new ServiceException(e);
}
return response.build();
@ -1940,7 +1938,8 @@ public class MasterRpcServices extends RSRpcServices implements
.namespace(request.hasNamespace() ? request.getNamespace() : null)
.build();
return NormalizeResponse.newBuilder()
.setNormalizerRan(master.normalizeRegions(ntfp))
// all API requests are considered priority requests.
.setNormalizerRan(master.normalizeRegions(ntfp, true))
.build();
} catch (IOException ex) {
throw new ServiceException(ex);
@ -1953,20 +1952,27 @@ public class MasterRpcServices extends RSRpcServices implements
rpcPreCheck("setNormalizerRunning");
// Sets normalizer on/off flag in ZK.
boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
boolean newValue = request.getOn();
try {
master.getRegionNormalizerTracker().setNormalizerOn(newValue);
} catch (KeeperException ke) {
LOG.warn("Error flipping normalizer switch", ke);
}
// TODO: this method is totally broken in terms of atomicity of actions and values read.
// 1. The contract has this RPC returning the previous value. There isn't a ZKUtil method
// that lets us retrieve the previous value as part of setting a new value, so we simply
// perform a read before issuing the update. Thus we have a data race opportunity, between
// when the `prevValue` is read and whatever is actually overwritten.
// 2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of the catch clause can
// itself fail in the event that the znode already exists. Thus, another data race, between
// when the initial `setData` call is notified of the absence of the target znode and the
// subsequent `createAndWatch`, with another client creating said node.
// That said, there's supposed to be only one active master and thus there's supposed to be
// only one process with the authority to modify the value.
final boolean prevValue = master.getRegionNormalizerManager().isNormalizerOn();
final boolean newValue = request.getOn();
master.getRegionNormalizerManager().setNormalizerOn(newValue);
LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue);
return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
}
@Override
public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
IsNormalizerEnabledRequest request) throws ServiceException {
IsNormalizerEnabledRequest request) {
IsNormalizerEnabledResponse.Builder response = IsNormalizerEnabledResponse.newBuilder();
response.setEnabled(master.isNormalizerOn());
return response.build();

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.ExecutorService;
@ -34,7 +35,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
@ -54,7 +55,6 @@ import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@ -122,9 +122,9 @@ public interface MasterServices extends Server {
MasterQuotaManager getMasterQuotaManager();
/**
* @return Master's instance of {@link RegionNormalizer}
* @return Master's instance of {@link RegionNormalizerManager}
*/
RegionNormalizer getRegionNormalizer();
RegionNormalizerManager getRegionNormalizerManager();
/**
* @return Master's instance of {@link CatalogJanitor}
@ -354,6 +354,13 @@ public interface MasterServices extends Server {
*/
boolean isInMaintenanceMode();
/**
* Checks master state before initiating action over region topology.
* @param action the name of the action under consideration, for logging.
* @return {@code true} when the caller should exit early, {@code false} otherwise.
*/
boolean skipRegionManagementAction(final String action);
/**
* Abort a procedure.
* @param procId ID of the procedure
@ -553,4 +560,14 @@ public interface MasterServices extends Server {
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();
/**
* Perform normalization of cluster.
* @param ntfp Selection criteria for identifying which tables to normalize.
* @param isHighPriority {@code true} when these requested tables should skip to the front of
* the queue.
* @return {@code true} when the request was submitted, {@code false} otherwise.
*/
boolean normalizeRegions(
final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;
}

View File

@ -55,12 +55,12 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
@Override
public long getSplitPlanCount() {
return master.getSplitPlanCount();
return master.getRegionNormalizerManager().getSplitPlanCount();
}
@Override
public long getMergePlanCount() {
return master.getMergePlanCount();
return master.getRegionNormalizerManager().getMergePlanCount();
}
@Override

View File

@ -59,9 +59,7 @@ import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@ -534,8 +532,10 @@ public class MergeTableRegionsProcedure
try {
env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
} catch (QuotaExceededException e) {
env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion,
NormalizationPlan.PlanType.MERGE);
// TODO: why is this here? merge requests can be submitted by actors other than the normalizer
env.getMasterServices()
.getRegionNormalizerManager()
.planSkipped(NormalizationPlan.PlanType.MERGE);
throw e;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -71,13 +71,11 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@ -181,7 +179,8 @@ public class SplitTableRegionProcedure
private void checkSplittable(final MasterProcedureEnv env,
final RegionInfo regionToSplit, final byte[] splitRow) throws IOException {
// Ask the remote RS if this region is splittable.
// If we get an IOE, report it along w/ the failure so can see why we are not splittable at this time.
// If we get an IOE, report it along w/ the failure so can see why we are not splittable at
// this time.
if(regionToSplit.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
throw new IllegalArgumentException("Can't invoke split on non-default regions directly");
}
@ -570,8 +569,10 @@ public class SplitTableRegionProcedure
try {
env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
} catch (QuotaExceededException e) {
env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
NormalizationPlan.PlanType.SPLIT);
// TODO: why is this here? split requests can be submitted by actors other than the normalizer
env.getMasterServices()
.getRegionNormalizerManager()
.planSkipped(NormalizationPlan.PlanType.SPLIT);
throw e;
}
}

View File

@ -18,41 +18,35 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Normalization plan to merge regions (smallest region in the table with its smallest neighbor).
* Normalization plan to merge adjacent regions. As with any call to
* {@link MasterServices#mergeRegions(RegionInfo[], boolean, long, long)}
* with {@code forcible=false}, Region order and adjacency are important. It's the caller's
* responsibility to ensure the provided parameters are ordered according to the
* {code mergeRegions} method requirements.
*/
@InterfaceAudience.Private
public class MergeNormalizationPlan implements NormalizationPlan {
final class MergeNormalizationPlan implements NormalizationPlan {
private final RegionInfo firstRegion;
private final RegionInfo secondRegion;
private final List<NormalizationTarget> normalizationTargets;
public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) {
this.firstRegion = firstRegion;
this.secondRegion = secondRegion;
}
/**
* {@inheritDoc}
*/
@Override
public long submit(MasterServices masterServices) throws IOException {
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
return masterServices
.mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
HConstants.NO_NONCE);
private MergeNormalizationPlan(List<NormalizationTarget> normalizationTargets) {
Preconditions.checkNotNull(normalizationTargets);
Preconditions.checkState(normalizationTargets.size() >= 2,
"normalizationTargets.size() must be >= 2 but was %s", normalizationTargets.size());
this.normalizationTargets = Collections.unmodifiableList(normalizationTargets);
}
@Override
@ -60,19 +54,14 @@ public class MergeNormalizationPlan implements NormalizationPlan {
return PlanType.MERGE;
}
RegionInfo getFirstRegion() {
return firstRegion;
}
RegionInfo getSecondRegion() {
return secondRegion;
public List<NormalizationTarget> getNormalizationTargets() {
return normalizationTargets;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("firstRegion", firstRegion)
.append("secondRegion", secondRegion)
.append("normalizationTargets", normalizationTargets)
.toString();
}
@ -89,16 +78,31 @@ public class MergeNormalizationPlan implements NormalizationPlan {
MergeNormalizationPlan that = (MergeNormalizationPlan) o;
return new EqualsBuilder()
.append(firstRegion, that.firstRegion)
.append(secondRegion, that.secondRegion)
.append(normalizationTargets, that.normalizationTargets)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(firstRegion)
.append(secondRegion)
.append(normalizationTargets)
.toHashCode();
}
/**
* A helper for constructing instances of {@link MergeNormalizationPlan}.
*/
static class Builder {
private final List<NormalizationTarget> normalizationTargets = new LinkedList<>();
public Builder addTarget(final RegionInfo regionInfo, final long regionSizeMb) {
normalizationTargets.add(new NormalizationTarget(regionInfo, regionSizeMb));
return this;
}
public MergeNormalizationPlan build() {
return new MergeNormalizationPlan(normalizationTargets);
}
}
}

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,12 +17,12 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import java.io.IOException;
/**
* Interface for normalization plan.
* A {@link NormalizationPlan} describes some modification to region split points as identified
* by an instance of {@link RegionNormalizer}. It is a POJO describing what action needs taken
* and the regions it targets.
*/
@InterfaceAudience.Private
public interface NormalizationPlan {
@ -33,15 +32,6 @@ public interface NormalizationPlan {
NONE
}
/**
* Submits normalization plan on cluster (does actual splitting/merging work) and
* returns proc Id to caller.
* @param masterServices instance of {@link MasterServices}
* @return Proc Id for the submitted task
* @throws IOException If plan submission to Admin fails
*/
long submit(MasterServices masterServices) throws IOException;
/**
* @return the type of this plan
*/

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A POJO that caries details about a region selected for normalization through the pipeline.
*/
@InterfaceAudience.Private
class NormalizationTarget {
private final RegionInfo regionInfo;
private final long regionSizeMb;
NormalizationTarget(final RegionInfo regionInfo, final long regionSizeMb) {
this.regionInfo = regionInfo;
this.regionSizeMb = regionSizeMb;
}
public RegionInfo getRegionInfo() {
return regionInfo;
}
public long getRegionSizeMb() {
return regionSizeMb;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NormalizationTarget that = (NormalizationTarget) o;
return new EqualsBuilder()
.append(regionSizeMb, that.regionSizeMb)
.append(regionInfo, that.regionInfo)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(regionInfo)
.append(regionSizeMb)
.toHashCode();
}
@Override public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("regionInfo", regionInfo)
.append("regionSizeMb", regionSizeMb)
.toString();
}
}

View File

@ -20,13 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer;
import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Performs "normalization" of regions of a table, making sure that suboptimal
@ -39,8 +35,7 @@ import org.apache.yetus.audience.InterfaceStability;
* "split/merge storms".
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface RegionNormalizer extends Configurable {
interface RegionNormalizer extends Configurable {
/**
* Set the master service. Must be called before first call to
* {@link #computePlansForTable(TableName)}.
@ -55,20 +50,5 @@ public interface RegionNormalizer extends Configurable {
* @return A list of the normalization actions to perform, or an empty list
* if there's nothing to do.
*/
List<NormalizationPlan> computePlansForTable(TableName table)
throws HBaseIOException;
/**
* Notification for the case where plan couldn't be executed due to constraint violation, such as
* namespace quota
* @param hri the region which is involved in the plan
* @param type type of plan
*/
void planSkipped(RegionInfo hri, PlanType type);
/**
* @param type type of plan for which skipped count is to be returned
* @return the count of plans of specified type which were skipped
*/
long getSkippedCount(NormalizationPlan.PlanType type);
List<NormalizationPlan> computePlansForTable(TableName table);
}

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,34 +17,35 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.master.HMaster;
import java.io.IOException;
/**
* Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#normalizeRegions()}
* when needed.
* Chore that will periodically call
* {@link HMaster#normalizeRegions(NormalizeTableFilterParams, boolean)}.
*/
@InterfaceAudience.Private
public class RegionNormalizerChore extends ScheduledChore {
class RegionNormalizerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerChore.class);
private final HMaster master;
private final MasterServices master;
public RegionNormalizerChore(HMaster master) {
public RegionNormalizerChore(MasterServices master) {
super(master.getServerName() + "-RegionNormalizerChore", master,
master.getConfiguration().getInt("hbase.normalizer.period", 300000));
master.getConfiguration().getInt("hbase.normalizer.period", 300_000));
this.master = master;
}
@Override
protected void chore() {
try {
master.normalizeRegions();
master.normalizeRegions(new NormalizeTableFilterParams.Builder().build(), false);
} catch (IOException e) {
LOG.error("Failed to normalize regions.", e);
}

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,8 +19,12 @@ package org.apache.hadoop.hbase.master.normalizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Factory to create instance of {@link RegionNormalizer} as configured.
@ -32,13 +35,30 @@ public final class RegionNormalizerFactory {
private RegionNormalizerFactory() {
}
public static RegionNormalizerManager createNormalizerManager(
final Configuration conf,
final ZKWatcher zkWatcher,
final HMaster master // TODO: consolidate this down to MasterServices
) {
final RegionNormalizer regionNormalizer = getRegionNormalizer(conf);
regionNormalizer.setMasterServices(master);
final RegionNormalizerTracker tracker = new RegionNormalizerTracker(zkWatcher, master);
final RegionNormalizerChore chore =
master.isInMaintenanceMode() ? null : new RegionNormalizerChore(master);
final RegionNormalizerWorkQueue<TableName> workQueue =
master.isInMaintenanceMode() ? null : new RegionNormalizerWorkQueue<>();
final RegionNormalizerWorker worker = master.isInMaintenanceMode()
? null
: new RegionNormalizerWorker(conf, master, regionNormalizer, workQueue);
return new RegionNormalizerManager(tracker, chore, workQueue, worker);
}
/**
* Create a region normalizer from the given conf.
* @param conf configuration
* @return {@link RegionNormalizer} implementation
*/
public static RegionNormalizer getRegionNormalizer(Configuration conf) {
private static RegionNormalizer getRegionNormalizer(Configuration conf) {
// Create instance of Region Normalizer
Class<? extends RegionNormalizer> balancerKlass =
conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class encapsulates the details of the {@link RegionNormalizer} subsystem.
*/
@InterfaceAudience.Private
public class RegionNormalizerManager {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
private final RegionNormalizerTracker regionNormalizerTracker;
private final RegionNormalizerChore regionNormalizerChore;
private final RegionNormalizerWorkQueue<TableName> workQueue;
private final RegionNormalizerWorker worker;
private final ExecutorService pool;
private final Object startStopLock = new Object();
private boolean started = false;
private boolean stopped = false;
public RegionNormalizerManager(
@NonNull final RegionNormalizerTracker regionNormalizerTracker,
@Nullable final RegionNormalizerChore regionNormalizerChore,
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
@Nullable final RegionNormalizerWorker worker
) {
this.regionNormalizerTracker = regionNormalizerTracker;
this.regionNormalizerChore = regionNormalizerChore;
this.workQueue = workQueue;
this.worker = worker;
this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("normalizer-worker-%d")
.setUncaughtExceptionHandler(
(thread, throwable) ->
LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
.build());
}
public void start() {
synchronized (startStopLock) {
if (started) {
return;
}
regionNormalizerTracker.start();
if (worker != null) {
// worker will be null when master is in maintenance mode.
pool.submit(worker);
}
started = true;
}
}
public void stop() {
synchronized (startStopLock) {
if (!started) {
throw new IllegalStateException("calling `stop` without first calling `start`.");
}
if (stopped) {
return;
}
pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
regionNormalizerTracker.stop();
stopped = true;
}
}
public ScheduledChore getRegionNormalizerChore() {
return regionNormalizerChore;
}
/**
* Return {@code true} if region normalizer is on, {@code false} otherwise
*/
public boolean isNormalizerOn() {
return regionNormalizerTracker.isNormalizerOn();
}
/**
* Set region normalizer on/off
* @param normalizerOn whether normalizer should be on or off
*/
public void setNormalizerOn(boolean normalizerOn) {
try {
regionNormalizerTracker.setNormalizerOn(normalizerOn);
} catch (KeeperException e) {
LOG.warn("Error flipping normalizer switch", e);
}
}
/**
* Call-back for the case where plan couldn't be executed due to constraint violation,
* such as namespace quota.
* @param type type of plan that was skipped.
*/
public void planSkipped(NormalizationPlan.PlanType type) {
// TODO: this appears to be used only for testing.
if (worker != null) {
worker.planSkipped(type);
}
}
/**
* Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
* @param type type of plan for which skipped count is to be returned
*/
public long getSkippedCount(NormalizationPlan.PlanType type) {
// TODO: this appears to be used only for testing.
return worker == null ? 0 : worker.getSkippedCount(type);
}
/**
* Return the number of times a {@link SplitNormalizationPlan} has been submitted.
*/
public long getSplitPlanCount() {
return worker == null ? 0 : worker.getSplitPlanCount();
}
/**
* Return the number of times a {@link MergeNormalizationPlan} has been submitted.
*/
public long getMergePlanCount() {
return worker == null ? 0 : worker.getMergePlanCount();
}
/**
* Submit tables for normalization.
* @param tables a list of tables to submit.
* @param isHighPriority {@code true} when these requested tables should skip to the front of
* the queue.
* @return {@code true} when work was queued, {@code false} otherwise.
*/
public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
if (workQueue == null) {
return false;
}
if (isHighPriority) {
workQueue.putAllFirst(tables);
} else {
workQueue.putAll(tables);
}
return true;
}
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is
* an ordered collection class that has the following properties:
* <ul>
* <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
* <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
* <li>Work is retrieved on a FIFO policy.</li>
* <li>Work retrieval blocks the calling thread until new work is available, as a
* {@link BlockingQueue}.</li>
* <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
* </ul>
* Assumes low-frequency and low-parallelism concurrent access, so protects state using a
* simplistic synchronization strategy.
*/
@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {
/** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
private LinkedHashSet<E> delegate;
// the locking structure used here follows the example found in LinkedBlockingQueue. The
// difference is that our locks guard access to `delegate` rather than the head node.
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock;
/** Wait queue for waiting takes */
private final Condition notEmpty;
/** Lock held by put, offer, etc */
private final ReentrantLock putLock;
RegionNormalizerWorkQueue() {
delegate = new LinkedHashSet<>();
takeLock = new ReentrantLock();
notEmpty = takeLock.newCondition();
putLock = new ReentrantLock();
}
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Locks to prevent both puts and takes.
*/
private void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
private void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
/**
* Inserts the specified element at the tail of the queue, if it's not already present.
*
* @param e the element to add
*/
public void put(E e) {
if (e == null) {
throw new NullPointerException();
}
putLock.lock();
try {
delegate.add(e);
} finally {
putLock.unlock();
}
if (!delegate.isEmpty()) {
signalNotEmpty();
}
}
/**
* Inserts the specified element at the head of the queue.
*
* @param e the element to add
*/
public void putFirst(E e) {
if (e == null) {
throw new NullPointerException();
}
putAllFirst(Collections.singleton(e));
}
/**
* Inserts the specified elements at the tail of the queue. Any elements already present in
* the queue are ignored.
*
* @param c the elements to add
*/
public void putAll(Collection<? extends E> c) {
if (c == null) {
throw new NullPointerException();
}
putLock.lock();
try {
delegate.addAll(c);
} finally {
putLock.unlock();
}
if (!delegate.isEmpty()) {
signalNotEmpty();
}
}
/**
* Inserts the specified elements at the head of the queue.
*
* @param c the elements to add
*/
public void putAllFirst(Collection<? extends E> c) {
if (c == null) {
throw new NullPointerException();
}
fullyLock();
try {
final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
copy.addAll(c);
copy.addAll(delegate);
delegate = copy;
} finally {
fullyUnlock();
}
if (!delegate.isEmpty()) {
signalNotEmpty();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
public E take() throws InterruptedException {
E x;
takeLock.lockInterruptibly();
try {
while (delegate.isEmpty()) {
notEmpty.await();
}
final Iterator<E> iter = delegate.iterator();
x = iter.next();
iter.remove();
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
return x;
}
/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
public void clear() {
putLock.lock();
try {
delegate.clear();
} finally {
putLock.unlock();
}
}
/**
* Returns the number of elements in this queue.
*
* @return the number of elements in this queue
*/
public int size() {
takeLock.lock();
try {
return delegate.size();
} finally {
takeLock.unlock();
}
}
@Override
public String toString() {
takeLock.lock();
try {
return delegate.toString();
} finally {
takeLock.unlock();
}
}
}

View File

@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* Consumes normalization request targets ({@link TableName}s) off the
* {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer},
* and executes the resulting {@link NormalizationPlan}s.
*/
@InterfaceAudience.Private
class RegionNormalizerWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
private final RateLimiter rateLimiter;
private final long[] skippedCount;
private long splitPlanCount;
private long mergePlanCount;
RegionNormalizerWorker(
final Configuration configuration,
final MasterServices masterServices,
final RegionNormalizer regionNormalizer,
final RegionNormalizerWorkQueue<TableName> workQueue
) {
this.masterServices = masterServices;
this.regionNormalizer = regionNormalizer;
this.workQueue = workQueue;
this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
this.splitPlanCount = 0;
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
}
private static RateLimiter loadRateLimiter(final Configuration configuration) {
long rateLimitBytes =
configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
long rateLimitMbs = rateLimitBytes / 1_000_000L;
if (rateLimitMbs <= 0) {
LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.",
RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
rateLimitBytes = RATE_UNLIMITED_BYTES;
rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
}
LOG.info("Normalizer rate limit set to {}",
rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
return RateLimiter.create(rateLimitMbs);
}
/**
* @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
*/
void planSkipped(NormalizationPlan.PlanType type) {
synchronized (skippedCount) {
// updates come here via procedure threads, so synchronize access to this counter.
skippedCount[type.ordinal()]++;
}
}
/**
* @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
*/
long getSkippedCount(NormalizationPlan.PlanType type) {
return skippedCount[type.ordinal()];
}
/**
* @see RegionNormalizerManager#getSplitPlanCount()
*/
long getSplitPlanCount() {
return splitPlanCount;
}
/**
* @see RegionNormalizerManager#getMergePlanCount()
*/
long getMergePlanCount() {
return mergePlanCount;
}
@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
LOG.debug("interrupt detected. terminating.");
break;
}
final TableName tableName;
try {
tableName = workQueue.take();
} catch (InterruptedException e) {
LOG.debug("interrupt detected. terminating.");
break;
}
final List<NormalizationPlan> plans = calculatePlans(tableName);
submitPlans(plans);
}
}
private List<NormalizationPlan> calculatePlans(final TableName tableName) {
if (masterServices.skipRegionManagementAction("region normalizer")) {
return Collections.emptyList();
}
try {
final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
tableName);
return Collections.emptyList();
}
} catch (IOException e) {
LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
return Collections.emptyList();
}
final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
}
return plans;
}
private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
for (NormalizationPlan plan : plans) {
switch (plan.getType()) {
case MERGE: {
submitMergePlan((MergeNormalizationPlan) plan);
break;
}
case SPLIT: {
submitSplitPlan((SplitNormalizationPlan) plan);
break;
}
case NONE:
LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
planSkipped(plan.getType());
break;
default:
LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
planSkipped(plan.getType());
break;
}
}
}
/**
* Interacts with {@link MasterServices} in order to execute a plan.
*/
private void submitMergePlan(final MergeNormalizationPlan plan) {
final int totalSizeMb;
try {
final long totalSizeMbLong = plan.getNormalizationTargets()
.stream()
.mapToLong(NormalizationTarget::getRegionSizeMb)
.reduce(0, Math::addExact);
totalSizeMb = Math.toIntExact(totalSizeMbLong);
} catch (ArithmeticException e) {
LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
planSkipped(plan.getType());
return;
}
final RegionInfo[] infos = plan.getNormalizationTargets()
.stream()
.map(NormalizationTarget::getRegionInfo)
.toArray(RegionInfo[]::new);
final long pid;
try {
pid = masterServices.mergeRegions(
infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
} catch (IOException e) {
LOG.info("failed to submit plan {}.", plan, e);
planSkipped(plan.getType());
return;
}
mergePlanCount++;
LOG.info("Submitted {} resulting in pid {}", plan, pid);
final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
}
/**
* Interacts with {@link MasterServices} in order to execute a plan.
*/
private void submitSplitPlan(final SplitNormalizationPlan plan) {
final int totalSizeMb;
try {
totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
} catch (ArithmeticException e) {
LOG.debug("Split request size overflows rate limiter data type. {}", plan);
planSkipped(plan.getType());
return;
}
final RegionInfo info = plan.getSplitTarget().getRegionInfo();
final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));
final long pid;
try {
pid = masterServices.splitRegion(
info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
} catch (IOException e) {
LOG.info("failed to submit plan {}.", plan, e);
planSkipped(plan.getType());
return;
}
splitPlanCount++;
LOG.info("Submitted {} resulting in pid {}", plan, pid);
}
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -54,29 +53,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
* <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
* are kindly requested to merge.</li>
* </ol>
* <p>
* The following parameters are configurable:
* <ol>
* <li>Whether to split a region as part of normalization. Configuration:
* {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
* <li>Whether to merge a region as part of normalization. Configuration:
* {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
* <li>The minimum number of regions in a table to consider it for merge normalization.
* Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
* {@value #DEFAULT_MIN_REGION_COUNT}.</li>
* <li>The minimum age for a region to be considered for a merge, in days. Configuration:
* {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
* {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
* <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
* {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
* {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
* </ol>
* <p>
* To see detailed logging of the application of these configuration values, set the log level for
* this class to `TRACE`.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleRegionNormalizer implements RegionNormalizer {
class SimpleRegionNormalizer implements RegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
@ -92,7 +71,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
private final long[] skippedCount;
private Configuration conf;
private MasterServices masterServices;
private boolean splitEnabled;
@ -102,7 +80,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
private int mergeMinRegionSizeMb;
public SimpleRegionNormalizer() {
skippedCount = new long[NormalizationPlan.PlanType.values().length];
splitEnabled = DEFAULT_SPLIT_ENABLED;
mergeEnabled = DEFAULT_MERGE_ENABLED;
minRegionCount = DEFAULT_MIN_REGION_COUNT;
@ -203,16 +180,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
this.masterServices = masterServices;
}
@Override
public void planSkipped(final RegionInfo hri, final PlanType type) {
skippedCount[type.ordinal()]++;
}
@Override
public long getSkippedCount(NormalizationPlan.PlanType type) {
return skippedCount[type.ordinal()];
}
@Override
public List<NormalizationPlan> computePlansForTable(final TableName table) {
if (table == null) {
@ -371,7 +338,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
final long nextSizeMb = getRegionSizeMB(next);
// always merge away empty regions when they present themselves.
if (currentSizeMb == 0 || nextSizeMb == 0 || currentSizeMb + nextSizeMb < avgRegionSizeMb) {
plans.add(new MergeNormalizationPlan(current, next));
final MergeNormalizationPlan plan = new MergeNormalizationPlan.Builder()
.addTarget(current, currentSizeMb)
.addTarget(next, nextSizeMb)
.build();
plans.add(plan);
candidateIdx++;
}
}
@ -408,11 +379,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
continue;
}
final long regionSize = getRegionSizeMB(hri);
if (regionSize > 2 * avgRegionSize) {
final long regionSizeMb = getRegionSizeMB(hri);
if (regionSizeMb > 2 * avgRegionSize) {
LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
plans.add(new SplitNormalizationPlan(hri));
ctx.getTableName(), hri.getRegionNameAsString(), regionSizeMb, avgRegionSize);
plans.add(new SplitNormalizationPlan(hri, regionSizeMb));
}
}
return plans;

View File

@ -18,32 +18,23 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Normalization plan to split region.
* Normalization plan to split a region.
*/
@InterfaceAudience.Private
public class SplitNormalizationPlan implements NormalizationPlan {
final class SplitNormalizationPlan implements NormalizationPlan {
private final RegionInfo regionInfo;
private final NormalizationTarget splitTarget;
public SplitNormalizationPlan(RegionInfo regionInfo) {
this.regionInfo = regionInfo;
}
@Override
public long submit(MasterServices masterServices) throws IOException {
return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE,
HConstants.NO_NONCE);
SplitNormalizationPlan(final RegionInfo splitTarget, final long splitTargetSizeMb) {
this.splitTarget = new NormalizationTarget(splitTarget, splitTargetSizeMb);
}
@Override
@ -51,14 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan {
return PlanType.SPLIT;
}
public RegionInfo getRegionInfo() {
return regionInfo;
public NormalizationTarget getSplitTarget() {
return splitTarget;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("regionInfo", regionInfo)
.append("splitTarget", splitTarget)
.toString();
}
@ -75,13 +66,13 @@ public class SplitNormalizationPlan implements NormalizationPlan {
SplitNormalizationPlan that = (SplitNormalizationPlan) o;
return new EqualsBuilder()
.append(regionInfo, that.regionInfo)
.append(splitTarget, that.splitTarget)
.isEquals();
}
@Override public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(regionInfo)
.append(splitTarget)
.toHashCode();
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* The Region Normalizer subsystem is responsible for coaxing all the regions in a table toward
* a "normal" size, according to their storefile size. It does this by splitting regions that
* are significantly larger than the norm, and merging regions that are significantly smaller than
* the norm.
* </p>
* The public interface to the Region Normalizer subsystem is limited to the following classes:
* <ul>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory} provides an
* entry point for creating an instance of the
* {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager}.
* </li>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager} encapsulates
* the whole Region Normalizer subsystem. You'll find one of these hanging off of the
* {@link org.apache.hadoop.hbase.master.HMaster}, which uses it to delegate API calls. There
* is usually only a single instance of this class.
* </li>
* <li>
* Various configuration points that share the common prefix of {@code hbase.normalizer}.
* <ul>
* <li>Whether to split a region as part of normalization. Configuration:
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#SPLIT_ENABLED_KEY},
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_SPLIT_ENABLED}.
* </li>
* <li>Whether to merge a region as part of normalization. Configuration:
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_ENABLED_KEY},
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_ENABLED}.
* </li>
* <li>The minimum number of regions in a table to consider it for merge normalization.
* Configuration: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MIN_REGION_COUNT_KEY},
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MIN_REGION_COUNT}.
* </li>
* <li>The minimum age for a region to be considered for a merge, in days. Configuration:
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_AGE_DAYS_KEY},
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.
* </li>
* <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_SIZE_MB_KEY},
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_SIZE_MB}.
* </li>
* <li>The limit on total throughput of the Region Normalizer's actions, in whole MBs. Configuration:
* {@value org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker#RATE_LIMIT_BYTES_PER_SEC_KEY},
* default: unlimited.
* </li>
* </ul>
* <p>
* To see detailed logging of the application of these configuration values, set the log
* level for this package to `TRACE`.
* </p>
* </li>
* </ul>
* The Region Normalizer subsystem is composed of a handful of related classes:
* <ul>
* <li>
* The {@link org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker} provides a system by
* which the Normalizer can be disabled at runtime. It currently does this by managing a znode,
* but this is an implementation detail.
* </li>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorkQueue} is a
* {@link java.util.Set}-like {@link java.util.Queue} that permits a single copy of a given
* work item to exist in the queue at one time. It also provides a facility for a producer to
* add an item to the front of the line. Consumers are blocked waiting for new work.
* </li>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore} wakes up
* periodically and schedules new normalization work, adding targets to the queue.
* </li>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker} runs in a
* daemon thread, grabbing work off the queue as is it becomes available.
* </li>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} implements the
* logic for calculating target region sizes and emitting a list of corresponding
* {@link org.apache.hadoop.hbase.master.normalizer.NormalizationPlan} objects.
* </li>
* </ul>
*/
package org.apache.hadoop.hbase.master.normalizer;

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.ExecutorService;
@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
public class MockNoopMasterServices implements MasterServices {
@ -109,11 +108,6 @@ public class MockNoopMasterServices implements MasterServices {
return null;
}
@Override
public RegionNormalizer getRegionNormalizer() {
return null;
}
@Override
public CatalogJanitor getCatalogJanitor() {
return null;
@ -139,6 +133,10 @@ public class MockNoopMasterServices implements MasterServices {
return null;
}
@Override public RegionNormalizerManager getRegionNormalizerManager() {
return null;
}
@Override
public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return null;
@ -341,6 +339,10 @@ public class MockNoopMasterServices implements MasterServices {
return false;
}
@Override public boolean skipRegionManagementAction(String action) {
return false;
}
@Override
public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
return 0;
@ -507,4 +509,9 @@ public class MockNoopMasterServices implements MasterServices {
public boolean isBalancerOn() {
return false;
}
@Override
public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
return false;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.master;
import java.lang.reflect.Field;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ScheduledChore;
@ -30,7 +29,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
@ -66,7 +64,7 @@ public class TestMasterChoreScheduled {
}
@Test
public void testDefaultScheduledChores() throws Exception {
public void testDefaultScheduledChores() {
// test if logCleaner chore is scheduled by default in HMaster init
TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
@ -96,10 +94,10 @@ public class TestMasterChoreScheduled {
balancerChoreTestChoreField.testIfChoreScheduled(balancerChore);
// test if normalizerChore chore is scheduled by default in HMaster init
TestChoreField<RegionNormalizerChore> regionNormalizerChoreTestChoreField =
ScheduledChore regionNormalizerChore = hMaster.getRegionNormalizerManager()
.getRegionNormalizerChore();
TestChoreField<ScheduledChore> regionNormalizerChoreTestChoreField =
new TestChoreField<>();
RegionNormalizerChore regionNormalizerChore = regionNormalizerChoreTestChoreField
.getChoreObj("normalizerChore");
regionNormalizerChoreTestChoreField.testIfChoreScheduled(regionNormalizerChore);
// test if catalogJanitorChore chore is scheduled by default in HMaster init
@ -114,22 +112,27 @@ public class TestMasterChoreScheduled {
hbckChoreTestChoreField.testIfChoreScheduled(hbckChore);
}
/**
* Reflect into the {@link HMaster} instance and find by field name a specified instance
* of {@link ScheduledChore}.
*/
private static class TestChoreField<E extends ScheduledChore> {
private E getChoreObj(String fieldName) throws NoSuchFieldException,
IllegalAccessException {
@SuppressWarnings("unchecked")
private E getChoreObj(String fieldName) {
try {
Field masterField = HMaster.class.getDeclaredField(fieldName);
masterField.setAccessible(true);
E choreFieldVal = (E) masterField.get(hMaster);
return choreFieldVal;
return (E) masterField.get(hMaster);
} catch (Exception e) {
throw new AssertionError(
"Unable to retrieve field '" + fieldName + "' from HMaster instance.", e);
}
}
private void testIfChoreScheduled(E choreObj) {
Assert.assertNotNull(choreObj);
Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(choreObj));
}
}
}

View File

@ -72,8 +72,10 @@ public class TestMasterMetricsWrapper {
public void testInfo() {
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
MetricsMasterWrapperImpl info = new MetricsMasterWrapperImpl(master);
assertEquals(master.getSplitPlanCount(), info.getSplitPlanCount(), 0);
assertEquals(master.getMergePlanCount(), info.getMergePlanCount(), 0);
assertEquals(
master.getRegionNormalizerManager().getSplitPlanCount(), info.getSplitPlanCount(), 0);
assertEquals(
master.getRegionNormalizerManager().getMergePlanCount(), info.getMergePlanCount(), 0);
assertEquals(master.getAverageLoad(), info.getAverageLoad(), 0);
assertEquals(master.getClusterId(), info.getClusterId());
assertEquals(master.getMasterActiveTime(), info.getActiveTime());

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
*/
@Category({ MasterTests.class, SmallTests.class})
public class TestRegionNormalizerWorkQueue {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
@Rule
public TestName testName = new TestName();
@Test
public void testElementUniquenessAndFIFO() throws Exception {
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
final List<Integer> content = new LinkedList<>();
IntStream.of(4, 3, 2, 1, 4, 3, 2, 1)
.boxed()
.forEach(queue::put);
assertEquals(4, queue.size());
while (queue.size() > 0) {
content.add(queue.take());
}
assertThat(content, contains(4, 3, 2, 1));
queue.clear();
queue.putAll(Arrays.asList(4, 3, 2, 1));
queue.putAll(Arrays.asList(4, 5));
assertEquals(5, queue.size());
content.clear();
while (queue.size() > 0) {
content.add(queue.take());
}
assertThat(content, contains(4, 3, 2, 1, 5));
}
@Test
public void testPriorityAndFIFO() throws Exception {
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
final List<Integer> content = new LinkedList<>();
queue.putAll(Arrays.asList(4, 3, 2, 1));
assertEquals(4, queue.size());
queue.putFirst(0);
assertEquals(5, queue.size());
drainTo(queue, content);
assertThat("putFirst items should jump the queue, preserving existing order",
content, contains(0, 4, 3, 2, 1));
queue.clear();
content.clear();
queue.putAll(Arrays.asList(4, 3, 2, 1));
queue.putFirst(1);
assertEquals(4, queue.size());
drainTo(queue, content);
assertThat("existing items re-added with putFirst should jump the queue",
content, contains(1, 4, 3, 2));
queue.clear();
content.clear();
queue.putAll(Arrays.asList(4, 3, 2, 1));
queue.putAllFirst(Arrays.asList(2, 3));
assertEquals(4, queue.size());
drainTo(queue, content);
assertThat(
"existing items re-added with putAllFirst jump the queue AND honor changes in priority",
content, contains(2, 3, 4, 1));
}
private enum Action {
PUT,
PUT_FIRST,
PUT_ALL,
PUT_ALL_FIRST,
}
/**
* Test that the uniqueness constraint is honored in the face of concurrent modification.
*/
@Test
public void testConcurrentPut() throws Exception {
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
final int maxValue = 100;
final Runnable producer = () -> {
final Random rand = ThreadLocalRandom.current();
for (int i = 0; i < 1_000; i++) {
final Action action = Action.values()[rand.nextInt(Action.values().length)];
switch (action) {
case PUT: {
final int val = rand.nextInt(maxValue);
queue.put(val);
break;
}
case PUT_FIRST: {
final int val = rand.nextInt(maxValue);
queue.putFirst(val);
break;
}
case PUT_ALL: {
final List<Integer> vals = rand.ints(5, 0, maxValue)
.boxed()
.collect(Collectors.toList());
queue.putAll(vals);
break;
}
case PUT_ALL_FIRST: {
final List<Integer> vals = rand.ints(5, 0, maxValue)
.boxed()
.collect(Collectors.toList());
queue.putAllFirst(vals);
break;
}
default:
fail("Unrecognized action " + action);
}
}
};
final int numThreads = 5;
final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
.mapToObj(val -> CompletableFuture.runAsync(producer))
.toArray(CompletableFuture<?>[]::new);
CompletableFuture.allOf(futures).join();
final List<Integer> content = new ArrayList<>(queue.size());
drainTo(queue, content);
assertThat("at most `maxValue` items should be present.",
content.size(), lessThanOrEqualTo(maxValue));
assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
}
/**
* Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
* producing thread places new entries onto the queue following a known schedule. The consuming
* thread collects a time measurement between calls to {@code take}. Finally, the test makes
* coarse-grained assertions of the consumer's observations based on the producer's schedule.
*/
@Test
public void testTake() throws Exception {
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
final AtomicBoolean finished = new AtomicBoolean(false);
final Runnable consumer = () -> {
try {
while (!finished.get()) {
queue.take();
takeTimes.add(System.nanoTime());
}
} catch (InterruptedException e) {
fail("interrupted.");
}
};
CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
final long testStart = System.nanoTime();
for (int i = 0; i < 5; i++) {
Thread.sleep(10);
queue.put(i);
}
// set finished = true and pipe one more value in case the thread needs an extra pass through
// the loop.
finished.set(true);
queue.put(1);
worker.get(1, TimeUnit.SECONDS);
final Iterator<Long> times = takeTimes.iterator();
assertTrue("should have timing information for at least 2 calls to take.",
takeTimes.size() >= 5);
for (int i = 0; i < 5; i++) {
assertThat(
"Observations collected in takeTimes should increase by roughly 10ms every interval",
times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
}
}
private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
throws InterruptedException {
assertThat(queue.size(), greaterThan(0));
while (queue.size() > 0) {
dest.add(queue.take());
}
}
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.StringDescription;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of
* interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue} and
* its callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work
* queue is simple enough to use directly; for {@link MasterServices}, use a mock because, as of
* now, the worker only invokes 4 methods.
*/
@Category({ MasterTests.class, SmallTests.class})
public class TestRegionNormalizerWorker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class);
@Rule
public TestName testName = new TestName();
@Rule
public TableNameTestRule tableName = new TableNameTestRule();
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule();
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private MasterServices masterServices;
@Mock
private RegionNormalizer regionNormalizer;
private HBaseCommonTestingUtility testingUtility;
private RegionNormalizerWorkQueue<TableName> queue;
private ExecutorService workerPool;
private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();
@Before
public void before() throws Exception {
MockitoAnnotations.initMocks(this);
when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
testingUtility = new HBaseCommonTestingUtility();
queue = new RegionNormalizerWorkQueue<>();
workerThreadThrowable.set(null);
final String threadNameFmt =
TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d";
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNameFmt)
.setDaemon(true)
.setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e))
.build();
workerPool = Executors.newSingleThreadExecutor(threadFactory);
}
@After
public void after() throws Exception {
workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
assertTrue("timeout waiting for worker thread to terminate",
workerPool.awaitTermination(30, TimeUnit.SECONDS));
final Throwable workerThrowable = workerThreadThrowable.get();
assertThat("worker thread threw unexpected exception", workerThrowable, nullValue());
}
@Test
public void testMergeCounter() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
.setNormalizationEnabled(true)
.build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
.thenReturn(singletonList(new MergeNormalizationPlan.Builder()
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
.build()));
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
final long beforeMergePlanCount = worker.getMergePlanCount();
workerPool.submit(worker);
queue.put(tn);
assertThatEventually("executing work should see plan count increase",
worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
}
@Test
public void testSplitCounter() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
.setNormalizationEnabled(true)
.build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
.thenReturn(singletonList(
new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
final long beforeSplitPlanCount = worker.getSplitPlanCount();
workerPool.submit(worker);
queue.put(tn);
assertThatEventually("executing work should see plan count increase",
worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount));
}
/**
* Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
* inspect the log messages emitted by the worker thread to confirm that expected behavior.
*/
@Test
public void testRateLimit() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
.setNormalizationEnabled(true)
.build();
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
.thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
.thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2),
new MergeNormalizationPlan.Builder()
.addTarget(mergeRegionInfo1, 1)
.addTarget(mergeRegionInfo2, 2)
.build(),
new SplitNormalizationPlan(splitRegionInfo, 1)));
final Configuration conf = testingUtility.getConfiguration();
conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
workerPool.submit(worker);
final long startTime = System.nanoTime();
queue.put(tn);
assertThatEventually("executing work should see split plan count increase",
worker::getSplitPlanCount, comparesEqualTo(2L));
assertThatEventually("executing work should see merge plan count increase",
worker::getMergePlanCount, comparesEqualTo(1L));
final long endTime = System.nanoTime();
assertThat("rate limited normalizer should have taken at least 5 seconds",
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}
/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier}
* until the matcher succeeds or the timeout period of 30 seconds is exhausted.
*/
private <T> void assertThatEventually(
final String reason,
final Supplier<? extends T> actualSupplier,
final Matcher<? super T> matcher
) throws Exception {
testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
new Waiter.ExplainingPredicate<Exception>() {
private T lastValue = null;
@Override
public String explainFailure() {
final Description description = new StringDescription()
.appendText(reason)
.appendText("\nExpected: ")
.appendDescriptionOf(matcher)
.appendText("\n but: ");
matcher.describeMismatch(lastValue, description);
return description.toString();
}
@Override public boolean evaluate() {
lastValue = actualSupplier.get();
return matcher.matches(lastValue);
}
});
}
}

View File

@ -175,8 +175,12 @@ public class TestSimpleRegionNormalizer {
createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), contains(
new MergeNormalizationPlan(regionInfos.get(1), regionInfos.get(2))));
assertThat(
normalizer.computePlansForTable(tableName),
contains(new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(1), 5)
.addTarget(regionInfos.get(2), 5)
.build()));
}
// Test for situation illustrated in HBASE-14867
@ -188,9 +192,12 @@ public class TestSimpleRegionNormalizer {
createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), contains(
new MergeNormalizationPlan(regionInfos.get(4), regionInfos.get(5))
));
assertThat(
normalizer.computePlansForTable(tableName),
contains(new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(4), 2700)
.addTarget(regionInfos.get(5), 2700)
.build()));
}
@Test
@ -214,7 +221,7 @@ public class TestSimpleRegionNormalizer {
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), contains(
new SplitNormalizationPlan(regionInfos.get(3))));
new SplitNormalizationPlan(regionInfos.get(3), 30)));
}
@Test
@ -229,18 +236,26 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(20L);
assertThat(normalizer.computePlansForTable(tableName), contains(
new SplitNormalizationPlan(regionInfos.get(2)),
new SplitNormalizationPlan(regionInfos.get(3)),
new SplitNormalizationPlan(regionInfos.get(4)),
new SplitNormalizationPlan(regionInfos.get(5))
new SplitNormalizationPlan(regionInfos.get(2), 60),
new SplitNormalizationPlan(regionInfos.get(3), 80),
new SplitNormalizationPlan(regionInfos.get(4), 100),
new SplitNormalizationPlan(regionInfos.get(5), 120)
));
// test when target region size is 200
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(200L);
assertThat(normalizer.computePlansForTable(tableName), contains(
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3))));
assertThat(
normalizer.computePlansForTable(tableName),
contains(
new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(0), 20)
.addTarget(regionInfos.get(1), 40)
.build(),
new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(2), 60)
.addTarget(regionInfos.get(3), 80)
.build()));
}
@Test
@ -255,14 +270,18 @@ public class TestSimpleRegionNormalizer {
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(8);
assertThat(normalizer.computePlansForTable(tableName), contains(
new SplitNormalizationPlan(regionInfos.get(2)),
new SplitNormalizationPlan(regionInfos.get(3))));
new SplitNormalizationPlan(regionInfos.get(2), 60),
new SplitNormalizationPlan(regionInfos.get(3), 80)));
// test when target region count is 3
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(3);
assertThat(normalizer.computePlansForTable(tableName), contains(
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
assertThat(
normalizer.computePlansForTable(tableName),
contains(new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(0), 20)
.addTarget(regionInfos.get(1), 40)
.build()));
}
@Test
@ -312,14 +331,17 @@ public class TestSimpleRegionNormalizer {
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertThat(plans, contains(
new SplitNormalizationPlan(regionInfos.get(2)),
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
new SplitNormalizationPlan(regionInfos.get(2), 10),
new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(0), 1)
.addTarget(regionInfos.get(1), 1)
.build()));
// have to call setupMocks again because we don't have dynamic config update on normalizer.
conf.setInt(MIN_REGION_COUNT_KEY, 4);
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), contains(
new SplitNormalizationPlan(regionInfos.get(2))));
new SplitNormalizationPlan(regionInfos.get(2), 10)));
}
@Test
@ -356,8 +378,12 @@ public class TestSimpleRegionNormalizer {
assertFalse(normalizer.isSplitEnabled());
assertEquals(1, normalizer.getMergeMinRegionSizeMb());
assertThat(normalizer.computePlansForTable(tableName), contains(
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
assertThat(
normalizer.computePlansForTable(tableName),
contains(new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(0), 1)
.addTarget(regionInfos.get(1), 2)
.build()));
conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
@ -378,9 +404,18 @@ public class TestSimpleRegionNormalizer {
assertFalse(normalizer.isSplitEnabled());
assertEquals(0, normalizer.getMergeMinRegionSizeMb());
assertThat(normalizer.computePlansForTable(tableName), contains(
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3)),
new MergeNormalizationPlan(regionInfos.get(5), regionInfos.get(6))));
new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(0), 0)
.addTarget(regionInfos.get(1), 1)
.build(),
new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(2), 10)
.addTarget(regionInfos.get(3), 0)
.build(),
new MergeNormalizationPlan.Builder()
.addTarget(regionInfos.get(5), 10)
.addTarget(regionInfos.get(6), 0)
.build()));
}
// This test is to make sure that normalizer is only going to merge adjacent regions.

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
@ -161,6 +160,7 @@ public class TestSimpleRegionNormalizerOnCluster {
tn2 + " should not have split.",
tn2RegionCount,
getRegionCount(tn2));
LOG.debug("waiting for t3 to settle...");
waitForTableRegionCount(tn3, tn3RegionCount);
} finally {
dropIfExists(tn1);
@ -187,7 +187,7 @@ public class TestSimpleRegionNormalizerOnCluster {
: TableName.valueOf(name.getMethodName());
final int currentRegionCount = createTableBegsSplit(tableName, true, false);
final long existingSkippedSplitCount = master.getRegionNormalizer()
final long existingSkippedSplitCount = master.getRegionNormalizerManager()
.getSkippedCount(PlanType.SPLIT);
assertFalse(admin.normalizerSwitch(true).get());
assertTrue(admin.normalize().get());
@ -332,7 +332,8 @@ public class TestSimpleRegionNormalizerOnCluster {
return "waiting to observe split attempt and skipped.";
}
@Override public boolean evaluate() {
final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
final long skippedSplitCount = master.getRegionNormalizerManager()
.getSkippedCount(PlanType.SPLIT);
return skippedSplitCount > existingSkippedSplitCount;
}
});