HBASE-24628 Region normalizer now respects a rate limit
Implement a rate limiter for the normalizer. Implemented in terms of
MB/sec of affacted region size (the same metrics used to make
normalization decisions). Uses Guava `RateLimiter` to perform the
resource accounting. `RateLimiter` works by blocking (uninterruptible
😖) the calling thread. Thus, the whole construction of the normalizer
subsystem needed refactoring. See the provided `package-info.java` for
an overview of this new structure.
Introduces a new configuration,
`hbase.normalizer.throughput.max_bytes_per_sec`, for specifying a
limit on the throughput of actions executed by the normalizer. Note
that while this configuration value is in bytes, the minimum honored
valued `1_000_000`. Supports values configured using the
human-readable suffixes honored by `Configuration.getLongBytes`
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Huaxiang Sun <huaxiangsun@apache.com>
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
45e10d43fe
commit
2253800ee1
|
@ -31,7 +31,6 @@ import java.lang.reflect.InvocationTargetException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -51,7 +50,6 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.servlet.ServletException;
|
||||
|
@ -118,11 +116,8 @@ import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
|
|||
import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
|
||||
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
|
||||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
|
||||
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
|
||||
|
@ -196,7 +191,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
|||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||
import org.apache.hadoop.hbase.util.IdLock;
|
||||
|
@ -224,7 +218,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
|
||||
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
|
||||
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
|
||||
|
@ -325,9 +318,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Tracker for split and merge state
|
||||
private SplitOrMergeTracker splitOrMergeTracker;
|
||||
|
||||
// Tracker for region normalizer state
|
||||
private RegionNormalizerTracker regionNormalizerTracker;
|
||||
|
||||
private ClusterSchemaService clusterSchemaService;
|
||||
|
||||
public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
|
||||
|
@ -389,11 +379,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private final LockManager lockManager = new LockManager(this);
|
||||
|
||||
private LoadBalancer balancer;
|
||||
// a lock to prevent concurrent normalization actions.
|
||||
private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
|
||||
private RegionNormalizer normalizer;
|
||||
private BalancerChore balancerChore;
|
||||
private RegionNormalizerChore normalizerChore;
|
||||
private RegionNormalizerManager regionNormalizerManager;
|
||||
private ClusterStatusChore clusterStatusChore;
|
||||
private ClusterStatusPublisher clusterStatusPublisherChore = null;
|
||||
private SnapshotCleanerChore snapshotCleanerChore = null;
|
||||
|
@ -448,9 +435,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// handle table states
|
||||
private TableStateManager tableStateManager;
|
||||
|
||||
private long splitPlanCount;
|
||||
private long mergePlanCount;
|
||||
|
||||
/* Handle favored nodes information */
|
||||
private FavoredNodesManager favoredNodesManager;
|
||||
|
||||
|
@ -775,27 +759,19 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
|
||||
* should have already been initialized along with {@link ServerManager}.
|
||||
* </p>
|
||||
* <p>
|
||||
* Will be overridden in tests.
|
||||
* </p>
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected void initializeZKBasedSystemTrackers()
|
||||
throws IOException, InterruptedException, KeeperException, ReplicationException {
|
||||
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
||||
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
|
||||
this.normalizer.setMasterServices(this);
|
||||
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
|
||||
this.loadBalancerTracker.start();
|
||||
|
||||
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
|
||||
this.normalizer.setMasterServices(this);
|
||||
this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
|
||||
this.regionNormalizerTracker.start();
|
||||
this.regionNormalizerManager =
|
||||
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
|
||||
this.regionNormalizerManager.start();
|
||||
|
||||
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
|
||||
this.splitOrMergeTracker.start();
|
||||
|
@ -875,10 +851,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
* </ol>
|
||||
* </li>
|
||||
* <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
|
||||
* <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
|
||||
* procedure executor, etc. Notice that the balancer must be created first as assignment manager
|
||||
* may use it when assigning regions.</li>
|
||||
* <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
|
||||
* <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
|
||||
* the procedure executor, etc. Notice that the balancer must be created first as assignment
|
||||
* manager may use it when assigning regions.</li>
|
||||
* <li>Wait for meta to be initialized if necessary, start table state manager.</li>
|
||||
* <li>Wait for enough region servers to check-in</li>
|
||||
* <li>Let assignment manager load data from meta and construct region states</li>
|
||||
* <li>Start all other things such as chore services, etc</li>
|
||||
|
@ -1091,8 +1067,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
getChoreService().scheduleChore(clusterStatusChore);
|
||||
this.balancerChore = new BalancerChore(this);
|
||||
getChoreService().scheduleChore(balancerChore);
|
||||
this.normalizerChore = new RegionNormalizerChore(this);
|
||||
getChoreService().scheduleChore(normalizerChore);
|
||||
getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
|
||||
this.catalogJanitorChore = new CatalogJanitor(this);
|
||||
getChoreService().scheduleChore(catalogJanitorChore);
|
||||
this.hbckChore = new HbckChore(this);
|
||||
|
@ -1508,6 +1483,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// example.
|
||||
stopProcedureExecutor();
|
||||
|
||||
if (regionNormalizerManager != null) {
|
||||
regionNormalizerManager.stop();
|
||||
}
|
||||
if (this.quotaManager != null) {
|
||||
this.quotaManager.stop();
|
||||
}
|
||||
|
@ -1626,7 +1604,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
choreService.cancelChore(this.expiredMobFileCleanerChore);
|
||||
choreService.cancelChore(this.mobCompactChore);
|
||||
choreService.cancelChore(this.balancerChore);
|
||||
choreService.cancelChore(this.normalizerChore);
|
||||
choreService.cancelChore(getRegionNormalizerManager().getRegionNormalizerChore());
|
||||
choreService.cancelChore(this.clusterStatusChore);
|
||||
choreService.cancelChore(this.catalogJanitorChore);
|
||||
choreService.cancelChore(this.clusterStatusPublisherChore);
|
||||
|
@ -1726,7 +1704,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
* @param action the name of the action under consideration, for logging.
|
||||
* @return {@code true} when the caller should exit early, {@code false} otherwise.
|
||||
*/
|
||||
private boolean skipRegionManagementAction(final String action) {
|
||||
@Override
|
||||
public boolean skipRegionManagementAction(final String action) {
|
||||
// Note: this method could be `default` on MasterServices if but for logging.
|
||||
if (!isInitialized()) {
|
||||
LOG.debug("Master has not been initialized, don't run {}.", action);
|
||||
return true;
|
||||
|
@ -1871,24 +1851,16 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RegionNormalizer getRegionNormalizer() {
|
||||
return this.normalizer;
|
||||
public RegionNormalizerManager getRegionNormalizerManager() {
|
||||
return regionNormalizerManager;
|
||||
}
|
||||
|
||||
public boolean normalizeRegions() throws IOException {
|
||||
return normalizeRegions(new NormalizeTableFilterParams.Builder().build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform normalization of cluster.
|
||||
*
|
||||
* @return true if an existing normalization was already in progress, or if a new normalization
|
||||
* was performed successfully; false otherwise (specifically, if HMaster finished initializing
|
||||
* or normalization is globally disabled).
|
||||
*/
|
||||
public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException {
|
||||
final long startTime = EnvironmentEdgeManager.currentTime();
|
||||
if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
|
||||
@Override
|
||||
public boolean normalizeRegions(
|
||||
final NormalizeTableFilterParams ntfp,
|
||||
final boolean isHighPriority
|
||||
) throws IOException {
|
||||
if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
|
||||
LOG.debug("Region normalization is disabled, don't run region normalizer.");
|
||||
return false;
|
||||
}
|
||||
|
@ -1899,70 +1871,17 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!normalizationInProgressLock.tryLock()) {
|
||||
// Don't run the normalizer concurrently
|
||||
LOG.info("Normalization already in progress. Skipping request.");
|
||||
return true;
|
||||
}
|
||||
|
||||
int affectedTables = 0;
|
||||
try {
|
||||
final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
|
||||
ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
|
||||
.stream()
|
||||
.map(TableDescriptor::getTableName)
|
||||
.collect(Collectors.toSet());
|
||||
final Set<TableName> allEnabledTables =
|
||||
tableStateManager.getTablesInStates(TableState.State.ENABLED);
|
||||
final List<TableName> targetTables =
|
||||
new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
|
||||
Collections.shuffle(targetTables);
|
||||
|
||||
final List<Long> submittedPlanProcIds = new ArrayList<>();
|
||||
for (TableName table : targetTables) {
|
||||
if (table.isSystemTable()) {
|
||||
continue;
|
||||
}
|
||||
final TableDescriptor tblDesc = getTableDescriptors().get(table);
|
||||
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
|
||||
LOG.debug(
|
||||
"Skipping table {} because normalization is disabled in its table properties.", table);
|
||||
continue;
|
||||
}
|
||||
|
||||
// make one last check that the cluster isn't shutting down before proceeding.
|
||||
if (skipRegionManagementAction("region normalizer")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
|
||||
if (CollectionUtils.isEmpty(plans)) {
|
||||
LOG.debug("No normalization required for table {}.", table);
|
||||
continue;
|
||||
}
|
||||
|
||||
affectedTables++;
|
||||
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
|
||||
// submit task , so there's no artificial rate-
|
||||
// limiting of merge/split requests due to this serial loop.
|
||||
for (NormalizationPlan plan : plans) {
|
||||
long procId = plan.submit(this);
|
||||
submittedPlanProcIds.add(procId);
|
||||
if (plan.getType() == PlanType.SPLIT) {
|
||||
splitPlanCount++;
|
||||
} else if (plan.getType() == PlanType.MERGE) {
|
||||
mergePlanCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
final long endTime = EnvironmentEdgeManager.currentTime();
|
||||
LOG.info("Normalizer ran successfully in {}. Submitted {} plans, affecting {} tables.",
|
||||
Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables);
|
||||
LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds);
|
||||
} finally {
|
||||
normalizationInProgressLock.unlock();
|
||||
}
|
||||
return true;
|
||||
final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
|
||||
ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
|
||||
.stream()
|
||||
.map(TableDescriptor::getTableName)
|
||||
.collect(Collectors.toSet());
|
||||
final Set<TableName> allEnabledTables =
|
||||
tableStateManager.getTablesInStates(TableState.State.ENABLED);
|
||||
final List<TableName> targetTables =
|
||||
new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
|
||||
Collections.shuffle(targetTables);
|
||||
return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2969,20 +2888,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return regionStates.getAverageLoad();
|
||||
}
|
||||
|
||||
/*
|
||||
* @return the count of region split plans executed
|
||||
*/
|
||||
public long getSplitPlanCount() {
|
||||
return splitPlanCount;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return the count of region merge plans executed
|
||||
*/
|
||||
public long getMergePlanCount() {
|
||||
return mergePlanCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean registerService(Service instance) {
|
||||
/*
|
||||
|
@ -3487,8 +3392,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
*/
|
||||
public boolean isNormalizerOn() {
|
||||
return !isInMaintenanceMode()
|
||||
&& regionNormalizerTracker != null
|
||||
&& regionNormalizerTracker.isNormalizerOn();
|
||||
&& getRegionNormalizerManager().isNormalizerOn();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3514,13 +3418,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
.getDefaultLoadBalancerClass().getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return RegionNormalizerTracker instance
|
||||
*/
|
||||
public RegionNormalizerTracker getRegionNormalizerTracker() {
|
||||
return regionNormalizerTracker;
|
||||
}
|
||||
|
||||
public SplitOrMergeTracker getSplitOrMergeTracker() {
|
||||
return splitOrMergeTracker;
|
||||
}
|
||||
|
|
|
@ -1927,9 +1927,7 @@ public class MasterRpcServices extends RSRpcServices implements
|
|||
master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (KeeperException e) {
|
||||
} catch (IOException | KeeperException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return response.build();
|
||||
|
@ -1954,7 +1952,8 @@ public class MasterRpcServices extends RSRpcServices implements
|
|||
.namespace(request.hasNamespace() ? request.getNamespace() : null)
|
||||
.build();
|
||||
return NormalizeResponse.newBuilder()
|
||||
.setNormalizerRan(master.normalizeRegions(ntfp))
|
||||
// all API requests are considered priority requests.
|
||||
.setNormalizerRan(master.normalizeRegions(ntfp, true))
|
||||
.build();
|
||||
} catch (IOException ex) {
|
||||
throw new ServiceException(ex);
|
||||
|
@ -1967,20 +1966,27 @@ public class MasterRpcServices extends RSRpcServices implements
|
|||
rpcPreCheck("setNormalizerRunning");
|
||||
|
||||
// Sets normalizer on/off flag in ZK.
|
||||
boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
|
||||
boolean newValue = request.getOn();
|
||||
try {
|
||||
master.getRegionNormalizerTracker().setNormalizerOn(newValue);
|
||||
} catch (KeeperException ke) {
|
||||
LOG.warn("Error flipping normalizer switch", ke);
|
||||
}
|
||||
// TODO: this method is totally broken in terms of atomicity of actions and values read.
|
||||
// 1. The contract has this RPC returning the previous value. There isn't a ZKUtil method
|
||||
// that lets us retrieve the previous value as part of setting a new value, so we simply
|
||||
// perform a read before issuing the update. Thus we have a data race opportunity, between
|
||||
// when the `prevValue` is read and whatever is actually overwritten.
|
||||
// 2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of the catch clause can
|
||||
// itself fail in the event that the znode already exists. Thus, another data race, between
|
||||
// when the initial `setData` call is notified of the absence of the target znode and the
|
||||
// subsequent `createAndWatch`, with another client creating said node.
|
||||
// That said, there's supposed to be only one active master and thus there's supposed to be
|
||||
// only one process with the authority to modify the value.
|
||||
final boolean prevValue = master.getRegionNormalizerManager().isNormalizerOn();
|
||||
final boolean newValue = request.getOn();
|
||||
master.getRegionNormalizerManager().setNormalizerOn(newValue);
|
||||
LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue);
|
||||
return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
|
||||
IsNormalizerEnabledRequest request) throws ServiceException {
|
||||
IsNormalizerEnabledRequest request) {
|
||||
IsNormalizerEnabledResponse.Builder response = IsNormalizerEnabledResponse.newBuilder();
|
||||
response.setEnabled(master.isNormalizerOn());
|
||||
return response.build();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
|
|||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.MasterSwitchType;
|
||||
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
|
@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
|||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
|
||||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
|
@ -53,7 +54,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
|||
import org.apache.hadoop.hbase.security.access.AccessChecker;
|
||||
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
|
@ -120,9 +120,9 @@ public interface MasterServices extends Server {
|
|||
MasterQuotaManager getMasterQuotaManager();
|
||||
|
||||
/**
|
||||
* @return Master's instance of {@link RegionNormalizer}
|
||||
* @return Master's instance of {@link RegionNormalizerManager}
|
||||
*/
|
||||
RegionNormalizer getRegionNormalizer();
|
||||
RegionNormalizerManager getRegionNormalizerManager();
|
||||
|
||||
/**
|
||||
* @return Master's instance of {@link CatalogJanitor}
|
||||
|
@ -352,6 +352,13 @@ public interface MasterServices extends Server {
|
|||
*/
|
||||
boolean isInMaintenanceMode();
|
||||
|
||||
/**
|
||||
* Checks master state before initiating action over region topology.
|
||||
* @param action the name of the action under consideration, for logging.
|
||||
* @return {@code true} when the caller should exit early, {@code false} otherwise.
|
||||
*/
|
||||
boolean skipRegionManagementAction(final String action);
|
||||
|
||||
/**
|
||||
* Abort a procedure.
|
||||
* @param procId ID of the procedure
|
||||
|
@ -525,4 +532,14 @@ public interface MasterServices extends Server {
|
|||
* Run the ReplicationBarrierChore.
|
||||
*/
|
||||
void runReplicationBarrierCleaner();
|
||||
|
||||
/**
|
||||
* Perform normalization of cluster.
|
||||
* @param ntfp Selection criteria for identifying which tables to normalize.
|
||||
* @param isHighPriority {@code true} when these requested tables should skip to the front of
|
||||
* the queue.
|
||||
* @return {@code true} when the request was submitted, {@code false} otherwise.
|
||||
*/
|
||||
boolean normalizeRegions(
|
||||
final NormalizeTableFilterParams ntfp, final boolean isHighPriority) throws IOException;
|
||||
}
|
||||
|
|
|
@ -50,12 +50,12 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
|
|||
|
||||
@Override
|
||||
public long getSplitPlanCount() {
|
||||
return master.getSplitPlanCount();
|
||||
return master.getRegionNormalizerManager().getSplitPlanCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMergePlanCount() {
|
||||
return master.getMergePlanCount();
|
||||
return master.getRegionNormalizerManager().getMergePlanCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -60,9 +60,7 @@ import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||
|
@ -535,8 +533,10 @@ public class MergeTableRegionsProcedure
|
|||
try {
|
||||
env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
|
||||
} catch (QuotaExceededException e) {
|
||||
env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion,
|
||||
NormalizationPlan.PlanType.MERGE);
|
||||
// TODO: why is this here? merge requests can be submitted by actors other than the normalizer
|
||||
env.getMasterServices()
|
||||
.getRegionNormalizerManager()
|
||||
.planSkipped(NormalizationPlan.PlanType.MERGE);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -71,13 +71,11 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitUtil;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||
|
@ -181,9 +179,10 @@ public class SplitTableRegionProcedure
|
|||
private void checkSplittable(final MasterProcedureEnv env,
|
||||
final RegionInfo regionToSplit, final byte[] splitRow) throws IOException {
|
||||
// Ask the remote RS if this region is splittable.
|
||||
// If we get an IOE, report it along w/ the failure so can see why we are not splittable at this time.
|
||||
// If we get an IOE, report it along w/ the failure so can see why we are not splittable at
|
||||
// this time.
|
||||
if(regionToSplit.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
|
||||
throw new IllegalArgumentException ("Can't invoke split on non-default regions directly");
|
||||
throw new IllegalArgumentException("Can't invoke split on non-default regions directly");
|
||||
}
|
||||
RegionStateNode node =
|
||||
env.getAssignmentManager().getRegionStates().getRegionStateNode(getParentRegion());
|
||||
|
@ -570,8 +569,10 @@ public class SplitTableRegionProcedure
|
|||
try {
|
||||
env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
|
||||
} catch (QuotaExceededException e) {
|
||||
env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
|
||||
NormalizationPlan.PlanType.SPLIT);
|
||||
// TODO: why is this here? split requests can be submitted by actors other than the normalizer
|
||||
env.getMasterServices()
|
||||
.getRegionNormalizerManager()
|
||||
.planSkipped(NormalizationPlan.PlanType.SPLIT);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,41 +18,35 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Normalization plan to merge regions (smallest region in the table with its smallest neighbor).
|
||||
* Normalization plan to merge adjacent regions. As with any call to
|
||||
* {@link MasterServices#mergeRegions(RegionInfo[], boolean, long, long)}
|
||||
* with {@code forcible=false}, Region order and adjacency are important. It's the caller's
|
||||
* responsibility to ensure the provided parameters are ordered according to the
|
||||
* {code mergeRegions} method requirements.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MergeNormalizationPlan implements NormalizationPlan {
|
||||
final class MergeNormalizationPlan implements NormalizationPlan {
|
||||
|
||||
private final RegionInfo firstRegion;
|
||||
private final RegionInfo secondRegion;
|
||||
private final List<NormalizationTarget> normalizationTargets;
|
||||
|
||||
public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) {
|
||||
this.firstRegion = firstRegion;
|
||||
this.secondRegion = secondRegion;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public long submit(MasterServices masterServices) throws IOException {
|
||||
// Do not use force=true as corner cases can happen, non adjacent regions,
|
||||
// merge with a merged child region with no GC done yet, it is going to
|
||||
// cause all different issues.
|
||||
return masterServices
|
||||
.mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
private MergeNormalizationPlan(List<NormalizationTarget> normalizationTargets) {
|
||||
Preconditions.checkNotNull(normalizationTargets);
|
||||
Preconditions.checkState(normalizationTargets.size() >= 2,
|
||||
"normalizationTargets.size() must be >= 2 but was %s", normalizationTargets.size());
|
||||
this.normalizationTargets = Collections.unmodifiableList(normalizationTargets);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -60,19 +54,14 @@ public class MergeNormalizationPlan implements NormalizationPlan {
|
|||
return PlanType.MERGE;
|
||||
}
|
||||
|
||||
RegionInfo getFirstRegion() {
|
||||
return firstRegion;
|
||||
}
|
||||
|
||||
RegionInfo getSecondRegion() {
|
||||
return secondRegion;
|
||||
public List<NormalizationTarget> getNormalizationTargets() {
|
||||
return normalizationTargets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
|
||||
.append("firstRegion", firstRegion)
|
||||
.append("secondRegion", secondRegion)
|
||||
.append("normalizationTargets", normalizationTargets)
|
||||
.toString();
|
||||
}
|
||||
|
||||
|
@ -89,16 +78,31 @@ public class MergeNormalizationPlan implements NormalizationPlan {
|
|||
MergeNormalizationPlan that = (MergeNormalizationPlan) o;
|
||||
|
||||
return new EqualsBuilder()
|
||||
.append(firstRegion, that.firstRegion)
|
||||
.append(secondRegion, that.secondRegion)
|
||||
.append(normalizationTargets, that.normalizationTargets)
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder(17, 37)
|
||||
.append(firstRegion)
|
||||
.append(secondRegion)
|
||||
.append(normalizationTargets)
|
||||
.toHashCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper for constructing instances of {@link MergeNormalizationPlan}.
|
||||
*/
|
||||
static class Builder {
|
||||
|
||||
private final List<NormalizationTarget> normalizationTargets = new LinkedList<>();
|
||||
|
||||
public Builder addTarget(final RegionInfo regionInfo, final long regionSizeMb) {
|
||||
normalizationTargets.add(new NormalizationTarget(regionInfo, regionSizeMb));
|
||||
return this;
|
||||
}
|
||||
|
||||
public MergeNormalizationPlan build() {
|
||||
return new MergeNormalizationPlan(normalizationTargets);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/**
|
||||
*
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,12 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Interface for normalization plan.
|
||||
* A {@link NormalizationPlan} describes some modification to region split points as identified
|
||||
* by an instance of {@link RegionNormalizer}. It is a POJO describing what action needs taken
|
||||
* and the regions it targets.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface NormalizationPlan {
|
||||
|
@ -33,15 +32,6 @@ public interface NormalizationPlan {
|
|||
NONE
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits normalization plan on cluster (does actual splitting/merging work) and
|
||||
* returns proc Id to caller.
|
||||
* @param masterServices instance of {@link MasterServices}
|
||||
* @return Proc Id for the submitted task
|
||||
* @throws IOException If plan submission to Admin fails
|
||||
*/
|
||||
long submit(MasterServices masterServices) throws IOException;
|
||||
|
||||
/**
|
||||
* @return the type of this plan
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A POJO that caries details about a region selected for normalization through the pipeline.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class NormalizationTarget {
|
||||
private final RegionInfo regionInfo;
|
||||
private final long regionSizeMb;
|
||||
|
||||
NormalizationTarget(final RegionInfo regionInfo, final long regionSizeMb) {
|
||||
this.regionInfo = regionInfo;
|
||||
this.regionSizeMb = regionSizeMb;
|
||||
}
|
||||
|
||||
public RegionInfo getRegionInfo() {
|
||||
return regionInfo;
|
||||
}
|
||||
|
||||
public long getRegionSizeMb() {
|
||||
return regionSizeMb;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
NormalizationTarget that = (NormalizationTarget) o;
|
||||
|
||||
return new EqualsBuilder()
|
||||
.append(regionSizeMb, that.regionSizeMb)
|
||||
.append(regionInfo, that.regionInfo)
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder(17, 37)
|
||||
.append(regionInfo)
|
||||
.append(regionSizeMb)
|
||||
.toHashCode();
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
|
||||
.append("regionInfo", regionInfo)
|
||||
.append("regionSizeMb", regionSizeMb)
|
||||
.toString();
|
||||
}
|
||||
}
|
|
@ -20,13 +20,9 @@ package org.apache.hadoop.hbase.master.normalizer;
|
|||
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Performs "normalization" of regions of a table, making sure that suboptimal
|
||||
|
@ -39,8 +35,7 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
* "split/merge storms".
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public interface RegionNormalizer extends Configurable {
|
||||
interface RegionNormalizer extends Configurable {
|
||||
/**
|
||||
* Set the master service. Must be called before first call to
|
||||
* {@link #computePlansForTable(TableName)}.
|
||||
|
@ -55,20 +50,5 @@ public interface RegionNormalizer extends Configurable {
|
|||
* @return A list of the normalization actions to perform, or an empty list
|
||||
* if there's nothing to do.
|
||||
*/
|
||||
List<NormalizationPlan> computePlansForTable(TableName table)
|
||||
throws HBaseIOException;
|
||||
|
||||
/**
|
||||
* Notification for the case where plan couldn't be executed due to constraint violation, such as
|
||||
* namespace quota
|
||||
* @param hri the region which is involved in the plan
|
||||
* @param type type of plan
|
||||
*/
|
||||
void planSkipped(RegionInfo hri, PlanType type);
|
||||
|
||||
/**
|
||||
* @param type type of plan for which skipped count is to be returned
|
||||
* @return the count of plans of specified type which were skipped
|
||||
*/
|
||||
long getSkippedCount(NormalizationPlan.PlanType type);
|
||||
List<NormalizationPlan> computePlansForTable(TableName table);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/**
|
||||
*
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,34 +17,35 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#normalizeRegions()}
|
||||
* when needed.
|
||||
* Chore that will periodically call
|
||||
* {@link HMaster#normalizeRegions(NormalizeTableFilterParams, boolean)}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionNormalizerChore extends ScheduledChore {
|
||||
class RegionNormalizerChore extends ScheduledChore {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerChore.class);
|
||||
|
||||
private final HMaster master;
|
||||
private final MasterServices master;
|
||||
|
||||
public RegionNormalizerChore(HMaster master) {
|
||||
public RegionNormalizerChore(MasterServices master) {
|
||||
super(master.getServerName() + "-RegionNormalizerChore", master,
|
||||
master.getConfiguration().getInt("hbase.normalizer.period", 300000));
|
||||
master.getConfiguration().getInt("hbase.normalizer.period", 300_000));
|
||||
this.master = master;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
try {
|
||||
master.normalizeRegions();
|
||||
master.normalizeRegions(new NormalizeTableFilterParams.Builder().build(), false);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to normalize regions.", e);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/**
|
||||
*
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -20,8 +19,12 @@ package org.apache.hadoop.hbase.master.normalizer;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Factory to create instance of {@link RegionNormalizer} as configured.
|
||||
|
@ -32,13 +35,30 @@ public final class RegionNormalizerFactory {
|
|||
private RegionNormalizerFactory() {
|
||||
}
|
||||
|
||||
public static RegionNormalizerManager createNormalizerManager(
|
||||
final Configuration conf,
|
||||
final ZKWatcher zkWatcher,
|
||||
final HMaster master // TODO: consolidate this down to MasterServices
|
||||
) {
|
||||
final RegionNormalizer regionNormalizer = getRegionNormalizer(conf);
|
||||
regionNormalizer.setMasterServices(master);
|
||||
final RegionNormalizerTracker tracker = new RegionNormalizerTracker(zkWatcher, master);
|
||||
final RegionNormalizerChore chore =
|
||||
master.isInMaintenanceMode() ? null : new RegionNormalizerChore(master);
|
||||
final RegionNormalizerWorkQueue<TableName> workQueue =
|
||||
master.isInMaintenanceMode() ? null : new RegionNormalizerWorkQueue<>();
|
||||
final RegionNormalizerWorker worker = master.isInMaintenanceMode()
|
||||
? null
|
||||
: new RegionNormalizerWorker(conf, master, regionNormalizer, workQueue);
|
||||
return new RegionNormalizerManager(tracker, chore, workQueue, worker);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a region normalizer from the given conf.
|
||||
* @param conf configuration
|
||||
* @return {@link RegionNormalizer} implementation
|
||||
*/
|
||||
public static RegionNormalizer getRegionNormalizer(Configuration conf) {
|
||||
|
||||
private static RegionNormalizer getRegionNormalizer(Configuration conf) {
|
||||
// Create instance of Region Normalizer
|
||||
Class<? extends RegionNormalizer> balancerKlass =
|
||||
conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* This class encapsulates the details of the {@link RegionNormalizer} subsystem.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionNormalizerManager {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
|
||||
|
||||
private final RegionNormalizerTracker regionNormalizerTracker;
|
||||
private final RegionNormalizerChore regionNormalizerChore;
|
||||
private final RegionNormalizerWorkQueue<TableName> workQueue;
|
||||
private final RegionNormalizerWorker worker;
|
||||
private final ExecutorService pool;
|
||||
|
||||
private final Object startStopLock = new Object();
|
||||
private boolean started = false;
|
||||
private boolean stopped = false;
|
||||
|
||||
public RegionNormalizerManager(
|
||||
@NonNull final RegionNormalizerTracker regionNormalizerTracker,
|
||||
@Nullable final RegionNormalizerChore regionNormalizerChore,
|
||||
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
|
||||
@Nullable final RegionNormalizerWorker worker
|
||||
) {
|
||||
this.regionNormalizerTracker = regionNormalizerTracker;
|
||||
this.regionNormalizerChore = regionNormalizerChore;
|
||||
this.workQueue = workQueue;
|
||||
this.worker = worker;
|
||||
this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||
.setDaemon(true)
|
||||
.setNameFormat("normalizer-worker-%d")
|
||||
.setUncaughtExceptionHandler(
|
||||
(thread, throwable) ->
|
||||
LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
|
||||
.build());
|
||||
}
|
||||
|
||||
public void start() {
|
||||
synchronized (startStopLock) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
regionNormalizerTracker.start();
|
||||
if (worker != null) {
|
||||
// worker will be null when master is in maintenance mode.
|
||||
pool.submit(worker);
|
||||
}
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
synchronized (startStopLock) {
|
||||
if (!started) {
|
||||
throw new IllegalStateException("calling `stop` without first calling `start`.");
|
||||
}
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
|
||||
regionNormalizerTracker.stop();
|
||||
stopped = true;
|
||||
}
|
||||
}
|
||||
|
||||
public ScheduledChore getRegionNormalizerChore() {
|
||||
return regionNormalizerChore;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} if region normalizer is on, {@code false} otherwise
|
||||
*/
|
||||
public boolean isNormalizerOn() {
|
||||
return regionNormalizerTracker.isNormalizerOn();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set region normalizer on/off
|
||||
* @param normalizerOn whether normalizer should be on or off
|
||||
*/
|
||||
public void setNormalizerOn(boolean normalizerOn) {
|
||||
try {
|
||||
regionNormalizerTracker.setNormalizerOn(normalizerOn);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Error flipping normalizer switch", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call-back for the case where plan couldn't be executed due to constraint violation,
|
||||
* such as namespace quota.
|
||||
* @param type type of plan that was skipped.
|
||||
*/
|
||||
public void planSkipped(NormalizationPlan.PlanType type) {
|
||||
// TODO: this appears to be used only for testing.
|
||||
if (worker != null) {
|
||||
worker.planSkipped(type);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
|
||||
* @param type type of plan for which skipped count is to be returned
|
||||
*/
|
||||
public long getSkippedCount(NormalizationPlan.PlanType type) {
|
||||
// TODO: this appears to be used only for testing.
|
||||
return worker == null ? 0 : worker.getSkippedCount(type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of times a {@link SplitNormalizationPlan} has been submitted.
|
||||
*/
|
||||
public long getSplitPlanCount() {
|
||||
return worker == null ? 0 : worker.getSplitPlanCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of times a {@link MergeNormalizationPlan} has been submitted.
|
||||
*/
|
||||
public long getMergePlanCount() {
|
||||
return worker == null ? 0 : worker.getMergePlanCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit tables for normalization.
|
||||
* @param tables a list of tables to submit.
|
||||
* @param isHighPriority {@code true} when these requested tables should skip to the front of
|
||||
* the queue.
|
||||
* @return {@code true} when work was queued, {@code false} otherwise.
|
||||
*/
|
||||
public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
|
||||
if (workQueue == null) {
|
||||
return false;
|
||||
}
|
||||
if (isHighPriority) {
|
||||
workQueue.putAllFirst(tables);
|
||||
} else {
|
||||
workQueue.putAll(tables);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,244 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is
|
||||
* an ordered collection class that has the following properties:
|
||||
* <ul>
|
||||
* <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
|
||||
* <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
|
||||
* <li>Work is retrieved on a FIFO policy.</li>
|
||||
* <li>Work retrieval blocks the calling thread until new work is available, as a
|
||||
* {@link BlockingQueue}.</li>
|
||||
* <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
|
||||
* </ul>
|
||||
* Assumes low-frequency and low-parallelism concurrent access, so protects state using a
|
||||
* simplistic synchronization strategy.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionNormalizerWorkQueue<E> {
|
||||
|
||||
/** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
|
||||
private LinkedHashSet<E> delegate;
|
||||
|
||||
// the locking structure used here follows the example found in LinkedBlockingQueue. The
|
||||
// difference is that our locks guard access to `delegate` rather than the head node.
|
||||
|
||||
/** Lock held by take, poll, etc */
|
||||
private final ReentrantLock takeLock;
|
||||
|
||||
/** Wait queue for waiting takes */
|
||||
private final Condition notEmpty;
|
||||
|
||||
/** Lock held by put, offer, etc */
|
||||
private final ReentrantLock putLock;
|
||||
|
||||
RegionNormalizerWorkQueue() {
|
||||
delegate = new LinkedHashSet<>();
|
||||
takeLock = new ReentrantLock();
|
||||
notEmpty = takeLock.newCondition();
|
||||
putLock = new ReentrantLock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Signals a waiting take. Called only from put/offer (which do not
|
||||
* otherwise ordinarily lock takeLock.)
|
||||
*/
|
||||
private void signalNotEmpty() {
|
||||
final ReentrantLock takeLock = this.takeLock;
|
||||
takeLock.lock();
|
||||
try {
|
||||
notEmpty.signal();
|
||||
} finally {
|
||||
takeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Locks to prevent both puts and takes.
|
||||
*/
|
||||
private void fullyLock() {
|
||||
putLock.lock();
|
||||
takeLock.lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unlocks to allow both puts and takes.
|
||||
*/
|
||||
private void fullyUnlock() {
|
||||
takeLock.unlock();
|
||||
putLock.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts the specified element at the tail of the queue, if it's not already present.
|
||||
*
|
||||
* @param e the element to add
|
||||
*/
|
||||
public void put(E e) {
|
||||
if (e == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
|
||||
putLock.lock();
|
||||
try {
|
||||
delegate.add(e);
|
||||
} finally {
|
||||
putLock.unlock();
|
||||
}
|
||||
|
||||
if (!delegate.isEmpty()) {
|
||||
signalNotEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts the specified element at the head of the queue.
|
||||
*
|
||||
* @param e the element to add
|
||||
*/
|
||||
public void putFirst(E e) {
|
||||
if (e == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
putAllFirst(Collections.singleton(e));
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts the specified elements at the tail of the queue. Any elements already present in
|
||||
* the queue are ignored.
|
||||
*
|
||||
* @param c the elements to add
|
||||
*/
|
||||
public void putAll(Collection<? extends E> c) {
|
||||
if (c == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
|
||||
putLock.lock();
|
||||
try {
|
||||
delegate.addAll(c);
|
||||
} finally {
|
||||
putLock.unlock();
|
||||
}
|
||||
|
||||
if (!delegate.isEmpty()) {
|
||||
signalNotEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts the specified elements at the head of the queue.
|
||||
*
|
||||
* @param c the elements to add
|
||||
*/
|
||||
public void putAllFirst(Collection<? extends E> c) {
|
||||
if (c == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
|
||||
fullyLock();
|
||||
try {
|
||||
final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
|
||||
copy.addAll(c);
|
||||
copy.addAll(delegate);
|
||||
delegate = copy;
|
||||
} finally {
|
||||
fullyUnlock();
|
||||
}
|
||||
|
||||
if (!delegate.isEmpty()) {
|
||||
signalNotEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves and removes the head of this queue, waiting if necessary
|
||||
* until an element becomes available.
|
||||
*
|
||||
* @return the head of this queue
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public E take() throws InterruptedException {
|
||||
E x;
|
||||
takeLock.lockInterruptibly();
|
||||
try {
|
||||
while (delegate.isEmpty()) {
|
||||
notEmpty.await();
|
||||
}
|
||||
final Iterator<E> iter = delegate.iterator();
|
||||
x = iter.next();
|
||||
iter.remove();
|
||||
if (!delegate.isEmpty()) {
|
||||
notEmpty.signal();
|
||||
}
|
||||
} finally {
|
||||
takeLock.unlock();
|
||||
}
|
||||
return x;
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically removes all of the elements from this queue.
|
||||
* The queue will be empty after this call returns.
|
||||
*/
|
||||
public void clear() {
|
||||
putLock.lock();
|
||||
try {
|
||||
delegate.clear();
|
||||
} finally {
|
||||
putLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of elements in this queue.
|
||||
*
|
||||
* @return the number of elements in this queue
|
||||
*/
|
||||
public int size() {
|
||||
takeLock.lock();
|
||||
try {
|
||||
return delegate.size();
|
||||
} finally {
|
||||
takeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
takeLock.lock();
|
||||
try {
|
||||
return delegate.toString();
|
||||
} finally {
|
||||
takeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,253 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
/**
|
||||
* Consumes normalization request targets ({@link TableName}s) off the
|
||||
* {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer},
|
||||
* and executes the resulting {@link NormalizationPlan}s.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionNormalizerWorker implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
|
||||
|
||||
static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
|
||||
"hbase.normalizer.throughput.max_bytes_per_sec";
|
||||
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
|
||||
|
||||
private final MasterServices masterServices;
|
||||
private final RegionNormalizer regionNormalizer;
|
||||
private final RegionNormalizerWorkQueue<TableName> workQueue;
|
||||
private final RateLimiter rateLimiter;
|
||||
|
||||
private final long[] skippedCount;
|
||||
private long splitPlanCount;
|
||||
private long mergePlanCount;
|
||||
|
||||
RegionNormalizerWorker(
|
||||
final Configuration configuration,
|
||||
final MasterServices masterServices,
|
||||
final RegionNormalizer regionNormalizer,
|
||||
final RegionNormalizerWorkQueue<TableName> workQueue
|
||||
) {
|
||||
this.masterServices = masterServices;
|
||||
this.regionNormalizer = regionNormalizer;
|
||||
this.workQueue = workQueue;
|
||||
this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
|
||||
this.splitPlanCount = 0;
|
||||
this.mergePlanCount = 0;
|
||||
this.rateLimiter = loadRateLimiter(configuration);
|
||||
}
|
||||
|
||||
private static RateLimiter loadRateLimiter(final Configuration configuration) {
|
||||
long rateLimitBytes =
|
||||
configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
|
||||
long rateLimitMbs = rateLimitBytes / 1_000_000L;
|
||||
if (rateLimitMbs <= 0) {
|
||||
LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.",
|
||||
RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
|
||||
rateLimitBytes = RATE_UNLIMITED_BYTES;
|
||||
rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
|
||||
}
|
||||
LOG.info("Normalizer rate limit set to {}",
|
||||
rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
|
||||
return RateLimiter.create(rateLimitMbs);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
|
||||
*/
|
||||
void planSkipped(NormalizationPlan.PlanType type) {
|
||||
synchronized (skippedCount) {
|
||||
// updates come here via procedure threads, so synchronize access to this counter.
|
||||
skippedCount[type.ordinal()]++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
|
||||
*/
|
||||
long getSkippedCount(NormalizationPlan.PlanType type) {
|
||||
return skippedCount[type.ordinal()];
|
||||
}
|
||||
|
||||
/**
|
||||
* @see RegionNormalizerManager#getSplitPlanCount()
|
||||
*/
|
||||
long getSplitPlanCount() {
|
||||
return splitPlanCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see RegionNormalizerManager#getMergePlanCount()
|
||||
*/
|
||||
long getMergePlanCount() {
|
||||
return mergePlanCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
if (Thread.interrupted()) {
|
||||
LOG.debug("interrupt detected. terminating.");
|
||||
break;
|
||||
}
|
||||
final TableName tableName;
|
||||
try {
|
||||
tableName = workQueue.take();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("interrupt detected. terminating.");
|
||||
break;
|
||||
}
|
||||
|
||||
final List<NormalizationPlan> plans = calculatePlans(tableName);
|
||||
submitPlans(plans);
|
||||
}
|
||||
}
|
||||
|
||||
private List<NormalizationPlan> calculatePlans(final TableName tableName) {
|
||||
if (masterServices.skipRegionManagementAction("region normalizer")) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
try {
|
||||
final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
|
||||
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
|
||||
LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
|
||||
tableName);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
|
||||
if (CollectionUtils.isEmpty(plans)) {
|
||||
LOG.debug("No normalization required for table {}.", tableName);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return plans;
|
||||
}
|
||||
|
||||
private void submitPlans(final List<NormalizationPlan> plans) {
|
||||
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
|
||||
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
|
||||
for (NormalizationPlan plan : plans) {
|
||||
switch (plan.getType()) {
|
||||
case MERGE: {
|
||||
submitMergePlan((MergeNormalizationPlan) plan);
|
||||
break;
|
||||
}
|
||||
case SPLIT: {
|
||||
submitSplitPlan((SplitNormalizationPlan) plan);
|
||||
break;
|
||||
}
|
||||
case NONE:
|
||||
LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
|
||||
planSkipped(plan.getType());
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
|
||||
planSkipped(plan.getType());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Interacts with {@link MasterServices} in order to execute a plan.
|
||||
*/
|
||||
private void submitMergePlan(final MergeNormalizationPlan plan) {
|
||||
final int totalSizeMb;
|
||||
try {
|
||||
final long totalSizeMbLong = plan.getNormalizationTargets()
|
||||
.stream()
|
||||
.mapToLong(NormalizationTarget::getRegionSizeMb)
|
||||
.reduce(0, Math::addExact);
|
||||
totalSizeMb = Math.toIntExact(totalSizeMbLong);
|
||||
} catch (ArithmeticException e) {
|
||||
LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
|
||||
planSkipped(plan.getType());
|
||||
return;
|
||||
}
|
||||
|
||||
final RegionInfo[] infos = plan.getNormalizationTargets()
|
||||
.stream()
|
||||
.map(NormalizationTarget::getRegionInfo)
|
||||
.toArray(RegionInfo[]::new);
|
||||
final long pid;
|
||||
try {
|
||||
pid = masterServices.mergeRegions(
|
||||
infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
} catch (IOException e) {
|
||||
LOG.info("failed to submit plan {}.", plan, e);
|
||||
planSkipped(plan.getType());
|
||||
return;
|
||||
}
|
||||
mergePlanCount++;
|
||||
LOG.info("Submitted {} resulting in pid {}", plan, pid);
|
||||
final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
|
||||
LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
|
||||
}
|
||||
|
||||
/**
|
||||
* Interacts with {@link MasterServices} in order to execute a plan.
|
||||
*/
|
||||
private void submitSplitPlan(final SplitNormalizationPlan plan) {
|
||||
final int totalSizeMb;
|
||||
try {
|
||||
totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
|
||||
} catch (ArithmeticException e) {
|
||||
LOG.debug("Split request size overflows rate limiter data type. {}", plan);
|
||||
planSkipped(plan.getType());
|
||||
return;
|
||||
}
|
||||
final RegionInfo info = plan.getSplitTarget().getRegionInfo();
|
||||
final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
|
||||
LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));
|
||||
|
||||
final long pid;
|
||||
try {
|
||||
pid = masterServices.splitRegion(
|
||||
info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
} catch (IOException e) {
|
||||
LOG.info("failed to submit plan {}.", plan, e);
|
||||
planSkipped(plan.getType());
|
||||
return;
|
||||
}
|
||||
splitPlanCount++;
|
||||
LOG.info("Submitted {} resulting in pid {}", plan, pid);
|
||||
}
|
||||
}
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -54,29 +53,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
|
|||
* <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
|
||||
* are kindly requested to merge.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* The following parameters are configurable:
|
||||
* <ol>
|
||||
* <li>Whether to split a region as part of normalization. Configuration:
|
||||
* {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
|
||||
* <li>Whether to merge a region as part of normalization. Configuration:
|
||||
* {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
|
||||
* <li>The minimum number of regions in a table to consider it for merge normalization.
|
||||
* Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
|
||||
* {@value #DEFAULT_MIN_REGION_COUNT}.</li>
|
||||
* <li>The minimum age for a region to be considered for a merge, in days. Configuration:
|
||||
* {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
|
||||
* {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
|
||||
* <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
|
||||
* {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
|
||||
* {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* To see detailed logging of the application of these configuration values, set the log level for
|
||||
* this class to `TRACE`.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||
class SimpleRegionNormalizer implements RegionNormalizer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
|
||||
|
||||
static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
|
||||
|
@ -92,7 +71,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
|
||||
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
|
||||
|
||||
private final long[] skippedCount;
|
||||
private Configuration conf;
|
||||
private MasterServices masterServices;
|
||||
private boolean splitEnabled;
|
||||
|
@ -102,7 +80,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
private int mergeMinRegionSizeMb;
|
||||
|
||||
public SimpleRegionNormalizer() {
|
||||
skippedCount = new long[NormalizationPlan.PlanType.values().length];
|
||||
splitEnabled = DEFAULT_SPLIT_ENABLED;
|
||||
mergeEnabled = DEFAULT_MERGE_ENABLED;
|
||||
minRegionCount = DEFAULT_MIN_REGION_COUNT;
|
||||
|
@ -203,16 +180,6 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void planSkipped(final RegionInfo hri, final PlanType type) {
|
||||
skippedCount[type.ordinal()]++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSkippedCount(NormalizationPlan.PlanType type) {
|
||||
return skippedCount[type.ordinal()];
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NormalizationPlan> computePlansForTable(final TableName table) {
|
||||
if (table == null) {
|
||||
|
@ -371,7 +338,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
final long nextSizeMb = getRegionSizeMB(next);
|
||||
// always merge away empty regions when they present themselves.
|
||||
if (currentSizeMb == 0 || nextSizeMb == 0 || currentSizeMb + nextSizeMb < avgRegionSizeMb) {
|
||||
plans.add(new MergeNormalizationPlan(current, next));
|
||||
final MergeNormalizationPlan plan = new MergeNormalizationPlan.Builder()
|
||||
.addTarget(current, currentSizeMb)
|
||||
.addTarget(next, nextSizeMb)
|
||||
.build();
|
||||
plans.add(plan);
|
||||
candidateIdx++;
|
||||
}
|
||||
}
|
||||
|
@ -408,11 +379,11 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
|
||||
continue;
|
||||
}
|
||||
final long regionSize = getRegionSizeMB(hri);
|
||||
if (regionSize > 2 * avgRegionSize) {
|
||||
final long regionSizeMb = getRegionSizeMB(hri);
|
||||
if (regionSizeMb > 2 * avgRegionSize) {
|
||||
LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
|
||||
ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
|
||||
plans.add(new SplitNormalizationPlan(hri));
|
||||
ctx.getTableName(), hri.getRegionNameAsString(), regionSizeMb, avgRegionSize);
|
||||
plans.add(new SplitNormalizationPlan(hri, regionSizeMb));
|
||||
}
|
||||
}
|
||||
return plans;
|
||||
|
|
|
@ -18,32 +18,23 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Normalization plan to split region.
|
||||
* Normalization plan to split a region.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SplitNormalizationPlan implements NormalizationPlan {
|
||||
final class SplitNormalizationPlan implements NormalizationPlan {
|
||||
|
||||
private final RegionInfo regionInfo;
|
||||
private final NormalizationTarget splitTarget;
|
||||
|
||||
public SplitNormalizationPlan(RegionInfo regionInfo) {
|
||||
this.regionInfo = regionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long submit(MasterServices masterServices) throws IOException {
|
||||
return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
SplitNormalizationPlan(final RegionInfo splitTarget, final long splitTargetSizeMb) {
|
||||
this.splitTarget = new NormalizationTarget(splitTarget, splitTargetSizeMb);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,14 +42,14 @@ public class SplitNormalizationPlan implements NormalizationPlan {
|
|||
return PlanType.SPLIT;
|
||||
}
|
||||
|
||||
public RegionInfo getRegionInfo() {
|
||||
return regionInfo;
|
||||
public NormalizationTarget getSplitTarget() {
|
||||
return splitTarget;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
|
||||
.append("regionInfo", regionInfo)
|
||||
.append("splitTarget", splitTarget)
|
||||
.toString();
|
||||
}
|
||||
|
||||
|
@ -75,13 +66,13 @@ public class SplitNormalizationPlan implements NormalizationPlan {
|
|||
SplitNormalizationPlan that = (SplitNormalizationPlan) o;
|
||||
|
||||
return new EqualsBuilder()
|
||||
.append(regionInfo, that.regionInfo)
|
||||
.append(splitTarget, that.splitTarget)
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override public int hashCode() {
|
||||
return new HashCodeBuilder(17, 37)
|
||||
.append(regionInfo)
|
||||
.append(splitTarget)
|
||||
.toHashCode();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The Region Normalizer subsystem is responsible for coaxing all the regions in a table toward
|
||||
* a "normal" size, according to their storefile size. It does this by splitting regions that
|
||||
* are significantly larger than the norm, and merging regions that are significantly smaller than
|
||||
* the norm.
|
||||
* </p>
|
||||
* The public interface to the Region Normalizer subsystem is limited to the following classes:
|
||||
* <ul>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory} provides an
|
||||
* entry point for creating an instance of the
|
||||
* {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager}.
|
||||
* </li>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager} encapsulates
|
||||
* the whole Region Normalizer subsystem. You'll find one of these hanging off of the
|
||||
* {@link org.apache.hadoop.hbase.master.HMaster}, which uses it to delegate API calls. There
|
||||
* is usually only a single instance of this class.
|
||||
* </li>
|
||||
* <li>
|
||||
* Various configuration points that share the common prefix of {@code hbase.normalizer}.
|
||||
* <ul>
|
||||
* <li>Whether to split a region as part of normalization. Configuration:
|
||||
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#SPLIT_ENABLED_KEY},
|
||||
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_SPLIT_ENABLED}.
|
||||
* </li>
|
||||
* <li>Whether to merge a region as part of normalization. Configuration:
|
||||
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_ENABLED_KEY},
|
||||
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_ENABLED}.
|
||||
* </li>
|
||||
* <li>The minimum number of regions in a table to consider it for merge normalization.
|
||||
* Configuration: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MIN_REGION_COUNT_KEY},
|
||||
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MIN_REGION_COUNT}.
|
||||
* </li>
|
||||
* <li>The minimum age for a region to be considered for a merge, in days. Configuration:
|
||||
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_AGE_DAYS_KEY},
|
||||
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.
|
||||
* </li>
|
||||
* <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
|
||||
* {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#MERGE_MIN_REGION_SIZE_MB_KEY},
|
||||
* default: {@value org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer#DEFAULT_MERGE_MIN_REGION_SIZE_MB}.
|
||||
* </li>
|
||||
* <li>The limit on total throughput of the Region Normalizer's actions, in whole MBs. Configuration:
|
||||
* {@value org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker#RATE_LIMIT_BYTES_PER_SEC_KEY},
|
||||
* default: unlimited.
|
||||
* </li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* To see detailed logging of the application of these configuration values, set the log
|
||||
* level for this package to `TRACE`.
|
||||
* </p>
|
||||
* </li>
|
||||
* </ul>
|
||||
* The Region Normalizer subsystem is composed of a handful of related classes:
|
||||
* <ul>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker} provides a system by
|
||||
* which the Normalizer can be disabled at runtime. It currently does this by managing a znode,
|
||||
* but this is an implementation detail.
|
||||
* </li>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorkQueue} is a
|
||||
* {@link java.util.Set}-like {@link java.util.Queue} that permits a single copy of a given
|
||||
* work item to exist in the queue at one time. It also provides a facility for a producer to
|
||||
* add an item to the front of the line. Consumers are blocked waiting for new work.
|
||||
* </li>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore} wakes up
|
||||
* periodically and schedules new normalization work, adding targets to the queue.
|
||||
* </li>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker} runs in a
|
||||
* daemon thread, grabbing work off the queue as is it becomes available.
|
||||
* </li>
|
||||
* <li>
|
||||
* The {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} implements the
|
||||
* logic for calculating target region sizes and emitting a list of corresponding
|
||||
* {@link org.apache.hadoop.hbase.master.normalizer.NormalizationPlan} objects.
|
||||
* </li>
|
||||
* </ul>
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
|
|||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.MasterSwitchType;
|
||||
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
|
@ -40,7 +41,7 @@ import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
|||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
|
||||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
|
@ -106,11 +107,6 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionNormalizer getRegionNormalizer() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CatalogJanitor getCatalogJanitor() {
|
||||
return null;
|
||||
|
@ -136,6 +132,10 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override public RegionNormalizerManager getRegionNormalizerManager() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
|
||||
return null;
|
||||
|
@ -338,6 +338,10 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override public boolean skipRegionManagementAction(String action) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
|
||||
return 0;
|
||||
|
@ -483,4 +487,9 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
|
||||
@Override
|
||||
public void runReplicationBarrierCleaner() {}
|
||||
|
||||
@Override
|
||||
public boolean normalizeRegions(NormalizeTableFilterParams ntfp, boolean isHighPriority) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
|
@ -30,7 +29,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
|||
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
|
||||
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -66,7 +64,7 @@ public class TestMasterChoreScheduled {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultScheduledChores() throws Exception {
|
||||
public void testDefaultScheduledChores() {
|
||||
// test if logCleaner chore is scheduled by default in HMaster init
|
||||
TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
|
||||
LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
|
||||
|
@ -96,10 +94,10 @@ public class TestMasterChoreScheduled {
|
|||
balancerChoreTestChoreField.testIfChoreScheduled(balancerChore);
|
||||
|
||||
// test if normalizerChore chore is scheduled by default in HMaster init
|
||||
TestChoreField<RegionNormalizerChore> regionNormalizerChoreTestChoreField =
|
||||
ScheduledChore regionNormalizerChore = hMaster.getRegionNormalizerManager()
|
||||
.getRegionNormalizerChore();
|
||||
TestChoreField<ScheduledChore> regionNormalizerChoreTestChoreField =
|
||||
new TestChoreField<>();
|
||||
RegionNormalizerChore regionNormalizerChore = regionNormalizerChoreTestChoreField
|
||||
.getChoreObj("normalizerChore");
|
||||
regionNormalizerChoreTestChoreField.testIfChoreScheduled(regionNormalizerChore);
|
||||
|
||||
// test if catalogJanitorChore chore is scheduled by default in HMaster init
|
||||
|
@ -114,22 +112,27 @@ public class TestMasterChoreScheduled {
|
|||
hbckChoreTestChoreField.testIfChoreScheduled(hbckChore);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reflect into the {@link HMaster} instance and find by field name a specified instance
|
||||
* of {@link ScheduledChore}.
|
||||
*/
|
||||
private static class TestChoreField<E extends ScheduledChore> {
|
||||
|
||||
private E getChoreObj(String fieldName) throws NoSuchFieldException,
|
||||
IllegalAccessException {
|
||||
Field masterField = HMaster.class.getDeclaredField(fieldName);
|
||||
masterField.setAccessible(true);
|
||||
E choreFieldVal = (E) masterField.get(hMaster);
|
||||
return choreFieldVal;
|
||||
@SuppressWarnings("unchecked")
|
||||
private E getChoreObj(String fieldName) {
|
||||
try {
|
||||
Field masterField = HMaster.class.getDeclaredField(fieldName);
|
||||
masterField.setAccessible(true);
|
||||
return (E) masterField.get(hMaster);
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(
|
||||
"Unable to retrieve field '" + fieldName + "' from HMaster instance.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void testIfChoreScheduled(E choreObj) {
|
||||
Assert.assertNotNull(choreObj);
|
||||
Assert.assertTrue(hMaster.getChoreService().isChoreScheduled(choreObj));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -62,8 +62,10 @@ public class TestMasterMetricsWrapper {
|
|||
public void testInfo() {
|
||||
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
MetricsMasterWrapperImpl info = new MetricsMasterWrapperImpl(master);
|
||||
assertEquals(master.getSplitPlanCount(), info.getSplitPlanCount(), 0);
|
||||
assertEquals(master.getMergePlanCount(), info.getMergePlanCount(), 0);
|
||||
assertEquals(
|
||||
master.getRegionNormalizerManager().getSplitPlanCount(), info.getSplitPlanCount(), 0);
|
||||
assertEquals(
|
||||
master.getRegionNormalizerManager().getMergePlanCount(), info.getMergePlanCount(), 0);
|
||||
assertEquals(master.getAverageLoad(), info.getAverageLoad(), 0);
|
||||
assertEquals(master.getClusterId(), info.getClusterId());
|
||||
assertEquals(master.getMasterActiveTime(), info.getActiveTime());
|
||||
|
|
|
@ -0,0 +1,234 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
/**
|
||||
* Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
|
||||
*/
|
||||
@Category({ MasterTests.class, SmallTests.class})
|
||||
public class TestRegionNormalizerWorkQueue {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
@Test
|
||||
public void testElementUniquenessAndFIFO() throws Exception {
|
||||
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
|
||||
final List<Integer> content = new LinkedList<>();
|
||||
IntStream.of(4, 3, 2, 1, 4, 3, 2, 1)
|
||||
.boxed()
|
||||
.forEach(queue::put);
|
||||
assertEquals(4, queue.size());
|
||||
while (queue.size() > 0) {
|
||||
content.add(queue.take());
|
||||
}
|
||||
assertThat(content, contains(4, 3, 2, 1));
|
||||
|
||||
queue.clear();
|
||||
queue.putAll(Arrays.asList(4, 3, 2, 1));
|
||||
queue.putAll(Arrays.asList(4, 5));
|
||||
assertEquals(5, queue.size());
|
||||
content.clear();
|
||||
while (queue.size() > 0) {
|
||||
content.add(queue.take());
|
||||
}
|
||||
assertThat(content, contains(4, 3, 2, 1, 5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPriorityAndFIFO() throws Exception {
|
||||
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
|
||||
final List<Integer> content = new LinkedList<>();
|
||||
queue.putAll(Arrays.asList(4, 3, 2, 1));
|
||||
assertEquals(4, queue.size());
|
||||
queue.putFirst(0);
|
||||
assertEquals(5, queue.size());
|
||||
drainTo(queue, content);
|
||||
assertThat("putFirst items should jump the queue, preserving existing order",
|
||||
content, contains(0, 4, 3, 2, 1));
|
||||
|
||||
queue.clear();
|
||||
content.clear();
|
||||
queue.putAll(Arrays.asList(4, 3, 2, 1));
|
||||
queue.putFirst(1);
|
||||
assertEquals(4, queue.size());
|
||||
drainTo(queue, content);
|
||||
assertThat("existing items re-added with putFirst should jump the queue",
|
||||
content, contains(1, 4, 3, 2));
|
||||
|
||||
queue.clear();
|
||||
content.clear();
|
||||
queue.putAll(Arrays.asList(4, 3, 2, 1));
|
||||
queue.putAllFirst(Arrays.asList(2, 3));
|
||||
assertEquals(4, queue.size());
|
||||
drainTo(queue, content);
|
||||
assertThat(
|
||||
"existing items re-added with putAllFirst jump the queue AND honor changes in priority",
|
||||
content, contains(2, 3, 4, 1));
|
||||
}
|
||||
|
||||
private enum Action {
|
||||
PUT,
|
||||
PUT_FIRST,
|
||||
PUT_ALL,
|
||||
PUT_ALL_FIRST,
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the uniqueness constraint is honored in the face of concurrent modification.
|
||||
*/
|
||||
@Test
|
||||
public void testConcurrentPut() throws Exception {
|
||||
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
|
||||
final int maxValue = 100;
|
||||
final Runnable producer = () -> {
|
||||
final Random rand = ThreadLocalRandom.current();
|
||||
for (int i = 0; i < 1_000; i++) {
|
||||
final Action action = Action.values()[rand.nextInt(Action.values().length)];
|
||||
switch (action) {
|
||||
case PUT: {
|
||||
final int val = rand.nextInt(maxValue);
|
||||
queue.put(val);
|
||||
break;
|
||||
}
|
||||
case PUT_FIRST: {
|
||||
final int val = rand.nextInt(maxValue);
|
||||
queue.putFirst(val);
|
||||
break;
|
||||
}
|
||||
case PUT_ALL: {
|
||||
final List<Integer> vals = rand.ints(5, 0, maxValue)
|
||||
.boxed()
|
||||
.collect(Collectors.toList());
|
||||
queue.putAll(vals);
|
||||
break;
|
||||
}
|
||||
case PUT_ALL_FIRST: {
|
||||
final List<Integer> vals = rand.ints(5, 0, maxValue)
|
||||
.boxed()
|
||||
.collect(Collectors.toList());
|
||||
queue.putAllFirst(vals);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
fail("Unrecognized action " + action);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final int numThreads = 5;
|
||||
final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
|
||||
.mapToObj(val -> CompletableFuture.runAsync(producer))
|
||||
.toArray(CompletableFuture<?>[]::new);
|
||||
CompletableFuture.allOf(futures).join();
|
||||
|
||||
final List<Integer> content = new ArrayList<>(queue.size());
|
||||
drainTo(queue, content);
|
||||
assertThat("at most `maxValue` items should be present.",
|
||||
content.size(), lessThanOrEqualTo(maxValue));
|
||||
assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
|
||||
* producing thread places new entries onto the queue following a known schedule. The consuming
|
||||
* thread collects a time measurement between calls to {@code take}. Finally, the test makes
|
||||
* coarse-grained assertions of the consumer's observations based on the producer's schedule.
|
||||
*/
|
||||
@Test
|
||||
public void testTake() throws Exception {
|
||||
final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
|
||||
final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
|
||||
final AtomicBoolean finished = new AtomicBoolean(false);
|
||||
final Runnable consumer = () -> {
|
||||
try {
|
||||
while (!finished.get()) {
|
||||
queue.take();
|
||||
takeTimes.add(System.nanoTime());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
fail("interrupted.");
|
||||
}
|
||||
};
|
||||
|
||||
CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
|
||||
final long testStart = System.nanoTime();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
Thread.sleep(10);
|
||||
queue.put(i);
|
||||
}
|
||||
|
||||
// set finished = true and pipe one more value in case the thread needs an extra pass through
|
||||
// the loop.
|
||||
finished.set(true);
|
||||
queue.put(1);
|
||||
worker.get(1, TimeUnit.SECONDS);
|
||||
|
||||
final Iterator<Long> times = takeTimes.iterator();
|
||||
assertTrue("should have timing information for at least 2 calls to take.",
|
||||
takeTimes.size() >= 5);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
assertThat(
|
||||
"Observations collected in takeTimes should increase by roughly 10ms every interval",
|
||||
times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
|
||||
}
|
||||
}
|
||||
|
||||
private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
|
||||
throws InterruptedException {
|
||||
assertThat(queue.size(), greaterThan(0));
|
||||
while (queue.size() > 0) {
|
||||
dest.add(queue.take());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,252 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.comparesEqualTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.when;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.StringDescription;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.junit.MockitoJUnit;
|
||||
import org.mockito.junit.MockitoRule;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* A test over {@link RegionNormalizerWorker}. Being a background thread, the only points of
|
||||
* interaction we have to this class are its input source ({@link RegionNormalizerWorkQueue} and
|
||||
* its callbacks invoked against {@link RegionNormalizer} and {@link MasterServices}. The work
|
||||
* queue is simple enough to use directly; for {@link MasterServices}, use a mock because, as of
|
||||
* now, the worker only invokes 4 methods.
|
||||
*/
|
||||
@Category({ MasterTests.class, SmallTests.class})
|
||||
public class TestRegionNormalizerWorker {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionNormalizerWorker.class);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
@Rule
|
||||
public TableNameTestRule tableName = new TableNameTestRule();
|
||||
|
||||
@Rule
|
||||
public MockitoRule mockitoRule = MockitoJUnit.rule();
|
||||
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private MasterServices masterServices;
|
||||
@Mock
|
||||
private RegionNormalizer regionNormalizer;
|
||||
|
||||
private HBaseCommonTestingUtility testingUtility;
|
||||
private RegionNormalizerWorkQueue<TableName> queue;
|
||||
private ExecutorService workerPool;
|
||||
|
||||
private final AtomicReference<Throwable> workerThreadThrowable = new AtomicReference<>();
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
when(masterServices.skipRegionManagementAction(any())).thenReturn(false);
|
||||
testingUtility = new HBaseCommonTestingUtility();
|
||||
queue = new RegionNormalizerWorkQueue<>();
|
||||
workerThreadThrowable.set(null);
|
||||
|
||||
final String threadNameFmt =
|
||||
TestRegionNormalizerWorker.class.getSimpleName() + "-" + testName.getMethodName() + "-%d";
|
||||
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat(threadNameFmt)
|
||||
.setDaemon(true)
|
||||
.setUncaughtExceptionHandler((t, e) -> workerThreadThrowable.set(e))
|
||||
.build();
|
||||
workerPool = Executors.newSingleThreadExecutor(threadFactory);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
workerPool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
|
||||
assertTrue("timeout waiting for worker thread to terminate",
|
||||
workerPool.awaitTermination(30, TimeUnit.SECONDS));
|
||||
final Throwable workerThrowable = workerThreadThrowable.get();
|
||||
assertThat("worker thread threw unexpected exception", workerThrowable, nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeCounter() throws Exception {
|
||||
final TableName tn = tableName.getTableName();
|
||||
final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
|
||||
.setNormalizationEnabled(true)
|
||||
.build();
|
||||
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
|
||||
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
|
||||
.thenReturn(1L);
|
||||
when(regionNormalizer.computePlansForTable(tn))
|
||||
.thenReturn(singletonList(new MergeNormalizationPlan.Builder()
|
||||
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
|
||||
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
|
||||
.build()));
|
||||
|
||||
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
|
||||
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
|
||||
final long beforeMergePlanCount = worker.getMergePlanCount();
|
||||
workerPool.submit(worker);
|
||||
queue.put(tn);
|
||||
|
||||
assertThatEventually("executing work should see plan count increase",
|
||||
worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitCounter() throws Exception {
|
||||
final TableName tn = tableName.getTableName();
|
||||
final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
|
||||
.setNormalizationEnabled(true)
|
||||
.build();
|
||||
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
|
||||
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
|
||||
.thenReturn(1L);
|
||||
when(regionNormalizer.computePlansForTable(tn))
|
||||
.thenReturn(singletonList(
|
||||
new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));
|
||||
|
||||
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
|
||||
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
|
||||
final long beforeSplitPlanCount = worker.getSplitPlanCount();
|
||||
workerPool.submit(worker);
|
||||
queue.put(tn);
|
||||
|
||||
assertThatEventually("executing work should see plan count increase",
|
||||
worker::getSplitPlanCount, greaterThan(beforeSplitPlanCount));
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a rate limit is honored, at least in a rough way. Maintainers should manually
|
||||
* inspect the log messages emitted by the worker thread to confirm that expected behavior.
|
||||
*/
|
||||
@Test
|
||||
public void testRateLimit() throws Exception {
|
||||
final TableName tn = tableName.getTableName();
|
||||
final TableDescriptor tnDescriptor = TableDescriptorBuilder.newBuilder(tn)
|
||||
.setNormalizationEnabled(true)
|
||||
.build();
|
||||
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
|
||||
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
|
||||
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
|
||||
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
|
||||
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
|
||||
.thenReturn(1L);
|
||||
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
|
||||
.thenReturn(1L);
|
||||
when(regionNormalizer.computePlansForTable(tn))
|
||||
.thenReturn(Arrays.asList(
|
||||
new SplitNormalizationPlan(splitRegionInfo, 2),
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(mergeRegionInfo1, 1)
|
||||
.addTarget(mergeRegionInfo2, 2)
|
||||
.build(),
|
||||
new SplitNormalizationPlan(splitRegionInfo, 1)));
|
||||
|
||||
final Configuration conf = testingUtility.getConfiguration();
|
||||
conf.set("hbase.normalizer.throughput.max_bytes_per_sec", "1m");
|
||||
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
|
||||
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
|
||||
workerPool.submit(worker);
|
||||
final long startTime = System.nanoTime();
|
||||
queue.put(tn);
|
||||
|
||||
assertThatEventually("executing work should see split plan count increase",
|
||||
worker::getSplitPlanCount, comparesEqualTo(2L));
|
||||
assertThatEventually("executing work should see merge plan count increase",
|
||||
worker::getMergePlanCount, comparesEqualTo(1L));
|
||||
|
||||
final long endTime = System.nanoTime();
|
||||
assertThat("rate limited normalizer should have taken at least 5 seconds",
|
||||
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier}
|
||||
* until the matcher succeeds or the timeout period of 30 seconds is exhausted.
|
||||
*/
|
||||
private <T> void assertThatEventually(
|
||||
final String reason,
|
||||
final Supplier<? extends T> actualSupplier,
|
||||
final Matcher<? super T> matcher
|
||||
) throws Exception {
|
||||
testingUtility.waitFor(TimeUnit.SECONDS.toMillis(30),
|
||||
new Waiter.ExplainingPredicate<Exception>() {
|
||||
private T lastValue = null;
|
||||
|
||||
@Override
|
||||
public String explainFailure() {
|
||||
final Description description = new StringDescription()
|
||||
.appendText(reason)
|
||||
.appendText("\nExpected: ")
|
||||
.appendDescriptionOf(matcher)
|
||||
.appendText("\n but: ");
|
||||
matcher.describeMismatch(lastValue, description);
|
||||
return description.toString();
|
||||
}
|
||||
|
||||
@Override public boolean evaluate() {
|
||||
lastValue = actualSupplier.get();
|
||||
return matcher.matches(lastValue);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -175,8 +175,12 @@ public class TestSimpleRegionNormalizer {
|
|||
createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
|
||||
setupMocksForNormalizer(regionSizes, regionInfos);
|
||||
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new MergeNormalizationPlan(regionInfos.get(1), regionInfos.get(2))));
|
||||
assertThat(
|
||||
normalizer.computePlansForTable(tableName),
|
||||
contains(new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(1), 5)
|
||||
.addTarget(regionInfos.get(2), 5)
|
||||
.build()));
|
||||
}
|
||||
|
||||
// Test for situation illustrated in HBASE-14867
|
||||
|
@ -188,9 +192,12 @@ public class TestSimpleRegionNormalizer {
|
|||
createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
|
||||
setupMocksForNormalizer(regionSizes, regionInfos);
|
||||
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new MergeNormalizationPlan(regionInfos.get(4), regionInfos.get(5))
|
||||
));
|
||||
assertThat(
|
||||
normalizer.computePlansForTable(tableName),
|
||||
contains(new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(4), 2700)
|
||||
.addTarget(regionInfos.get(5), 2700)
|
||||
.build()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -214,7 +221,7 @@ public class TestSimpleRegionNormalizer {
|
|||
setupMocksForNormalizer(regionSizes, regionInfos);
|
||||
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new SplitNormalizationPlan(regionInfos.get(3))));
|
||||
new SplitNormalizationPlan(regionInfos.get(3), 30)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -229,18 +236,26 @@ public class TestSimpleRegionNormalizer {
|
|||
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
|
||||
.thenReturn(20L);
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new SplitNormalizationPlan(regionInfos.get(2)),
|
||||
new SplitNormalizationPlan(regionInfos.get(3)),
|
||||
new SplitNormalizationPlan(regionInfos.get(4)),
|
||||
new SplitNormalizationPlan(regionInfos.get(5))
|
||||
new SplitNormalizationPlan(regionInfos.get(2), 60),
|
||||
new SplitNormalizationPlan(regionInfos.get(3), 80),
|
||||
new SplitNormalizationPlan(regionInfos.get(4), 100),
|
||||
new SplitNormalizationPlan(regionInfos.get(5), 120)
|
||||
));
|
||||
|
||||
// test when target region size is 200
|
||||
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
|
||||
.thenReturn(200L);
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
|
||||
new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3))));
|
||||
assertThat(
|
||||
normalizer.computePlansForTable(tableName),
|
||||
contains(
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(0), 20)
|
||||
.addTarget(regionInfos.get(1), 40)
|
||||
.build(),
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(2), 60)
|
||||
.addTarget(regionInfos.get(3), 80)
|
||||
.build()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -255,14 +270,18 @@ public class TestSimpleRegionNormalizer {
|
|||
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
|
||||
.thenReturn(8);
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new SplitNormalizationPlan(regionInfos.get(2)),
|
||||
new SplitNormalizationPlan(regionInfos.get(3))));
|
||||
new SplitNormalizationPlan(regionInfos.get(2), 60),
|
||||
new SplitNormalizationPlan(regionInfos.get(3), 80)));
|
||||
|
||||
// test when target region count is 3
|
||||
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
|
||||
.thenReturn(3);
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
|
||||
assertThat(
|
||||
normalizer.computePlansForTable(tableName),
|
||||
contains(new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(0), 20)
|
||||
.addTarget(regionInfos.get(1), 40)
|
||||
.build()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -312,14 +331,17 @@ public class TestSimpleRegionNormalizer {
|
|||
|
||||
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
|
||||
assertThat(plans, contains(
|
||||
new SplitNormalizationPlan(regionInfos.get(2)),
|
||||
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
|
||||
new SplitNormalizationPlan(regionInfos.get(2), 10),
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(0), 1)
|
||||
.addTarget(regionInfos.get(1), 1)
|
||||
.build()));
|
||||
|
||||
// have to call setupMocks again because we don't have dynamic config update on normalizer.
|
||||
conf.setInt(MIN_REGION_COUNT_KEY, 4);
|
||||
setupMocksForNormalizer(regionSizes, regionInfos);
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new SplitNormalizationPlan(regionInfos.get(2))));
|
||||
new SplitNormalizationPlan(regionInfos.get(2), 10)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -356,8 +378,12 @@ public class TestSimpleRegionNormalizer {
|
|||
|
||||
assertFalse(normalizer.isSplitEnabled());
|
||||
assertEquals(1, normalizer.getMergeMinRegionSizeMb());
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1))));
|
||||
assertThat(
|
||||
normalizer.computePlansForTable(tableName),
|
||||
contains(new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(0), 1)
|
||||
.addTarget(regionInfos.get(1), 2)
|
||||
.build()));
|
||||
|
||||
conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
|
||||
setupMocksForNormalizer(regionSizes, regionInfos);
|
||||
|
@ -378,9 +404,18 @@ public class TestSimpleRegionNormalizer {
|
|||
assertFalse(normalizer.isSplitEnabled());
|
||||
assertEquals(0, normalizer.getMergeMinRegionSizeMb());
|
||||
assertThat(normalizer.computePlansForTable(tableName), contains(
|
||||
new MergeNormalizationPlan(regionInfos.get(0), regionInfos.get(1)),
|
||||
new MergeNormalizationPlan(regionInfos.get(2), regionInfos.get(3)),
|
||||
new MergeNormalizationPlan(regionInfos.get(5), regionInfos.get(6))));
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(0), 0)
|
||||
.addTarget(regionInfos.get(1), 1)
|
||||
.build(),
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(2), 10)
|
||||
.addTarget(regionInfos.get(3), 0)
|
||||
.build(),
|
||||
new MergeNormalizationPlan.Builder()
|
||||
.addTarget(regionInfos.get(5), 10)
|
||||
.addTarget(regionInfos.get(6), 0)
|
||||
.build()));
|
||||
}
|
||||
|
||||
// This test is to make sure that normalizer is only going to merge adjacent regions.
|
||||
|
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -161,6 +160,7 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
tn2 + " should not have split.",
|
||||
tn2RegionCount,
|
||||
getRegionCount(tn2));
|
||||
LOG.debug("waiting for t3 to settle...");
|
||||
waitForTableRegionCount(tn3, tn3RegionCount);
|
||||
} finally {
|
||||
dropIfExists(tn1);
|
||||
|
@ -187,7 +187,7 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
: TableName.valueOf(name.getMethodName());
|
||||
|
||||
final int currentRegionCount = createTableBegsSplit(tableName, true, false);
|
||||
final long existingSkippedSplitCount = master.getRegionNormalizer()
|
||||
final long existingSkippedSplitCount = master.getRegionNormalizerManager()
|
||||
.getSkippedCount(PlanType.SPLIT);
|
||||
assertFalse(admin.normalizerSwitch(true));
|
||||
assertTrue(admin.normalize());
|
||||
|
@ -332,7 +332,8 @@ public class TestSimpleRegionNormalizerOnCluster {
|
|||
return "waiting to observe split attempt and skipped.";
|
||||
}
|
||||
@Override public boolean evaluate() {
|
||||
final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
|
||||
final long skippedSplitCount = master.getRegionNormalizerManager()
|
||||
.getSkippedCount(PlanType.SPLIT);
|
||||
return skippedSplitCount > existingSkippedSplitCount;
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue