HBASE-24418 Consolidate Normalizer implementations

Simplify our Normalizer story to have just a single, configurable
implementation.

* fold the features of `MergeNormalizer` into
  `SimpleRegionNormalizer`, removing the intermediate abstract class.
* configuration keys for merge-only features now share a common
  structure.
* add configuration to selectively disable normalizer split/merge
  operations.
* `RegionNormalizer` now extends `Configurable` instead of creating a
  new instance of `HBaseConfiguration` or snooping one off of other
  fields.
* avoid the extra RPCs by using `MasterServices` instead of
  `MasterRpcServices`.
* boost test coverage of all the various flags and feature
  combinations.
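
As a rough illustration only (not part of this patch): the key names and defaults referenced above are the ones added to hbase-default.xml below, and `masterServices`/`tableName` stand in for whatever the caller supplies. In the Master itself the instance is obtained through `RegionNormalizerFactory.getRegionNormalizer(conf)`.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.master.MasterServices;
    import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
    import org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer;

    public class NormalizerWiringSketch {
      // Sketch: compute the plans the consolidated normalizer would produce for one table.
      static List<NormalizationPlan> plansFor(MasterServices masterServices, TableName tableName) {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.normalizer.split.enabled", true);       // default: true
        conf.setBoolean("hbase.normalizer.merge.enabled", true);       // default: true
        conf.setInt("hbase.normalizer.min.region.count", 3);           // default: 3
        conf.setInt("hbase.normalizer.merge.min_region_age.days", 3);  // default: 3
        conf.setInt("hbase.normalizer.merge.min_region_size.mb", 1);   // default: 1

        SimpleRegionNormalizer normalizer = new SimpleRegionNormalizer();
        normalizer.setConf(conf);                      // RegionNormalizer now extends Configurable
        normalizer.setMasterServices(masterServices);  // MasterRpcServices is no longer required
        return normalizer.computePlansForTable(tableName);
      }
    }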

Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: huaxiangsun <huaxiangsun@apache.org>
Authored by Nick Dimiduk on 2020-05-20 17:15:12 -07:00; committed by Nick Dimiduk.
Parent: 5c390f0800
Commit: 4884773676
10 changed files with 1125 additions and 1426 deletions

hbase-default.xml

@@ -605,30 +605,54 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.balancer.period
     </name>
     <value>300000</value>
-    <description>Period at which the region balancer runs in the Master.</description>
-  </property>
-  <property>
-    <name>hbase.normalizer.period</name>
-    <value>300000</value>
-    <description>Period at which the region normalizer runs in the Master.</description>
-  </property>
-  <property>
-    <name>hbase.normalizer.min.region.count</name>
-    <value>3</value>
-    <description>configure the minimum number of regions</description>
-  </property>
-  <property>
-    <name>hbase.normalizer.min.region.merge.age</name>
-    <value>3</value>
-    <description>configure the minimum age in days for region before it is considered for merge while
-    normalizing</description>
+    <description>Period at which the region balancer runs in the Master, in
+    milliseconds.</description>
   </property>
   <property>
     <name>hbase.regions.slop</name>
     <value>0.001</value>
     <description>Rebalance if any regionserver has average + (average * slop) regions.
-      The default value of this parameter is 0.001 in StochasticLoadBalancer (the default load balancer),
-      while the default is 0.2 in other load balancers (i.e., SimpleLoadBalancer).</description>
+      The default value of this parameter is 0.001 in StochasticLoadBalancer (the default load
+      balancer), while the default is 0.2 in other load balancers (i.e.,
+      SimpleLoadBalancer).</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.period</name>
+    <value>300000</value>
+    <description>Period at which the region normalizer runs in the Master, in
+    milliseconds.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.split.enabled</name>
+    <value>true</value>
+    <description>Whether to split a region as part of normalization.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.enabled</name>
+    <value>true</value>
+    <description>Whether to merge a region as part of normalization.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.min.region.count</name>
+    <value>3</value>
+    <description>The minimum number of regions in a table to consider it for merge
+    normalization.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.min_region_age.days</name>
+    <value>3</value>
+    <description>The minimum age for a region to be considered for a merge, in days.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.min_region_age.days</name>
+    <value>3</value>
+    <description>The minimum age for a region to be considered for a merge, in days.</description>
+  </property>
+  <property>
+    <name>hbase.normalizer.merge.min_region_size.mb</name>
+    <value>1</value>
+    <description>The minimum size for a region to be considered for a merge, in whole
+    MBs.</description>
   </property>
   <property>
     <name>hbase.server.thread.wakefrequency</name>
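
For example (illustrative only, using the property names and defaults above), an operator who wants merge-driven normalization switched off cluster-wide while leaving splits active overrides just the merge key; code reading the settings back sees the split default unchanged:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class NormalizerSwitchSketch {
      public static void main(String[] args) {
        // Sketch: override one of the new switches, leave the other at its default.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.normalizer.merge.enabled", false);                        // default is true
        System.out.println(conf.getBoolean("hbase.normalizer.split.enabled", true));     // true
        System.out.println(conf.getBoolean("hbase.normalizer.merge.enabled", true));     // false
      }
    }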

HMaster.java

@@ -49,6 +49,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.servlet.ServletException;
@@ -387,6 +388,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   private final LockManager lockManager = new LockManager(this);
   private LoadBalancer balancer;
+  // a lock to prevent concurrent normalization actions.
+  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
   private RegionNormalizer normalizer;
   private BalancerChore balancerChore;
   private RegionNormalizerChore normalizerChore;
@@ -787,10 +790,11 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
     this.normalizer.setMasterServices(this);
-    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
     this.loadBalancerTracker.start();
+    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
+    this.normalizer.setMasterServices(this);
     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
     this.regionNormalizerTracker.start();
@@ -1857,7 +1861,6 @@
   }
   @Override
-  @VisibleForTesting
   public RegionNormalizer getRegionNormalizer() {
     return this.normalizer;
   }
@@ -1865,9 +1868,9 @@
   /**
    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
    *
-   * @return true if normalization step was performed successfully, false otherwise
-   *   (specifically, if HMaster hasn't been initialized properly or normalization
-   *   is globally disabled)
+   * @return true if an existing normalization was already in progress, or if a new normalization
+   *   was performed successfully; false otherwise (specifically, if HMaster finished initializing
+   *   or normalization is globally disabled).
    */
   public boolean normalizeRegions() throws IOException {
     if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
@@ -1881,20 +1884,26 @@
       return false;
     }
-    synchronized (this.normalizer) {
+    if (!normalizationInProgressLock.tryLock()) {
       // Don't run the normalizer concurrently
+      LOG.info("Normalization already in progress. Skipping request.");
+      return true;
+    }
-      List<TableName> allEnabledTables = new ArrayList<>(
-        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
+    try {
+      final List<TableName> allEnabledTables = new ArrayList<>(
+        tableStateManager.getTablesInStates(TableState.State.ENABLED));
       Collections.shuffle(allEnabledTables);
+      try (final Admin admin = clusterConnection.getAdmin()) {
         for (TableName table : allEnabledTables) {
-          TableDescriptor tblDesc = getTableDescriptors().get(table);
-          if (table.isSystemTable() || (tblDesc != null &&
-              !tblDesc.isNormalizationEnabled())) {
-            LOG.trace("Skipping normalization for {}, as it's either system"
-                + " table or doesn't have auto normalization turned on", table);
+          if (table.isSystemTable()) {
+            continue;
+          }
+          final TableDescriptor tblDesc = getTableDescriptors().get(table);
+          if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
+            LOG.debug("Skipping table {} because normalization is disabled in its"
+                + " table properties.", table);
             continue;
           }
@@ -1903,12 +1912,14 @@
           return false;
         }
-        final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
+          final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
           if (CollectionUtils.isEmpty(plans)) {
-          return true;
+            LOG.debug("No normalization required for table {}.", table);
+            continue;
           }
-        try (final Admin admin = clusterConnection.getAdmin()) {
+          // as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
+          // limiting of merge requests due to this serial loop.
           for (NormalizationPlan plan : plans) {
             plan.execute(admin);
             if (plan.getType() == PlanType.SPLIT) {
@@ -1919,9 +1930,9 @@
           }
         }
       }
+    } finally {
+      normalizationInProgressLock.unlock();
     }
-    // If Region did not generate any plans, it means the cluster is already balanced.
-    // Return true indicating a success.
     return true;
   }

AbstractRegionNormalizer.java (deleted)

@@ -1,249 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@InterfaceAudience.Private
public abstract class AbstractRegionNormalizer implements RegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class);
public static final String HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_KEY =
"hbase.normalizer.min.region.count";
public static final int HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_DEFAULT = 3;
protected MasterServices masterServices;
protected MasterRpcServices masterRpcServices;
protected int minRegionCount;
/**
* Set the master service.
* @param masterServices inject instance of MasterServices
*/
@Override
public void setMasterServices(MasterServices masterServices) {
this.masterServices = masterServices;
minRegionCount = masterServices.getConfiguration().getInt(
HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_KEY,
HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_DEFAULT);
}
@Override
public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
this.masterRpcServices = masterRpcServices;
}
/**
* @param hri regioninfo
* @return size of region in MB and if region is not found than -1
*/
protected long getRegionSize(RegionInfo hri) {
ServerName sn =
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
RegionMetrics regionLoad =
masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
if (regionLoad == null) {
LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
return -1;
}
return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
}
protected boolean isMergeEnabled() {
boolean mergeEnabled = true;
try {
mergeEnabled = masterRpcServices
.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE))
.getEnabled();
} catch (ServiceException e) {
LOG.warn("Unable to determine whether merge is enabled", e);
}
return mergeEnabled;
}
protected boolean isSplitEnabled() {
boolean splitEnabled = true;
try {
splitEnabled = masterRpcServices
.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT))
.getEnabled();
} catch (ServiceException se) {
LOG.warn("Unable to determine whether split is enabled", se);
}
return splitEnabled;
}
/**
* @param tableRegions regions of table to normalize
* @return average region size depending on
* @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
* Also make sure tableRegions contains regions of the same table
*/
protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
long totalSizeMb = 0;
int actualRegionCnt = 0;
for (RegionInfo hri : tableRegions) {
long regionSize = getRegionSize(hri);
// don't consider regions that are in bytes for averaging the size.
if (regionSize > 0) {
actualRegionCnt++;
totalSizeMb += regionSize;
}
}
TableName table = tableRegions.get(0).getTable();
int targetRegionCount = -1;
long targetRegionSize = -1;
try {
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
if (tableDescriptor != null) {
targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {}: target region count is {}, target region size is {}", table,
targetRegionCount, targetRegionSize);
}
} catch (IOException e) {
LOG.warn(
"cannot get the target number and target size of table {}, they will be default value -1.",
table, e);
}
double avgRegionSize;
if (targetRegionSize > 0) {
avgRegionSize = targetRegionSize;
} else if (targetRegionCount > 0) {
avgRegionSize = totalSizeMb / (double) targetRegionCount;
} else {
avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
}
LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
totalSizeMb, avgRegionSize);
return avgRegionSize;
}
/**
* Determine if a region in {@link RegionState} should be considered for a merge operation.
*/
private static boolean skipForMerge(final RegionState state) {
return state == null || !Objects.equals(state.getState(), RegionState.State.OPEN);
}
/**
* Computes the merge plans that should be executed for this table to converge average region
* towards target average or target region count
* @param table table to normalize
* @return list of merge normalization plans
*/
protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
final double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}. Computing normalization plan for table: {}, "
+ "number of regions: {}",
table, avgRegionSize, table, tableRegions.size());
// The list of regionInfo from getRegionsOfTable() is ordered by regionName.
// regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
// in order by regionName, it will be 'aa1!' followed by 'aa1').
// This could result in normalizer merging non-adjacent regions into one and creates overlaps.
// In order to avoid that, sort the list by RegionInfo.COMPARATOR.
tableRegions.sort(RegionInfo.COMPARATOR);
final List<NormalizationPlan> plans = new ArrayList<>();
for (int candidateIdx = 0; candidateIdx < tableRegions.size() - 1; candidateIdx++) {
final RegionInfo hri = tableRegions.get(candidateIdx);
final RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
if (skipForMerge(regionStates.getRegionState(hri))) {
continue;
}
if (skipForMerge(regionStates.getRegionState(hri2))) {
continue;
}
final long regionSize = getRegionSize(hri);
final long regionSize2 = getRegionSize(hri2);
if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
// at least one of the two regions should be older than MIN_REGION_DURATION days
plans.add(new MergeNormalizationPlan(hri, hri2));
candidateIdx++;
} else {
LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
regionSize);
}
}
return plans;
}
/**
* Determine if a region in {@link RegionState} should be considered for a split operation.
*/
private static boolean skipForSplit(final RegionState state) {
return state == null || !Objects.equals(state.getState(), RegionState.State.OPEN);
}
/**
* Computes the split plans that should be executed for this table to converge average region size
* towards target average or target region count
* @param table table to normalize
* @return list of split normalization plans
*/
protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
final RegionStates regionStates = masterServices.getAssignmentManager().getRegionStates();
final List<RegionInfo> tableRegions = regionStates.getRegionsOfTable(table);
final double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
final List<NormalizationPlan> plans = new ArrayList<>();
for (final RegionInfo hri : tableRegions) {
if (skipForSplit(regionStates.getRegionState(hri))) {
continue;
}
long regionSize = getRegionSize(hri);
// if the region is > 2 times larger than average, we split it, split
// is more high priority normalization action than merge.
if (regionSize > 2 * avgRegionSize) {
LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
table, hri.getRegionNameAsString(), regionSize, avgRegionSize);
plans.add(new SplitNormalizationPlan(hri, null));
}
}
return plans;
}
}

MergeNormalizer.java (deleted)

@@ -1,143 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of MergeNormalizer Logic in use:
* <ol>
* <li>get all regions of a given table
* <li>get avg size S of each region (by total size of store files reported in RegionLoad)
* <li>two regions R1 and its neighbour R2 are merged, if R1 + R2 &lt; S, and all such regions are
* returned to be merged
* <li>Otherwise, no action is performed
* </ol>
* <p>
* Considering the split policy takes care of splitting region we also want a way to merge when
* regions are too small. It is little different than what
* {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} does. Instead of doing
* splits and merge both to achieve average region size in cluster for a table. We only merge
* regions(older than defined age) and rely on Split policy for region splits. The goal of this
* normalizer is to merge small regions to make size of regions close to average size (which is
* either average size or depends on either target region size or count in that order). Consider
* region with size 1,2,3,4,10,10,10,5,4,3. If minimum merge age is set to 0 days this algorithm
* will find the average size as 7.2 assuming we haven't provided target region count or size. Now
* we will find all those adjacent region which if merged doesn't exceed the average size. so we
* will merge 1-2, 3-4, 4,3 in our first run. To get best results from this normalizer theoretically
* we should set target region size between 0.5 to 0.75 of configured maximum file size. If we set
* min merge age as 3 we create plan as above and see if we have a plan which has both regions as
* new(age less than 3) we discard such plans and we consider the regions even if one of the region
* is old enough to be merged.
* </p>
*/
@InterfaceAudience.Private
public class MergeNormalizer extends AbstractRegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
private int minRegionAge;
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
@Override
public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);
Configuration conf = masterServices.getConfiguration();
minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
}
@Override
public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) {
skippedCount[type.ordinal()]++;
}
@Override
public long getSkippedCount(NormalizationPlan.PlanType type) {
return skippedCount[type.ordinal()];
}
@Override
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
List<NormalizationPlan> plans = new ArrayList<>();
if (!shouldNormalize(table)) {
return null;
}
// at least one of the two regions should be older than MIN_REGION_AGE days
List<NormalizationPlan> normalizationPlans = getMergeNormalizationPlan(table);
for (NormalizationPlan plan : normalizationPlans) {
if (plan instanceof MergeNormalizationPlan) {
RegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion();
RegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion();
if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) {
plans.add(plan);
} else {
LOG.debug("Skipping region {} and {} as they are both new", hri.getEncodedName(),
hri2.getEncodedName());
}
}
}
if (plans.isEmpty()) {
LOG.debug("No normalization needed, regions look good for table: {}", table);
return null;
}
return plans;
}
private boolean isOldEnoughToMerge(RegionInfo hri) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
Timestamp hriTime = new Timestamp(hri.getRegionId());
boolean isOld =
new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(minRegionAge))
.before(currentTime);
return isOld;
}
private boolean shouldNormalize(TableName table) {
boolean normalize = false;
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of system table {} isn't allowed", table);
} else if (!isMergeEnabled()) {
LOG.debug("Merge disabled for table: {}", table);
} else {
List<RegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
if (tableRegions == null || tableRegions.size() < minRegionCount) {
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
LOG.debug(
"Table {} has {} regions, required min number of regions for normalizer to run is {} , "
+ "not running normalizer",
table, nrRegions, minRegionCount);
} else {
normalize = true;
}
}
return normalize;
}
}

RegionNormalizer.java

@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -19,17 +19,17 @@
 package org.apache.hadoop.hbase.master.normalizer;
 import java.util.List;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 /**
- * Performs "normalization" of regions on the cluster, making sure that suboptimal
+ * Performs "normalization" of regions of a table, making sure that suboptimal
  * choice of split keys doesn't leave cluster in a situation when some regions are
  * substantially larger than others for considerable amount of time.
  *
@@ -39,27 +39,23 @@ import org.apache.yetus.audience.InterfaceAudience;
  * "split/merge storms".
  */
 @InterfaceAudience.Private
-public interface RegionNormalizer {
+@InterfaceStability.Evolving
+public interface RegionNormalizer extends Configurable {
   /**
    * Set the master service. Must be called before first call to
-   * {@link #computePlanForTable(TableName)}.
+   * {@link #computePlansForTable(TableName)}.
    * @param masterServices master services to use
    */
   void setMasterServices(MasterServices masterServices);
   /**
-   * Set the master RPC service. Must be called before first call to
-   * {@link #computePlanForTable(TableName)}.
-   * @param masterRpcServices master RPC services to use
-   */
-  void setMasterRpcServices(MasterRpcServices masterRpcServices);
-  /**
-   * Computes next optimal normalization plan.
+   * Computes a list of normalizer actions to perform on the target table. This is the primary
+   * entry-point from the Master driving a normalization activity.
    * @param table table to normalize
-   * @return normalization actions to perform. Null if no action to take
+   * @return A list of the normalization actions to perform, or an empty list
+   *   if there's nothing to do.
    */
-  List<NormalizationPlan> computePlanForTable(TableName table)
+  List<NormalizationPlan> computePlansForTable(TableName table)
       throws HBaseIOException;
   /**

SimpleRegionNormalizer.java

@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -18,43 +17,194 @@
  */
 package org.apache.hadoop.hbase.master.normalizer;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 /**
  * Simple implementation of region normalizer. Logic in use:
  * <ol>
- * <li>Get all regions of a given table
- * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
- * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
- * requested to split. Thereon evaluate the next region R1
- * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
- * evaluate the next region R2
- * <li>Otherwise, R1 is evaluated
+ * <li>Get all regions of a given table</li>
+ * <li>Get avg size S of the regions in the table (by total size of store files reported in
+ * RegionMetrics)</li>
+ * <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
+ * <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
+ * are kindly requested to merge.</li>
  * </ol>
  * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
- * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
- * normalization from undoing the pre-splitting of a table.
+ * The following parameters are configurable:
+ * <ol>
+ * <li>Whether to split a region as part of normalization. Configuration:
+ * {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
+ * <li>Whether to merge a region as part of normalization. Configuration:
+ * {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
+ * <li>The minimum number of regions in a table to consider it for merge normalization.
+ * Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
+ * {@value #DEFAULT_MIN_REGION_COUNT}.</li>
+ * <li>The minimum age for a region to be considered for a merge, in days. Configuration:
+ * {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
+ * {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
+ * <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
+ * {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
+ * {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
+ * </ol>
+ * <p>
+ * To see detailed logging of the application of these configuration values, set the log level for
+ * this class to `TRACE`.
  */
-@InterfaceAudience.Private
-public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SimpleRegionNormalizer implements RegionNormalizer {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
-  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
+  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
+  static final boolean DEFAULT_SPLIT_ENABLED = true;
+  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
+  static final boolean DEFAULT_MERGE_ENABLED = true;
+  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
+  // deprecate/rename the configuration key.
+  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
+  static final int DEFAULT_MIN_REGION_COUNT = 3;
+  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
+  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
+  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
+  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
private final long[] skippedCount;
private Configuration conf;
private MasterServices masterServices;
private boolean splitEnabled;
private boolean mergeEnabled;
private int minRegionCount;
private Period mergeMinRegionAge;
private int mergeMinRegionSizeMb;
public SimpleRegionNormalizer() {
skippedCount = new long[NormalizationPlan.PlanType.values().length];
splitEnabled = DEFAULT_SPLIT_ENABLED;
mergeEnabled = DEFAULT_MERGE_ENABLED;
minRegionCount = DEFAULT_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
}
   @Override
-  public void planSkipped(RegionInfo hri, PlanType type) {
+  public Configuration getConf() {
return conf;
}
@Override
public void setConf(final Configuration conf) {
if (conf == null) {
return;
}
this.conf = conf;
splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
minRegionCount = parseMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
}
private static int parseMinRegionCount(final Configuration conf) {
final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
final int settledValue = Math.max(1, parsedValue);
if (parsedValue != settledValue) {
warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
}
return settledValue;
}
private static Period parseMergeMinRegionAge(final Configuration conf) {
final int parsedValue =
conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
final int settledValue = Math.max(0, parsedValue);
if (parsedValue != settledValue) {
warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
}
return Period.ofDays(settledValue);
}
private static int parseMergeMinRegionSizeMb(final Configuration conf) {
final int parsedValue =
conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
final int settledValue = Math.max(0, parsedValue);
if (parsedValue != settledValue) {
warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
}
return settledValue;
}
private static <T> void warnInvalidValue(final String key, final T parsedValue,
final T settledValue) {
LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
key, parsedValue, settledValue);
}
/**
* Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
*/
public boolean isSplitEnabled() {
return splitEnabled;
}
/**
* Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
*/
public boolean isMergeEnabled() {
return mergeEnabled;
}
/**
* Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
*/
public int getMinRegionCount() {
return minRegionCount;
}
/**
* Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
*/
public Period getMergeMinRegionAge() {
return mergeMinRegionAge;
}
/**
* Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
*/
public int getMergeMinRegionSizeMb() {
return mergeMinRegionSizeMb;
}
@Override
public void setMasterServices(final MasterServices masterServices) {
this.masterServices = masterServices;
}
@Override
public void planSkipped(final RegionInfo hri, final PlanType type) {
     skippedCount[type.ordinal()]++;
   }
@@ -63,81 +213,278 @@ public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
     return skippedCount[type.ordinal()];
   }
-  /**
-   * Comparator class that gives higher priority to region Split plan.
-   */
-  static class PlanComparator implements Comparator<NormalizationPlan> {
-    @Override
-    public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
-      boolean plan1IsSplit = plan1 instanceof SplitNormalizationPlan;
-      boolean plan2IsSplit = plan2 instanceof SplitNormalizationPlan;
-      if (plan1IsSplit && plan2IsSplit) {
-        return 0;
-      } else if (plan1IsSplit) {
-        return -1;
-      } else if (plan2IsSplit) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-  }
-  private Comparator<NormalizationPlan> planComparator = new PlanComparator();
-  /**
-   * Computes next most "urgent" normalization action on the table. Action may be either a split, or
-   * a merge, or no action.
-   * @param table table to normalize
-   * @return normalization plan to execute
-   */
   @Override
-  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
-    if (table == null || table.isSystemTable()) {
+  public List<NormalizationPlan> computePlansForTable(final TableName table) {
+    if (table == null) {
+      return Collections.emptyList();
+    }
+    if (table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
-      return null;
+      return Collections.emptyList();
     }
-    boolean splitEnabled = isSplitEnabled();
-    boolean mergeEnabled = isMergeEnabled();
-    if (!mergeEnabled && !splitEnabled) {
-      LOG.debug("Both split and merge are disabled for table: {}", table);
-      return null;
-    }
-    List<NormalizationPlan> plans = new ArrayList<>();
-    List<RegionInfo> tableRegions =
-        masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
-    if (tableRegions == null) {
-      return null;
+    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
+    final boolean proceedWithMergePlanning = proceedWithMergePlanning();
+    if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
+      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
+      return Collections.emptyList();
+    }
+    final NormalizeContext ctx = new NormalizeContext(table);
+    if (CollectionUtils.isEmpty(ctx.getTableRegions())) {
+      return Collections.emptyList();
     }
     LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table,
-        tableRegions.size());
-    if (splitEnabled) {
-      List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
-      if (splitPlans != null) {
-        plans.addAll(splitPlans);
-      }
-    }
-    if (mergeEnabled) {
-      if (tableRegions.size() < minRegionCount) {
-        LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" +
-            " is {}, not running normalizer",
-            table, tableRegions.size(), minRegionCount);
-      } else {
-        List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
-        if (mergePlans != null) {
-          plans.addAll(mergePlans);
-        }
-      }
-    }
-    if (plans.isEmpty()) {
-      LOG.debug("No normalization needed, regions look good for table: {}", table);
-      return null;
-    }
-    Collections.sort(plans, planComparator);
+      ctx.getTableRegions().size());
+    final List<NormalizationPlan> plans = new ArrayList<>();
+    if (proceedWithSplitPlanning) {
+      plans.addAll(computeSplitNormalizationPlans(ctx));
+    }
+    if (proceedWithMergePlanning) {
+      plans.addAll(computeMergeNormalizationPlans(ctx));
+    }
+    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
     return plans;
   }
/**
* @return size of region in MB and if region is not found than -1
*/
private long getRegionSizeMB(RegionInfo hri) {
ServerName sn =
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
RegionMetrics regionLoad =
masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
if (regionLoad == null) {
LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
return -1;
}
return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
}
private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
return masterServices.isSplitOrMergeEnabled(masterSwitchType);
}
private boolean proceedWithSplitPlanning() {
return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
}
private boolean proceedWithMergePlanning() {
return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
}
/**
* @param tableRegions regions of table to normalize
* @return average region size depending on
* @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
* Also make sure tableRegions contains regions of the same table
*/
private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
if (CollectionUtils.isEmpty(tableRegions)) {
throw new IllegalStateException(
"Cannot calculate average size of a table without any regions.");
}
final int regionCount = tableRegions.size();
final long totalSizeMb = tableRegions.stream()
.mapToLong(this::getRegionSizeMB)
.sum();
TableName table = tableRegions.get(0).getTable();
int targetRegionCount = -1;
long targetRegionSize = -1;
try {
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
if (tableDescriptor != null && LOG.isDebugEnabled()) {
targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {} configured with target region count {}, target region size {}", table,
targetRegionCount, targetRegionSize);
}
} catch (IOException e) {
LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+ " configurations cannot be considered.", table, e);
}
double avgRegionSize;
if (targetRegionSize > 0) {
avgRegionSize = targetRegionSize;
} else if (targetRegionCount > 0) {
avgRegionSize = totalSizeMb / (double) targetRegionCount;
} else {
avgRegionSize = totalSizeMb / (double) regionCount;
}
LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
totalSizeMb, avgRegionSize);
return avgRegionSize;
}
/**
* Determine if a {@link RegionInfo} should be considered for a merge operation.
*/
private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
final RegionState state = regionStates.getRegionState(regionInfo);
final String name = regionInfo.getEncodedName();
return
logTraceReason(
() -> state == null,
"skipping merge of region {} because no state information is available.", name)
|| logTraceReason(
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
"skipping merge of region {} because it is not open.", name)
|| logTraceReason(
() -> !isOldEnoughForMerge(regionInfo),
"skipping merge of region {} because it is not old enough.", name)
|| logTraceReason(
() -> !isLargeEnoughForMerge(regionInfo),
"skipping merge region {} because it is not large enough.", name);
}
/**
* Computes the merge plans that should be executed for this table to converge average region
* towards target average or target region count.
*/
private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
if (ctx.getTableRegions().size() < minRegionCount) {
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
+ " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(),
minRegionCount);
return Collections.emptyList();
}
final double avgRegionSizeMb = ctx.getAverageRegionSizeMb();
LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
+ " regions: {}.", ctx.getTableName(), avgRegionSizeMb, ctx.getTableRegions().size());
final List<NormalizationPlan> plans = new ArrayList<>();
for (int candidateIdx = 0; candidateIdx < ctx.getTableRegions().size() - 1; candidateIdx++) {
final RegionInfo current = ctx.getTableRegions().get(candidateIdx);
final RegionInfo next = ctx.getTableRegions().get(candidateIdx + 1);
if (skipForMerge(ctx.getRegionStates(), current)
|| skipForMerge(ctx.getRegionStates(), next)) {
continue;
}
final long currentSizeMb = getRegionSizeMB(current);
final long nextSizeMb = getRegionSizeMB(next);
if (currentSizeMb + nextSizeMb < avgRegionSizeMb) {
plans.add(new MergeNormalizationPlan(current, next));
candidateIdx++;
}
}
return plans;
}
/**
* Determine if a region in {@link RegionState} should be considered for a split operation.
*/
private static boolean skipForSplit(final RegionState state, final RegionInfo regionInfo) {
final String name = regionInfo.getEncodedName();
return
logTraceReason(
() -> state == null,
"skipping split of region {} because no state information is available.", name)
|| logTraceReason(
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
"skipping merge of region {} because it is not open.", name);
}
/**
* Computes the split plans that should be executed for this table to converge average region size
* towards target average or target region count.
* <br />
* if the region is > 2 times larger than average, we split it. split
* is more high priority normalization action than merge.
*/
private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeContext ctx) {
final double avgRegionSize = ctx.getAverageRegionSizeMb();
LOG.debug("Table {}, average region size: {}", ctx.getTableName(), avgRegionSize);
final List<NormalizationPlan> plans = new ArrayList<>();
for (final RegionInfo hri : ctx.getTableRegions()) {
if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
continue;
}
final long regionSize = getRegionSizeMB(hri);
if (regionSize > 2 * avgRegionSize) {
LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
plans.add(new SplitNormalizationPlan(hri, null));
}
}
return plans;
}
/**
* Return {@code true} when {@code regionInfo} has a creation date that is old
* enough to be considered for a merge operation, {@code false} otherwise.
*/
private boolean isOldEnoughForMerge(final RegionInfo regionInfo) {
final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge));
}
/**
* Return {@code true} when {@code regionInfo} has a size that is sufficient
* to be considered for a merge operation, {@code false} otherwise.
*/
private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) {
return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
}
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
final Object... args) {
final boolean value = predicate.getAsBoolean();
if (value) {
LOG.trace(fmtWhenTrue, args);
}
return value;
}
/**
* Inner class caries the state necessary to perform a single invocation of
* {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
* up-front allows any computed values to be realized just once.
*/
private class NormalizeContext {
private final TableName tableName;
private final RegionStates regionStates;
private final List<RegionInfo> tableRegions;
private final double averageRegionSizeMb;
public NormalizeContext(final TableName tableName) {
this.tableName = tableName;
regionStates = SimpleRegionNormalizer.this.masterServices
.getAssignmentManager()
.getRegionStates();
tableRegions = regionStates.getRegionsOfTable(tableName);
// The list of regionInfo from getRegionsOfTable() is ordered by regionName.
// regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
// in order by regionName, it will be 'aa1!' followed by 'aa1').
// This could result in normalizer merging non-adjacent regions into one and creates overlaps.
// In order to avoid that, sort the list by RegionInfo.COMPARATOR.
// See HBASE-24376
tableRegions.sort(RegionInfo.COMPARATOR);
averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
}
public TableName getTableName() {
return tableName;
}
public RegionStates getRegionStates() {
return regionStates;
}
public List<RegionInfo> getTableRegions() {
return tableRegions;
}
public double getAverageRegionSizeMb() {
return averageRegionSizeMb;
}
}
}
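
To make the thresholds above concrete, here is a small self-contained illustration with hypothetical region sizes. This is not HBase code: the real computeSplitNormalizationPlans/computeMergeNormalizationPlans additionally honor the merge minimums for region age, size, and table region count, the table descriptor targets, and the split/merge switches.

    import java.util.Arrays;

    public class NormalizerMathSketch {
      public static void main(String[] args) {
        // Hypothetical store file sizes of five adjacent regions, in MB.
        long[] regionSizeMb = {1, 1, 3, 12, 3};
        double avg = Arrays.stream(regionSizeMb).sum() / (double) regionSizeMb.length;  // 4.0
        // Split rule: a region larger than twice the average is asked to split.
        for (int i = 0; i < regionSizeMb.length; i++) {
          if (regionSizeMb[i] > 2 * avg) {
            System.out.println("split region " + i);  // region 3: 12 > 8.0
          }
        }
        // Merge rule: an adjacent pair whose combined size stays below the average is merged.
        for (int i = 0; i < regionSizeMb.length - 1; i++) {
          if (regionSizeMb[i] + regionSizeMb[i + 1] < avg) {
            System.out.println("merge regions " + i + " and " + (i + 1));  // regions 0 and 1: 2 < 4.0
            i++;  // skip past the second member of the pair, like candidateIdx++ above
          }
        }
      }
    }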

TestMergeNormalizer.java (deleted)

@@ -1,262 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.when;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
@Category({ MasterTests.class, SmallTests.class })
public class TestMergeNormalizer {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMergeNormalizer.class);
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
private RegionNormalizer normalizer;
@Test
public void testNoNormalizationForMetaTable() throws HBaseIOException {
TableName testTable = TableName.META_TABLE_NAME;
List<RegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
assertNull(plans);
}
@Test
public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
List<RegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb")).build();
regionSizes.put(hri1.getRegionName(), 10);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).build();
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 15);
setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
assertNull(plans);
}
@Test
public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
List<RegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb")).build();
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 10);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).build();
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 15);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd")).build();
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 8);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee")).build();
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 10);
setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
assertNull(plans);
}
@Test
public void testMergeOfSmallRegions() throws HBaseIOException {
TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
List<RegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
Timestamp threeDaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 15);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 5);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
.setEndKey(Bytes.toBytes("fff")).build();
hris.add(hri5);
regionSizes.put(hri5.getRegionName(), 16);
RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
.setEndKey(Bytes.toBytes("ggg")).setRegionId(threeDaysBefore.getTime()).build();
hris.add(hri6);
regionSizes.put(hri6.getRegionName(), 0);
RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg"))
.setEndKey(Bytes.toBytes("hhh")).build();
hris.add(hri7);
regionSizes.put(hri7.getRegionName(), 0);
setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
// check that the last two 0-sized regions are merged as well
plan = plans.get(1);
assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion());
}
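// Illustrative sketch (not part of this patch): the merge-age gate exercised above keys off
// RegionInfo#getRegionId(), which is the region's creation timestamp in epoch milliseconds,
// hence setRegionId(threeDaysBefore.getTime()) in the test. A hypothetical helper expressing
// that check, assuming java.time.Instant and java.time.Period are imported and the Period is
// day-based as in these tests:
private static boolean isOldEnoughForMerge(RegionInfo regionInfo, Period mergeMinRegionAge) {
  // compare the creation instant against "now minus the configured minimum age"
  final Instant created = Instant.ofEpochMilli(regionInfo.getRegionId());
  final Instant cutoff = Instant.now().minus(mergeMinRegionAge);
  return !created.isAfter(cutoff);
}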
@Test
public void testMergeOfNewSmallRegions() throws HBaseIOException {
TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions");
List<RegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb")).build();
hris.add(hri1);
regionSizes.put(hri1.getRegionName(), 15);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).build();
hris.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd")).build();
hris.add(hri3);
regionSizes.put(hri3.getRegionName(), 16);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee")).build();
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
.setEndKey(Bytes.toBytes("fff")).build();
hris.add(hri5);
regionSizes.put(hri5.getRegionName(), 5);
setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
assertNull(plans);
}
@SuppressWarnings("MockitoCast")
protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> RegionInfo) {
MasterServices masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
MasterRpcServices masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
// for simplicity all regions are assumed to be on one server; doesn't matter to us
ServerName sn = ServerName.valueOf("localhost", 0, 1L);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionsOfTable(any())).thenReturn(RegionInfo);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionServerOfRegion(any())).thenReturn(sn);
when(masterServices.getAssignmentManager().getRegionStates()
.getRegionState(any(RegionInfo.class))).thenReturn(
RegionState.createForTesting(null, RegionState.State.OPEN));
for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
when(regionLoad.getRegionName()).thenReturn(region.getKey());
when(regionLoad.getStoreFileSize())
.thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE));
// this is possibly broken with jdk9, unclear if false positive or not
// suppress it for now, fix it when we get to running tests on 9
// see: http://errorprone.info/bugpattern/MockitoCast
when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics()
.get(region.getKey())).thenReturn(regionLoad);
}
try {
when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn(
MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
} catch (ServiceException se) {
LOG.debug("error setting isSplitOrMergeEnabled switch", se);
}
normalizer = new SimpleRegionNormalizer();
normalizer.setMasterServices(masterServices);
normalizer.setMasterRpcServices(masterRpcServices);
}
}

View File

@ -18,46 +18,55 @@
package org.apache.hadoop.hbase.master.normalizer; package org.apache.hadoop.hbase.master.normalizer;
import static java.lang.String.format; import static java.lang.String.format;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_SIZE_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MIN_REGION_COUNT_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.SPLIT_ENABLED_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.time.Instant;
import java.time.Period;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size; import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
/** /**
* Tests logic of {@link SimpleRegionNormalizer}. * Tests logic of {@link SimpleRegionNormalizer}.
@ -69,517 +78,347 @@ public class TestSimpleRegionNormalizer {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSimpleRegionNormalizer.class); HBaseClassTestRule.forClass(TestSimpleRegionNormalizer.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSimpleRegionNormalizer.class); private Configuration conf;
private SimpleRegionNormalizer normalizer;
private RegionNormalizer normalizer;
private MasterServices masterServices; private MasterServices masterServices;
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@Test @Before
public void testPlanComparator() { public void before() {
Comparator<NormalizationPlan> comparator = new SimpleRegionNormalizer.PlanComparator(); conf = HBaseConfiguration.create();
NormalizationPlan splitPlan1 = new SplitNormalizationPlan(null, null);
NormalizationPlan splitPlan2 = new SplitNormalizationPlan(null, null);
NormalizationPlan mergePlan1 = new MergeNormalizationPlan(null, null);
NormalizationPlan mergePlan2 = new MergeNormalizationPlan(null, null);
assertEquals(0, comparator.compare(splitPlan1, splitPlan2));
assertEquals(0, comparator.compare(splitPlan2, splitPlan1));
assertEquals(0, comparator.compare(mergePlan1, mergePlan2));
assertEquals(0, comparator.compare(mergePlan2, mergePlan1));
assertTrue(comparator.compare(splitPlan1, mergePlan1) < 0);
assertTrue(comparator.compare(mergePlan1, splitPlan1) > 0);
} }
@Test @Test
public void testNoNormalizationForMetaTable() throws HBaseIOException { public void testNoNormalizationForMetaTable() {
TableName testTable = TableName.META_TABLE_NAME; TableName testTable = TableName.META_TABLE_NAME;
List<RegionInfo> RegionInfo = new ArrayList<>(); List<RegionInfo> RegionInfo = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>(); Map<byte[], Integer> regionSizes = new HashMap<>();
setupMocksForNormalizer(regionSizes, RegionInfo); setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable); List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
assertNull(plans); assertThat(plans, empty());
} }
@Test @Test
public void testNoNormalizationIfTooFewRegions() throws HBaseIOException { public void testNoNormalizationIfTooFewRegions() {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 2);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) setupMocksForNormalizer(regionSizes, regionInfos);
.setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb"))
.build();
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 10);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName) List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
.setStartKey(Bytes.toBytes("bbb")) assertThat(plans, empty());
.setEndKey(Bytes.toBytes("ccc"))
.build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 15);
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
assertNull(plans);
} }
@Test @Test
public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException { public void testNoNormalizationOnNormalizedCluster() {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15, 8, 10);
setupMocksForNormalizer(regionSizes, regionInfos);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
.setStartKey(Bytes.toBytes("aaa")) assertThat(plans, empty());
.setEndKey(Bytes.toBytes("bbb"))
.build();
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 10);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 15);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd"))
.build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 8);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee"))
.build();
regionSizes.put(hri4.getRegionName(), 10);
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
assertNull(plans);
} }
private void noNormalizationOnTransitioningRegions(final RegionState.State state) private void noNormalizationOnTransitioningRegions(final RegionState.State state) {
throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
final List<RegionInfo> regionInfos = new LinkedList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
final Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 1, 100);
final RegionInfo ri1 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb"))
.build();
regionInfos.add(ri1);
regionSizes.put(ri1.getRegionName(), 10);
final RegionInfo ri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
regionInfos.add(ri2);
regionSizes.put(ri2.getRegionName(), 1);
setupMocksForNormalizer(regionSizes, regionInfos); setupMocksForNormalizer(regionSizes, regionInfos);
when(masterServices.getAssignmentManager().getRegionStates() when(masterServices.getAssignmentManager().getRegionStates()
.getRegionState(any(RegionInfo.class))).thenReturn( .getRegionState(any(RegionInfo.class)))
RegionState.createForTesting(null, state)); .thenReturn(RegionState.createForTesting(null, state));
assertNull( assertThat(normalizer.getMinRegionCount(), greaterThanOrEqualTo(regionInfos.size()));
format("Unexpected plans for RegionState %s", state),
normalizer.computePlanForTable(tableName)); List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertThat(format("Unexpected plans for RegionState %s", state), plans, empty());
} }
@Test @Test
public void testNoNormalizationOnMergingNewRegions() throws Exception { public void testNoNormalizationOnMergingNewRegions() {
noNormalizationOnTransitioningRegions(RegionState.State.MERGING_NEW); noNormalizationOnTransitioningRegions(RegionState.State.MERGING_NEW);
} }
@Test @Test
public void testNoNormalizationOnMergingRegions() throws Exception { public void testNoNormalizationOnMergingRegions() {
noNormalizationOnTransitioningRegions(RegionState.State.MERGING); noNormalizationOnTransitioningRegions(RegionState.State.MERGING);
} }
@Test @Test
public void testNoNormalizationOnMergedRegions() throws Exception { public void testNoNormalizationOnMergedRegions() {
noNormalizationOnTransitioningRegions(RegionState.State.MERGED); noNormalizationOnTransitioningRegions(RegionState.State.MERGED);
} }
@Test @Test
public void testNoNormalizationOnSplittingNewRegions() throws Exception { public void testNoNormalizationOnSplittingNewRegions() {
noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING_NEW); noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING_NEW);
} }
@Test @Test
public void testNoNormalizationOnSplittingRegions() throws Exception { public void testNoNormalizationOnSplittingRegions() {
noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING); noNormalizationOnTransitioningRegions(RegionState.State.SPLITTING);
} }
@Test @Test
public void testNoNormalizationOnSplitRegions() throws Exception { public void testNoNormalizationOnSplitRegions() {
noNormalizationOnTransitioningRegions(RegionState.State.SPLIT); noNormalizationOnTransitioningRegions(RegionState.State.SPLIT);
} }
@Test @Test
public void testMergeOfSmallRegions() throws HBaseIOException { public void testMergeOfSmallRegions() {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 15, 5, 5, 15, 16);
setupMocksForNormalizer(regionSizes, regionInfos);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
.setStartKey(Bytes.toBytes("aaa")) assertThat(plans.get(0), instanceOf(MergeNormalizationPlan.class));
.setEndKey(Bytes.toBytes("bbb")) MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
.build(); assertEquals(regionInfos.get(1), plan.getFirstRegion());
RegionInfo.add(hri1); assertEquals(regionInfos.get(2), plan.getSecondRegion());
regionSizes.put(hri1.getRegionName(), 15);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd"))
.build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 5);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee"))
.build();
RegionInfo.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("eee"))
.setEndKey(Bytes.toBytes("fff"))
.build();
RegionInfo.add(hri5);
regionSizes.put(hri5.getRegionName(), 16);
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
} }
// Test for situation illustrated in HBASE-14867 // Test for situation illustrated in HBASE-14867
@Test @Test
public void testMergeOfSecondSmallestRegions() throws HBaseIOException { public void testMergeOfSecondSmallestRegions() {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 6);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 1, 10000, 10000, 10000, 2700, 2700);
setupMocksForNormalizer(regionSizes, regionInfos);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
.setStartKey(Bytes.toBytes("aaa")) assertThat(plans.get(0), instanceOf(MergeNormalizationPlan.class));
.setEndKey(Bytes.toBytes("bbb")) MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
.build(); assertEquals(regionInfos.get(4), plan.getFirstRegion());
RegionInfo.add(hri1); assertEquals(regionInfos.get(5), plan.getSecondRegion());
regionSizes.put(hri1.getRegionName(), 1);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 10000);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd"))
.build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 10000);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee"))
.build();
RegionInfo.add(hri4);
regionSizes.put(hri4.getRegionName(), 10000);
RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("eee"))
.setEndKey(Bytes.toBytes("fff"))
.build();
RegionInfo.add(hri5);
regionSizes.put(hri5.getRegionName(), 2700);
RegionInfo hri6 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("fff"))
.setEndKey(Bytes.toBytes("ggg"))
.build();
RegionInfo.add(hri6);
regionSizes.put(hri6.getRegionName(), 2700);
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri6, ((MergeNormalizationPlan) plan).getSecondRegion());
} }
@Test @Test
public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException { public void testMergeOfSmallNonAdjacentRegions() {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 15, 5, 16, 15, 5);
setupMocksForNormalizer(regionSizes, regionInfos);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
.setStartKey(Bytes.toBytes("aaa")) assertThat(plans, empty());
.setEndKey(Bytes.toBytes("bbb"))
.build();
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 15);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 5);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd"))
.build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 16);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee"))
.build();
RegionInfo.add(hri4);
regionSizes.put(hri4.getRegionName(), 15);
RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee"))
.build();
RegionInfo.add(hri4);
regionSizes.put(hri5.getRegionName(), 5);
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
assertNull(plans);
} }
@Test @Test
public void testSplitOfLargeRegion() throws HBaseIOException { public void testSplitOfLargeRegion() {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 8, 6, 10, 30);
setupMocksForNormalizer(regionSizes, regionInfos);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
.setStartKey(Bytes.toBytes("aaa")) assertThat(plans.get(0), instanceOf(SplitNormalizationPlan.class));
.setEndKey(Bytes.toBytes("bbb")) SplitNormalizationPlan plan = (SplitNormalizationPlan) plans.get(0);
.build(); assertEquals(regionInfos.get(3), plan.getRegionInfo());
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 8);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc"))
.build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 6);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd"))
.build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 10);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee"))
.build();
RegionInfo.add(hri4);
regionSizes.put(hri4.getRegionName(), 30);
setupMocksForNormalizer(regionSizes, RegionInfo);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof SplitNormalizationPlan);
assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
} }
@Test @Test
public void testSplitWithTargetRegionCount() throws Exception { public void testSplitWithTargetRegionCount() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 6);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 20, 40, 60, 80, 100, 120);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("aaa")) setupMocksForNormalizer(regionSizes, regionInfos);
.setEndKey(Bytes.toBytes("bbb")).build();
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 20);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 40);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd")).build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 60);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee")).build();
RegionInfo.add(hri4);
regionSizes.put(hri4.getRegionName(), 80);
RegionInfo hri5 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("eee"))
.setEndKey(Bytes.toBytes("fff")).build();
RegionInfo.add(hri5);
regionSizes.put(hri5.getRegionName(), 100);
RegionInfo hri6 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("fff"))
.setEndKey(Bytes.toBytes("ggg")).build();
RegionInfo.add(hri6);
regionSizes.put(hri6.getRegionName(), 120);
setupMocksForNormalizer(regionSizes, RegionInfo);
// test when target region size is 20 // test when target region size is 20
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize()) when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(20L); .thenReturn(20L);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName); List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertEquals(4, plans.size()); assertThat(plans, iterableWithSize(4));
assertThat(plans, everyItem(instanceOf(SplitNormalizationPlan.class)));
for (NormalizationPlan plan : plans) {
assertTrue(plan instanceof SplitNormalizationPlan);
}
// test when target region size is 200 // test when target region size is 200
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize()) when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
.thenReturn(200L); .thenReturn(200L);
plans = normalizer.computePlanForTable(tableName); plans = normalizer.computePlansForTable(tableName);
assertEquals(2, plans.size()); assertThat(plans, iterableWithSize(2));
NormalizationPlan plan = plans.get(0); assertTrue(plans.get(0) instanceof MergeNormalizationPlan);
assertTrue(plan instanceof MergeNormalizationPlan); MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion()); assertEquals(regionInfos.get(0), plan.getFirstRegion());
assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion()); assertEquals(regionInfos.get(1), plan.getSecondRegion());
} }
@Test @Test
public void testSplitWithTargetRegionSize() throws Exception { public void testSplitWithTargetRegionSize() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 20, 40, 60, 80);
setupMocksForNormalizer(regionSizes, regionInfos);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("aaa"))
.setEndKey(Bytes.toBytes("bbb")).build();
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 20);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("bbb"))
.setEndKey(Bytes.toBytes("ccc")).build();
RegionInfo.add(hri2);
regionSizes.put(hri2.getRegionName(), 40);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ccc"))
.setEndKey(Bytes.toBytes("ddd")).build();
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 60);
RegionInfo hri4 = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("ddd"))
.setEndKey(Bytes.toBytes("eee")).build();
RegionInfo.add(hri4);
regionSizes.put(hri4.getRegionName(), 80);
setupMocksForNormalizer(regionSizes, RegionInfo);
// test when target region count is 8 // test when target region count is 8
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount()) when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(8); .thenReturn(8);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName); List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertEquals(2, plans.size()); assertThat(plans, iterableWithSize(2));
assertThat(plans, everyItem(instanceOf(SplitNormalizationPlan.class)));
for (NormalizationPlan plan : plans) {
assertTrue(plan instanceof SplitNormalizationPlan);
}
// test when target region count is 3 // test when target region count is 3
when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount()) when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
.thenReturn(3); .thenReturn(3);
plans = normalizer.computePlanForTable(tableName); plans = normalizer.computePlansForTable(tableName);
assertEquals(1, plans.size()); assertThat(plans, contains(instanceOf(MergeNormalizationPlan.class)));
NormalizationPlan plan = plans.get(0); MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan); assertEquals(regionInfos.get(0), plan.getFirstRegion());
assertEquals(hri1, ((MergeNormalizationPlan) plan).getFirstRegion()); assertEquals(regionInfos.get(1), plan.getSecondRegion());
assertEquals(hri2, ((MergeNormalizationPlan) plan).getSecondRegion());
} }
@Test @Test
public void testSplitIfTooFewRegions() throws HBaseIOException { public void testHonorsSplitEnabled() {
conf.setBoolean(SPLIT_ENABLED_KEY, true);
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
List<RegionInfo> RegionInfo = new ArrayList<>(); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
Map<byte[], Integer> regionSizes = new HashMap<>(); final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 5, 5, 20, 5, 5);
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(
normalizer.computePlansForTable(tableName),
contains(instanceOf(SplitNormalizationPlan.class)));
RegionInfo hri1 = RegionInfoBuilder.newBuilder(tableName) conf.setBoolean(SPLIT_ENABLED_KEY, false);
.setStartKey(Bytes.toBytes("aaa")) setupMocksForNormalizer(regionSizes, regionInfos);
.setEndKey(Bytes.toBytes("bbb")) assertThat(normalizer.computePlansForTable(tableName), empty());
.build(); }
RegionInfo.add(hri1);
regionSizes.put(hri1.getRegionName(), 1);
RegionInfo hri2 = RegionInfoBuilder.newBuilder(tableName) @Test
.setStartKey(Bytes.toBytes("bbb")) public void testHonorsMergeEnabled() {
.setEndKey(Bytes.toBytes("ccc")) conf.setBoolean(MERGE_ENABLED_KEY, true);
.build(); final TableName tableName = TableName.valueOf(name.getMethodName());
RegionInfo.add(hri2); final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
regionSizes.put(hri2.getRegionName(), 1); final Map<byte[], Integer> regionSizes =
// the third region is a huge one createRegionSizesMap(regionInfos, 20, 5, 5, 20, 20);
RegionInfo hri3 = RegionInfoBuilder.newBuilder(tableName) setupMocksForNormalizer(regionSizes, regionInfos);
.setStartKey(Bytes.toBytes("ccc")) assertThat(
.setEndKey(Bytes.toBytes("ddd")) normalizer.computePlansForTable(tableName),
.build(); contains(instanceOf(MergeNormalizationPlan.class)));
RegionInfo.add(hri3);
regionSizes.put(hri3.getRegionName(), 10);
setupMocksForNormalizer(regionSizes, RegionInfo); conf.setBoolean(MERGE_ENABLED_KEY, false);
setupMocksForNormalizer(regionSizes, regionInfos);
assertThat(normalizer.computePlansForTable(tableName), empty());
}
Configuration configuration = HBaseConfiguration.create(); @Test
configuration.setInt(AbstractRegionNormalizer.HBASE_REGION_NORMALIZER_MIN_REGION_COUNT_KEY, 4); public void testHonorsMinimumRegionCount() {
when(masterServices.getConfiguration()).thenReturn(configuration); conf.setInt(MIN_REGION_COUNT_KEY, 1);
final TableName tableName = TableName.valueOf(name.getMethodName());
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
// create a table topology that results in both a merge plan and a split plan. Assert that the
// merge is only created when the number of table regions is above the region count threshold,
// and that the split plan is created in both cases.
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10);
setupMocksForNormalizer(regionSizes, regionInfos);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName); List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertNotNull(plans); assertThat(plans, contains(
NormalizationPlan plan = plans.get(0); instanceOf(SplitNormalizationPlan.class),
assertEquals(hri3, ((SplitNormalizationPlan) plan).getRegionInfo()); instanceOf(MergeNormalizationPlan.class)));
SplitNormalizationPlan splitPlan = (SplitNormalizationPlan) plans.get(0);
assertEquals(regionInfos.get(2), splitPlan.getRegionInfo());
MergeNormalizationPlan mergePlan = (MergeNormalizationPlan) plans.get(1);
assertEquals(regionInfos.get(0), mergePlan.getFirstRegion());
assertEquals(regionInfos.get(1), mergePlan.getSecondRegion());
// have to call setupMocks again because we don't have dynamic config update on normalizer.
conf.setInt(MIN_REGION_COUNT_KEY, 4);
setupMocksForNormalizer(regionSizes, regionInfos);
plans = normalizer.computePlansForTable(tableName);
assertThat(plans, contains(instanceOf(SplitNormalizationPlan.class)));
splitPlan = (SplitNormalizationPlan) plans.get(0);
assertEquals(regionInfos.get(2), splitPlan.getRegionInfo());
}
@Test
public void testHonorsMergeMinRegionAge() {
conf.setInt(MERGE_MIN_REGION_AGE_DAYS_KEY, 7);
final TableName tableName = TableName.valueOf(name.getMethodName());
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
final Map<byte[], Integer> regionSizes =
createRegionSizesMap(regionInfos, 1, 1, 10, 10);
setupMocksForNormalizer(regionSizes, regionInfos);
assertEquals(Period.ofDays(7), normalizer.getMergeMinRegionAge());
assertThat(
normalizer.computePlansForTable(tableName),
everyItem(not(instanceOf(MergeNormalizationPlan.class))));
// have to call setupMocks again because we don't have dynamic config update on normalizer.
conf.unset(MERGE_MIN_REGION_AGE_DAYS_KEY);
setupMocksForNormalizer(regionSizes, regionInfos);
assertEquals(
Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS), normalizer.getMergeMinRegionAge());
final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertThat(plans, not(empty()));
assertThat(plans, everyItem(instanceOf(MergeNormalizationPlan.class)));
}
@Test
public void testHonorsMergeMinRegionSize() {
conf.setBoolean(SPLIT_ENABLED_KEY, false);
final TableName tableName = TableName.valueOf(name.getMethodName());
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 2, 0, 10, 10);
setupMocksForNormalizer(regionSizes, regionInfos);
assertFalse(normalizer.isSplitEnabled());
assertEquals(1, normalizer.getMergeMinRegionSizeMb());
final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertThat(plans, everyItem(instanceOf(MergeNormalizationPlan.class)));
assertThat(plans, iterableWithSize(1));
final MergeNormalizationPlan plan = (MergeNormalizationPlan) plans.get(0);
assertEquals(regionInfos.get(0), plan.getFirstRegion());
assertEquals(regionInfos.get(1), plan.getSecondRegion());
conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
assertEquals(3, normalizer.getMergeMinRegionSizeMb());
assertThat(normalizer.computePlansForTable(tableName), empty());
}
// This test makes sure that the normalizer only merges adjacent regions.
@Test
public void testNormalizerCannotMergeNonAdjacentRegions() {
final TableName tableName = TableName.valueOf(name.getMethodName());
// create 5 regions with sizes to trigger merge of small regions. region ranges are:
// [, "aa"), ["aa", "aa1"), ["aa1", "aa1!"), ["aa1!", "aa2"), ["aa2", )
// Region ["aa", "aa1") and ["aa1!", "aa2") are not adjacent, so they are not supposed to be
// merged.
final byte[][] keys = {
null,
Bytes.toBytes("aa"),
Bytes.toBytes("aa1!"),
Bytes.toBytes("aa1"),
Bytes.toBytes("aa2"),
null,
};
final List<RegionInfo> regionInfos = createRegionInfos(tableName, keys);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 1, 1, 3, 5);
setupMocksForNormalizer(regionSizes, regionInfos);
// Compute the plan, no merge plan returned as they are not adjacent.
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertThat(plans, empty());
} }
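// Illustrative sketch (not part of this patch): "adjacent" in the test above means one region's
// end key equals the next region's start key. A hypothetical helper expressing that check, using
// only the RegionInfo and Bytes types already imported in this file:
private static boolean areAdjacent(RegionInfo left, RegionInfo right) {
  // e.g. ["aa", "aa1") and ["aa1", "aa1!") are adjacent, while ["aa", "aa1") and ["aa1!", "aa2") are not
  return Bytes.equals(left.getEndKey(), right.getStartKey())
    || Bytes.equals(right.getEndKey(), left.getStartKey());
}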
@SuppressWarnings("MockitoCast") @SuppressWarnings("MockitoCast")
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes, private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> RegionInfo) { List<RegionInfo> regionInfoList) {
masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS); masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
final MasterRpcServices masterRpcServices =
Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
// for simplicity all regions are assumed to be on one server; doesn't matter to us // for simplicity all regions are assumed to be on one server; doesn't matter to us
ServerName sn = ServerName.valueOf("localhost", 0, 1L); ServerName sn = ServerName.valueOf("localhost", 0, 0L);
when(masterServices.getAssignmentManager().getRegionStates() when(masterServices.getAssignmentManager().getRegionStates()
.getRegionsOfTable(any())).thenReturn(RegionInfo); .getRegionsOfTable(any())).thenReturn(regionInfoList);
when(masterServices.getAssignmentManager().getRegionStates() when(masterServices.getAssignmentManager().getRegionStates()
.getRegionServerOfRegion(any())).thenReturn(sn); .getRegionServerOfRegion(any())).thenReturn(sn);
when(masterServices.getAssignmentManager().getRegionStates() when(masterServices.getAssignmentManager().getRegionStates()
@ -598,16 +437,70 @@ public class TestSimpleRegionNormalizer {
when((Object) masterServices.getServerManager().getLoad(sn) when((Object) masterServices.getServerManager().getLoad(sn)
.getRegionMetrics().get(region.getKey())).thenReturn(regionLoad); .getRegionMetrics().get(region.getKey())).thenReturn(regionLoad);
} }
try {
when(masterRpcServices.isSplitOrMergeEnabled(any(), when(masterServices.isSplitOrMergeEnabled(any())).thenReturn(true);
any())).thenReturn(
IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
} catch (ServiceException se) {
LOG.debug("error setting isSplitOrMergeEnabled switch", se);
}
normalizer = new SimpleRegionNormalizer(); normalizer = new SimpleRegionNormalizer();
normalizer.setConf(conf);
normalizer.setMasterServices(masterServices); normalizer.setMasterServices(masterServices);
normalizer.setMasterRpcServices(masterRpcServices); }
/**
* Create a list of {@link RegionInfo}s that represent a region chain of the specified length.
*/
private static List<RegionInfo> createRegionInfos(final TableName tableName, final int length) {
if (length < 1) {
throw new IllegalStateException("length must be greater than or equal to 1.");
}
final byte[] startKey = Bytes.toBytes("aaaaa");
final byte[] endKey = Bytes.toBytes("zzzzz");
if (length == 1) {
return Collections.singletonList(createRegionInfo(tableName, startKey, endKey));
}
final byte[][] splitKeys = Bytes.split(startKey, endKey, length - 1);
final List<RegionInfo> ret = new ArrayList<>(length);
for (int i = 0; i < splitKeys.length - 1; i++) {
ret.add(createRegionInfo(tableName, splitKeys[i], splitKeys[i+1]));
}
return ret;
}
private static RegionInfo createRegionInfo(final TableName tableName, final byte[] startKey,
final byte[] endKey) {
return RegionInfoBuilder.newBuilder(tableName)
.setStartKey(startKey)
.setEndKey(endKey)
.setRegionId(generateRegionId())
.build();
}
private static long generateRegionId() {
return Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime())
.minus(Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS + 1))
.toEpochMilli();
}
private static List<RegionInfo> createRegionInfos(final TableName tableName,
final byte[][] splitKeys) {
final List<RegionInfo> ret = new ArrayList<>(splitKeys.length);
for (int i = 0; i < splitKeys.length - 1; i++) {
ret.add(createRegionInfo(tableName, splitKeys[i], splitKeys[i+1]));
}
return ret;
}
private static Map<byte[], Integer> createRegionSizesMap(final List<RegionInfo> regionInfos,
int... sizes) {
if (regionInfos.size() != sizes.length) {
throw new IllegalStateException("Parameter lengths must match.");
}
final Map<byte[], Integer> ret = new HashMap<>(regionInfos.size());
for (int i = 0; i < regionInfos.size(); i++) {
ret.put(regionInfos.get(i).getRegionName(), sizes[i]);
}
return ret;
} }
} }
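The unit tests above drive every knob through the same pattern: set keys on a plain Configuration, hand it to the normalizer via setConf, wire in the mocked MasterServices, then ask for plans. A condensed sketch of that pattern, using the constants statically imported at the top of the file; the literal values and the helper name are illustrative only:

private static List<NormalizationPlan> planWithTunedNormalizer(MasterServices masterServices,
    TableName tableName) {
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean(SPLIT_ENABLED_KEY, true);        // allow split plans
  conf.setBoolean(MERGE_ENABLED_KEY, true);        // allow merge plans
  conf.setInt(MIN_REGION_COUNT_KEY, 3);            // skip merge planning on very small tables
  conf.setInt(MERGE_MIN_REGION_AGE_DAYS_KEY, 3);   // only merge sufficiently old regions
  conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 1);    // ignore regions below this size when merging
  SimpleRegionNormalizer normalizer = new SimpleRegionNormalizer();
  normalizer.setConf(conf);
  normalizer.setMasterServices(masterServices);
  return normalizer.computePlansForTable(tableName);
}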

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,19 +18,23 @@
package org.apache.hadoop.hbase.master.normalizer; package org.apache.hadoop.hbase.master.normalizer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
@ -38,6 +42,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.TableNamespaceManager; import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.namespace.TestNamespaceAuditor; import org.apache.hadoop.hbase.namespace.TestNamespaceAuditor;
@ -49,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator; import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
@ -63,16 +69,18 @@ import org.slf4j.LoggerFactory;
*/ */
@Category({MasterTests.class, MediumTests.class}) @Category({MasterTests.class, MediumTests.class})
public class TestSimpleRegionNormalizerOnCluster { public class TestSimpleRegionNormalizerOnCluster {
private static final Logger LOG =
LoggerFactory.getLogger(TestSimpleRegionNormalizerOnCluster.class);
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSimpleRegionNormalizerOnCluster.class); HBaseClassTestRule.forClass(TestSimpleRegionNormalizerOnCluster.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestSimpleRegionNormalizerOnCluster.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); private static final byte[] FAMILY_NAME = Bytes.toBytes("fam");
private static Admin admin; private static Admin admin;
private static HMaster master;
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@ -83,10 +91,14 @@ public class TestSimpleRegionNormalizerOnCluster {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
// Start a cluster of two regionservers. // no way for the test to set the regionId on a created region, so disable this feature.
TEST_UTIL.getConfiguration().setInt("hbase.normalizer.merge.min_region_age.days", 0);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
TestNamespaceAuditor.waitForQuotaInitialize(TEST_UTIL); TestNamespaceAuditor.waitForQuotaInitialize(TEST_UTIL);
admin = TEST_UTIL.getAdmin(); admin = TEST_UTIL.getAdmin();
master = TEST_UTIL.getHBaseCluster().getMaster();
assertNotNull(master);
} }
@AfterClass @AfterClass
@ -94,217 +106,180 @@ public class TestSimpleRegionNormalizerOnCluster {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
@Test @Before
public void testRegionNormalizationSplitOnCluster() throws Exception { public void before() throws IOException {
testRegionNormalizationSplitOnCluster(false); // disable the normalizer ahead of time; let the test enable it when it's ready.
testRegionNormalizationSplitOnCluster(true); admin.normalizerSwitch(false);
} }
void testRegionNormalizationSplitOnCluster(boolean limitedByQuota) throws Exception { @Test
TableName TABLENAME; public void testHonorsNormalizerSwitch() throws IOException {
assertFalse(admin.isNormalizerEnabled());
assertFalse(admin.normalize());
assertFalse(admin.normalizerSwitch(true));
assertTrue(admin.normalize());
}
/**
* Test that disabling the normalizer via table configuration is honored. There's
* no side-effect to look for (other than a log message), so normalize two
* tables, one with the disabled setting, and look for change in one and no
* change in the other.
*/
@Test
public void testHonorsNormalizerTableSetting() throws Exception {
final TableName tn1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tn2 = TableName.valueOf(name.getMethodName() + "2");
try {
final int tn1RegionCount = createTableBegsSplit(tn1, true);
final int tn2RegionCount = createTableBegsSplit(tn2, false);
assertFalse(admin.normalizerSwitch(true));
assertTrue(admin.normalize());
waitForTableSplit(tn1, tn1RegionCount + 1);
assertEquals(
tn1 + " should have split.",
tn1RegionCount + 1,
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tn1));
assertEquals(
tn2 + " should not have split.",
tn2RegionCount,
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tn2));
} finally {
dropIfExists(tn1);
dropIfExists(tn2);
}
}
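// Illustrative sketch (not part of this patch): the per-table behavior verified above hinges on
// the table descriptor's normalization flag. A hypothetical helper that toggles it through the
// Admin API, using only calls that already appear elsewhere in this file:
private static void setTableNormalizationEnabled(Admin admin, TableName tableName, boolean enabled)
    throws IOException {
  final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
    .setNormalizationEnabled(enabled)
    .build();
  // the cluster-wide switch (admin.normalizerSwitch(true)) must also be on for plans to be acted on
  admin.modifyTable(td);
}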
@Test
public void testRegionNormalizationSplitWithoutQuotaLimit() throws Exception {
testRegionNormalizationSplit(false);
}
@Test
public void testRegionNormalizationSplitWithQuotaLimit() throws Exception {
testRegionNormalizationSplit(true);
}
void testRegionNormalizationSplit(boolean limitedByQuota) throws Exception {
TableName tableName = null;
try {
tableName = limitedByQuota
? buildTableNameForQuotaTest(name.getMethodName())
: TableName.valueOf(name.getMethodName());
final int currentRegionCount = createTableBegsSplit(tableName, true);
final long existingSkippedSplitCount = master.getRegionNormalizer()
.getSkippedCount(PlanType.SPLIT);
assertFalse(admin.normalizerSwitch(true));
assertTrue(admin.normalize());
if (limitedByQuota) { if (limitedByQuota) {
waitForSkippedSplits(master, existingSkippedSplitCount);
assertEquals(
tableName + " should not have split.",
currentRegionCount,
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
} else {
waitForTableSplit(tableName, currentRegionCount + 1);
assertEquals(
tableName + " should have split.",
currentRegionCount + 1,
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
}
} finally {
dropIfExists(tableName);
}
}
@Test
public void testRegionNormalizationMerge() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
try {
final int currentRegionCount = createTableBegsMerge(tableName);
assertFalse(admin.normalizerSwitch(true));
assertTrue(admin.normalize());
waitforTableMerge(tableName, currentRegionCount - 1);
assertEquals(
tableName + " should have merged.",
currentRegionCount - 1,
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
} finally {
dropIfExists(tableName);
}
}
private static TableName buildTableNameForQuotaTest(final String methodName) throws IOException {
String nsp = "np2"; String nsp = "np2";
NamespaceDescriptor nspDesc = NamespaceDescriptor nspDesc =
NamespaceDescriptor.create(nsp) NamespaceDescriptor.create(nsp)
.addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "5") .addConfiguration(TableNamespaceManager.KEY_MAX_REGIONS, "5")
.addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build(); .addConfiguration(TableNamespaceManager.KEY_MAX_TABLES, "2").build();
admin.createNamespace(nspDesc); admin.createNamespace(nspDesc);
// ---------- removed by this change ----------
TABLENAME = TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + name.getMethodName());
} else {
TABLENAME = TableName.valueOf(name.getMethodName());
}
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
try (Table ht = TEST_UTIL.createMultiRegionTable(TABLENAME, FAMILYNAME, 5)) {
// Need to get sorted list of regions here
List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
Collections.sort(generatedRegions, Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
HRegion region = generatedRegions.get(0);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(1);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(2);
generateTestData(region, 2);
region.flush(true);
region = generatedRegions.get(3);
generateTestData(region, 2);
region.flush(true);
region = generatedRegions.get(4);
generateTestData(region, 5);
region.flush(true);
}
final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLENAME))
.setNormalizationEnabled(true)
.build();
admin.modifyTable(td);
admin.flush(TABLENAME);
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), TABLENAME));
// Now trigger a split and stop when the split is in progress
Thread.sleep(5000); // to let region load to update
m.normalizeRegions();
if (limitedByQuota) {
long skippedSplitcnt = 0;
do {
skippedSplitcnt = m.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
Thread.sleep(100);
} while (skippedSplitcnt == 0L);
assert(skippedSplitcnt > 0);
} else {
while (true) {
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(TABLENAME);
int cnt = 0;
for (HRegion region : regions) {
String regionName = region.getRegionInfo().getRegionNameAsString();
if (regionName.startsWith("testRegionNormalizationSplitOnCluster,zzzzz")) {
cnt++;
}
}
if (cnt >= 2) {
break;
}
}
}
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
// This test is to make sure that normalizer is only going to merge adjacent regions.
@Test
public void testNormalizerCannotMergeNonAdjacentRegions() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
// create 5 regions with sizes to trigger merge of small regions
final byte[][] keys = {
Bytes.toBytes("aa"),
Bytes.toBytes("aa1"),
Bytes.toBytes("aa1!"),
Bytes.toBytes("aa2")
};
try (Table ht = TEST_UTIL.createTable(tableName, FAMILYNAME, keys)) {
// Need to get sorted list of regions here, the order is
// [, "aa"), ["aa", "aa1"), ["aa1", "aa1!"), ["aa1!", "aa2"), ["aa2", )
List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
generatedRegions.sort(Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
// Region ["aa", "aa1") and ["aa1!", "aa2") are not adjacent, they are not supposed to
// merged.
HRegion region = generatedRegions.get(0);
generateTestData(region, 3);
region.flush(true);
region = generatedRegions.get(1);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(2);
generateTestData(region, 3);
region.flush(true);
region = generatedRegions.get(3);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(4);
generateTestData(region, 5);
region.flush(true);
final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
.setNormalizationEnabled(true)
.build();
admin.modifyTable(td);
admin.flush(tableName);
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
Thread.sleep(5000); // to let region load to update
// Compute the plan, no merge plan returned as they are not adjacent.
final List<NormalizationPlan> plans = m.getRegionNormalizer().computePlanForTable(tableName);
assertNull(plans);
} finally {
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
}
}
@Test
public void testRegionNormalizationMergeOnCluster() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
HMaster m = cluster.getMaster();
// create 5 regions with sizes to trigger merge of small regions
try (Table ht = TEST_UTIL.createMultiRegionTable(tableName, FAMILYNAME, 5)) {
// Need to get sorted list of regions here
List<HRegion> generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
generatedRegions.sort(Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
HRegion region = generatedRegions.get(0);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(1);
generateTestData(region, 1);
region.flush(true);
region = generatedRegions.get(2);
generateTestData(region, 3);
region.flush(true);
region = generatedRegions.get(3);
generateTestData(region, 3);
region.flush(true);
region = generatedRegions.get(4);
generateTestData(region, 5);
region.flush(true);
}
final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
.setNormalizationEnabled(true)
.build();
admin.modifyTable(td);
admin.flush(tableName);
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
// Now trigger a merge and stop when the merge is in progress
Thread.sleep(5000); // to let region load to update
m.normalizeRegions();
while (MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName) > 4) {
LOG.info("Waiting for normalization merge to complete");
Thread.sleep(100);
}
assertEquals(4, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
private void generateTestData(Region region, int numRows) throws IOException {
// generating 1Mb values
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(1024 * 1024, 1024 * 1024);
for (int i = 0; i < numRows; ++i) {

// ---------- added by this change ----------
return TableName.valueOf(nsp + TableName.NAMESPACE_DELIM + methodName);
}
private static void waitForSkippedSplits(final HMaster master,
final long existingSkippedSplitCount) throws Exception {
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<Exception>() {
@Override public String explainFailure() {
return "waiting to observe split attempt and skipped.";
}
@Override public boolean evaluate() {
final long skippedSplitCount = master.getRegionNormalizer().getSkippedCount(PlanType.SPLIT);
return skippedSplitCount > existingSkippedSplitCount;
}
});
}
private static void waitForTableSplit(final TableName tableName, final int targetRegionCount)
throws IOException {
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
@Override public String explainFailure() {
return "expected normalizer to split region.";
}
@Override public boolean evaluate() throws IOException {
final int currentRegionCount =
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName);
return currentRegionCount >= targetRegionCount;
}
});
}
private static void waitforTableMerge(final TableName tableName, final int targetRegionCount)
throws IOException {
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(5), new ExplainingPredicate<IOException>() {
@Override public String explainFailure() {
return "expected normalizer to merge regions.";
}
@Override public boolean evaluate() throws IOException {
final int currentRegionCount =
MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName);
return currentRegionCount <= targetRegionCount;
}
});
}
private static List<HRegion> generateTestData(final TableName tableName,
final int... regionSizesMb) throws IOException {
final List<HRegion> generatedRegions;
final int numRegions = regionSizesMb.length;
try (Table ignored = TEST_UTIL.createMultiRegionTable(tableName, FAMILY_NAME, numRegions)) {
// Need to get sorted list of regions here
generatedRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
generatedRegions.sort(Comparator.comparing(HRegion::getRegionInfo, RegionInfo.COMPARATOR));
assertEquals(numRegions, generatedRegions.size());
for (int i = 0; i < numRegions; i++) {
HRegion region = generatedRegions.get(i);
generateTestData(region, regionSizesMb[i]);
region.flush(true);
}
}
return generatedRegions;
}
private static void generateTestData(Region region, int numRows) throws IOException {
// generating 1Mb values
LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(1024 * 1024, 1024 * 1024);
for (int i = 0; i < numRows; ++i) {
@ -313,9 +288,107 @@ public class TestSimpleRegionNormalizerOnCluster {
Put put = new Put(key);
byte[] col = Bytes.toBytes(String.valueOf(j));
byte[] value = dataGenerator.generateRandomSizeValue(key, col);
put.addColumn(FAMILYNAME, col, value);   // removed
put.addColumn(FAMILY_NAME, col, value);  // added
region.put(put);
}
}
}
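/**
 * Look up the latest store file size reported to the Master for {@code regionInfo}, in MB;
 * returns -1 when no region load has been reported yet.
 */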
private static double getRegionSizeMB(final MasterServices masterServices,
final RegionInfo regionInfo) {
final ServerName sn = masterServices.getAssignmentManager()
.getRegionStates()
.getRegionServerOfRegion(regionInfo);
final RegionMetrics regionLoad = masterServices.getServerManager()
.getLoad(sn)
.getRegionMetrics()
.get(regionInfo.getRegionName());
if (regionLoad == null) {
LOG.debug("{} was not found in RegionsLoad", regionInfo.getRegionNameAsString());
return -1;
}
return regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
}
/**
* create a table with 5 regions, having region sizes so as to provoke a split
* of the largest region.
* <ul>
* <li>total table size: 12</li>
* <li>average region size: 2.4</li>
* <li>split threshold: 2.4 * 2 = 4.8</li>
* </ul>
*/
private static int createTableBegsSplit(final TableName tableName, final boolean balancerEnabled)
throws IOException {
final List<HRegion> generatedRegions = generateTestData(tableName, 1, 1, 2, 3, 5);
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
admin.flush(tableName);
final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
.setNormalizationEnabled(balancerEnabled)
.build();
admin.modifyTable(td);
// make sure relatively accurate region statistics are available for the test table. use
// the last/largest region as clue.
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new ExplainingPredicate<IOException>() {
@Override public String explainFailure() {
return "expected largest region to be >= 4mb.";
}
@Override public boolean evaluate() {
return generatedRegions.stream()
.mapToDouble(val -> getRegionSizeMB(master, val.getRegionInfo()))
.allMatch(val -> val > 0)
&& getRegionSizeMB(master, generatedRegions.get(4).getRegionInfo()) >= 4.0;
}
});
return 5;
}
/**
* create a table with 5 regions, having region sizes so as to provoke a merge
* of the smallest regions.
* <ul>
* <li>total table size: 13</li>
* <li>average region size: 2.6</li>
* <li>sum of sizes of first two regions < average</li>
* </ul>
*/
private static int createTableBegsMerge(final TableName tableName) throws IOException {
// create 5 regions with sizes to trigger merge of small regions
final List<HRegion> generatedRegions = generateTestData(tableName, 1, 1, 3, 3, 5);
assertEquals(5, MetaTableAccessor.getRegionCount(TEST_UTIL.getConnection(), tableName));
admin.flush(tableName);
final TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName))
.setNormalizationEnabled(true)
.build();
admin.modifyTable(td);
// make sure relatively accurate region statistics are available for the test table. use
// the last/largest region as clue.
LOG.debug("waiting for region statistics to settle.");
TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new ExplainingPredicate<IOException>() {
@Override public String explainFailure() {
return "expected largest region to be >= 4mb.";
}
@Override public boolean evaluate() {
return generatedRegions.stream()
.mapToDouble(val -> getRegionSizeMB(master, val.getRegionInfo()))
.allMatch(val -> val > 0)
&& getRegionSizeMB(master, generatedRegions.get(4).getRegionInfo()) >= 4.0;
}
});
return 5;
}
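/** Disable {@code tableName} if it is enabled, then delete it; a no-op when the table does not exist. */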
private static void dropIfExists(final TableName tableName) throws IOException {
if (tableName != null && admin.tableExists(tableName)) {
if (admin.isTableEnabled(tableName)) {
admin.disableTable(tableName);
}
admin.deleteTable(tableName);
}
}
}
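For orientation, an individual test case composes the helpers above roughly as
follows. This is a hypothetical sketch, not the committed test code; the test
method name and the single-split expectation are assumptions for illustration:

@Test
public void testNormalizerSplitsLargeRegion() throws Exception {
  final TableName tableName = TableName.valueOf("testNormalizerSplitsLargeRegion");
  try {
    // 5 regions sized 1,1,2,3,5 MB; the 5 MB region exceeds 2x the average
    final int startingRegionCount = createTableBegsSplit(tableName, true);
    master.normalizeRegions(); // ask the Master to run the normalizer now
    waitForTableSplit(tableName, startingRegionCount + 1);
  } finally {
    dropIfExists(tableName);
  }
}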
View File
@ -3537,14 +3537,23 @@ To enable ACL, add the following to your hbase-site.xml and restart your Master:
[[normalizer]]
== Region Normalizer
The Region Normalizer tries to make Regions all in a table about the same in
size. It does this by first calculating total table size and average size per
region. It splits any region that is larger than twice this size. Any region
that is much smaller is merged into an adjacent region. The Normalizer runs on
a regular schedule, which is configurable. It can also be disabled entirely via
a runtime "switch". It can be run manually via the shell or Admin API call.
Even if normally disabled, it is good to run manually after the cluster has
been running a while or say after a burst of activity such as a large delete.

The Normalizer works well for bringing a table's region boundaries into
alignment with the reality of data distribution after an initial effort at
pre-splitting a table. It is also a nice complement to the data TTL feature
when the schema includes timestamp in the rowkey, as it will automatically
merge away regions whose contents have expired.
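For example, a minimal sketch of driving the Normalizer through the Java Admin
API (the class and method names here are illustrative; `conn` is assumed to be
an already-configured `Connection`, and the shell's `normalizer_switch` and
`normalize` commands achieve the same effect):

[source,java]
----
import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

public class NormalizerExample {
  /** Enable the normalizer switch and request an immediate run. */
  static void runNormalizer(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      admin.normalizerSwitch(true); // flip the runtime "switch" on
      admin.normalize();            // ask the Master to run the normalizer now
    }
  }
}
----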
(The bulk of the below detail was copied wholesale from the blog by Romil Choksi at
link:https://community.hortonworks.com/articles/54987/hbase-region-normalizer.html[HBase Region Normalizer]).
The Region Normalizer is a feature available since HBase-1.2. It runs a set of
pre-calculated merge/split actions to resize regions that are either too