HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978)
Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
ebfc1a206c
commit
c9c22fedaa
|
@ -630,6 +630,17 @@ possible configurations would overwhelm and obscure the important.
|
|||
<value>300000</value>
|
||||
<description>Period at which the region normalizer runs in the Master.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.normalizer.min.region.count</name>
|
||||
<value>3</value>
|
||||
<description>The minimum number of regions a table must have before the region normalizer considers it for normalization.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.normalizer.min.region.merge.age</name>
|
||||
<value>3</value>
|
||||
<description>The minimum age, in days, a region must reach before the normalizer considers it for a merge while
      normalizing.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.regions.slop</name>
|
||||
<value>0.001</value>
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.MasterSwitchType;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public abstract class AbstractRegionNormalizer implements RegionNormalizer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class);
|
||||
protected MasterServices masterServices;
|
||||
protected MasterRpcServices masterRpcServices;
|
||||
|
||||
/**
|
||||
* Set the master service.
|
||||
* @param masterServices inject instance of MasterServices
|
||||
*/
|
||||
@Override
|
||||
public void setMasterServices(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
|
||||
this.masterRpcServices = masterRpcServices;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hri regioninfo
|
||||
* @return size of region in MB and if region is not found than -1
|
||||
*/
|
||||
protected long getRegionSize(RegionInfo hri) {
|
||||
ServerName sn =
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
|
||||
RegionMetrics regionLoad =
|
||||
masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
|
||||
if (regionLoad == null) {
|
||||
LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
|
||||
return -1;
|
||||
}
|
||||
return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
|
||||
}
|
||||
|
||||
protected boolean isMergeEnabled() {
|
||||
boolean mergeEnabled = true;
|
||||
try {
|
||||
mergeEnabled = masterRpcServices
|
||||
.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE))
|
||||
.getEnabled();
|
||||
} catch (ServiceException e) {
|
||||
LOG.warn("Unable to determine whether merge is enabled", e);
|
||||
}
|
||||
return mergeEnabled;
|
||||
}
|
||||
|
||||
protected boolean isSplitEnabled() {
|
||||
boolean splitEnabled = true;
|
||||
try {
|
||||
splitEnabled = masterRpcServices
|
||||
.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT))
|
||||
.getEnabled();
|
||||
} catch (ServiceException se) {
|
||||
LOG.warn("Unable to determine whether split is enabled", se);
|
||||
}
|
||||
return splitEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tableRegions regions of table to normalize
|
||||
* @return average region size depending on
|
||||
* @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
|
||||
* Also make sure tableRegions contains regions of the same table
|
||||
*/
|
||||
protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
|
||||
long totalSizeMb = 0;
|
||||
int acutalRegionCnt = 0;
|
||||
for (RegionInfo hri : tableRegions) {
|
||||
long regionSize = getRegionSize(hri);
|
||||
// don't consider regions that are in bytes for averaging the size.
|
||||
if (regionSize > 0) {
|
||||
acutalRegionCnt++;
|
||||
totalSizeMb += regionSize;
|
||||
}
|
||||
}
|
||||
TableName table = tableRegions.get(0).getTable();
|
||||
int targetRegionCount = -1;
|
||||
long targetRegionSize = -1;
|
||||
try {
|
||||
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
|
||||
if (tableDescriptor != null) {
|
||||
targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
|
||||
targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
|
||||
LOG.debug("Table {}: target region count is {}, target region size is {}", table,
|
||||
targetRegionCount, targetRegionSize);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn(
|
||||
"cannot get the target number and target size of table {}, they will be default value -1.",
|
||||
table, e);
|
||||
}
|
||||
|
||||
double avgRegionSize;
|
||||
if (targetRegionSize > 0) {
|
||||
avgRegionSize = targetRegionSize;
|
||||
} else if (targetRegionCount > 0) {
|
||||
avgRegionSize = totalSizeMb / (double) targetRegionCount;
|
||||
} else {
|
||||
avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
|
||||
}
|
||||
|
||||
LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
|
||||
totalSizeMb, avgRegionSize);
|
||||
return avgRegionSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the merge plans that should be executed for this table to converge average region
|
||||
* towards target average or target region count
|
||||
* @param table table to normalize
|
||||
* @return list of merge normalization plans
|
||||
*/
|
||||
protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
|
||||
List<NormalizationPlan> plans = new ArrayList<>();
|
||||
List<RegionInfo> tableRegions =
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
|
||||
double avgRegionSize = getAverageRegionSize(tableRegions);
|
||||
LOG.debug("Table {}, average region size: {}.\n Computing normalization plan for table: {}, "
|
||||
+ "number of regions: {}",
|
||||
table, avgRegionSize, table, tableRegions.size());
|
||||
|
||||
int candidateIdx = 0;
|
||||
while (candidateIdx < tableRegions.size() - 1) {
|
||||
RegionInfo hri = tableRegions.get(candidateIdx);
|
||||
long regionSize = getRegionSize(hri);
|
||||
RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
|
||||
long regionSize2 = getRegionSize(hri2);
|
||||
if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
|
||||
// atleast one of the two regions should be older than MIN_REGION_DURATION days
|
||||
plans.add(new MergeNormalizationPlan(hri, hri2));
|
||||
candidateIdx++;
|
||||
} else {
|
||||
LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
|
||||
regionSize);
|
||||
}
|
||||
candidateIdx++;
|
||||
}
|
||||
return plans;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the split plans that should be executed for this table to converge average region size
|
||||
* towards target average or target region count
|
||||
* @param table table to normalize
|
||||
* @return list of split normalization plans
|
||||
*/
|
||||
protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
|
||||
List<NormalizationPlan> plans = new ArrayList<>();
|
||||
List<RegionInfo> tableRegions =
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
|
||||
double avgRegionSize = getAverageRegionSize(tableRegions);
|
||||
LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
|
||||
|
||||
int candidateIdx = 0;
|
||||
while (candidateIdx < tableRegions.size()) {
|
||||
RegionInfo hri = tableRegions.get(candidateIdx);
|
||||
long regionSize = getRegionSize(hri);
|
||||
// if the region is > 2 times larger than average, we split it, split
|
||||
// is more high priority normalization action than merge.
|
||||
if (regionSize > 2 * avgRegionSize) {
|
||||
LOG.info("Table {}, large region {} has size {}, more than twice avg size, splitting",
|
||||
table, hri.getRegionNameAsString(), regionSize);
|
||||
plans.add(new SplitNormalizationPlan(hri, null));
|
||||
}
|
||||
candidateIdx++;
|
||||
}
|
||||
return plans;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Implementation of MergeNormalizer Logic in use:
|
||||
* <ol>
|
||||
* <li>get all regions of a given table
|
||||
* <li>get avg size S of each region (by total size of store files reported in RegionLoad)
|
||||
* <li>two regions R1 and its neighbour R2 are merged, if R1 + R2 < S, and all such regions are
|
||||
* returned to be merged
|
||||
* <li>Otherwise, no action is performed
|
||||
* </ol>
|
||||
* <p>
|
||||
* Considering the split policy takes care of splitting region we also want a way to merge when
|
||||
* regions are too small. It is little different than what
|
||||
* {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} does. Instead of doing
|
||||
* splits and merge both to achieve average region size in cluster for a table. We only merge
|
||||
* regions(older than defined age) and rely on Split policy for region splits. The goal of this
|
||||
* normalizer is to merge small regions to make size of regions close to average size (which is
|
||||
* either average size or depends on either target region size or count in that order). Consider
|
||||
* region with size 1,2,3,4,10,10,10,5,4,3. If minimum merge age is set to 0 days this algorithm
|
||||
* will find the average size as 7.2 assuming we haven't provided target region count or size. Now
|
||||
* we will find all those adjacent region which if merged doesn't exceed the average size. so we
|
||||
* will merge 1-2, 3-4, 4,3 in our first run. To get best results from this normalizer theoretically
|
||||
* we should set target region size between 0.5 to 0.75 of configured maximum file size. If we set
|
||||
* min merge age as 3 we create plan as above and see if we have a plan which has both regions as
|
||||
* new(age less than 3) we discard such plans and we consider the regions even if one of the region
|
||||
* is old enough to be merged.
|
||||
* </p>
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class MergeNormalizer extends AbstractRegionNormalizer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
|
||||
|
||||
private int minRegionCount;
|
||||
private int minRegionAge;
|
||||
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
|
||||
|
||||
public MergeNormalizer() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
minRegionCount = conf.getInt("hbase.normalizer.min.region.count", 3);
|
||||
minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) {
|
||||
skippedCount[type.ordinal()]++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSkippedCount(NormalizationPlan.PlanType type) {
|
||||
return skippedCount[type.ordinal()];
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
|
||||
List<NormalizationPlan> plans = new ArrayList<>();
|
||||
if (!shouldNormalize(table)) {
|
||||
return null;
|
||||
}
|
||||
// at least one of the two regions should be older than MIN_REGION_AGE days
|
||||
List<NormalizationPlan> normalizationPlans = getMergeNormalizationPlan(table);
|
||||
for (NormalizationPlan plan : normalizationPlans) {
|
||||
if (plan instanceof MergeNormalizationPlan) {
|
||||
RegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion();
|
||||
RegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion();
|
||||
if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) {
|
||||
plans.add(plan);
|
||||
} else {
|
||||
LOG.debug("Skipping region {} and {} as they are both new", hri.getEncodedName(),
|
||||
hri2.getEncodedName());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (plans.isEmpty()) {
|
||||
LOG.debug("No normalization needed, regions look good for table: {}", table);
|
||||
return null;
|
||||
}
|
||||
return plans;
|
||||
}
|
||||
|
||||
private boolean isOldEnoughToMerge(RegionInfo hri) {
|
||||
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
|
||||
Timestamp hriTime = new Timestamp(hri.getRegionId());
|
||||
boolean isOld =
|
||||
new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(minRegionAge))
|
||||
.before(currentTime);
|
||||
return isOld;
|
||||
}
|
||||
|
||||
private boolean shouldNormalize(TableName table) {
|
||||
boolean normalize = false;
|
||||
if (table == null || table.isSystemTable()) {
|
||||
LOG.debug("Normalization of system table {} isn't allowed", table);
|
||||
} else if (!isMergeEnabled()) {
|
||||
LOG.debug("Merge disabled for table: {}", table);
|
||||
} else {
|
||||
List<RegionInfo> tableRegions =
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
|
||||
if (tableRegions == null || tableRegions.size() < minRegionCount) {
|
||||
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
|
||||
LOG.debug(
|
||||
"Table {} has {} regions, required min number of regions for normalizer to run is {} , "
|
||||
+ "not running normalizer",
|
||||
table, nrRegions, minRegionCount);
|
||||
} else {
|
||||
normalize = true;
|
||||
}
|
||||
}
|
||||
return normalize;
|
||||
}
|
||||
}
|
|
@ -18,68 +18,44 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.MasterSwitchType;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
|
||||
/**
|
||||
* Simple implementation of region normalizer.
|
||||
*
|
||||
* Logic in use:
|
||||
*
|
||||
* <ol>
|
||||
* <li> Get all regions of a given table
|
||||
* <li> Get avg size S of each region (by total size of store files reported in RegionMetrics)
|
||||
* <li> Seek every single region one by one. If a region R0 is bigger than S * 2, it is
|
||||
* kindly requested to split. Thereon evaluate the next region R1
|
||||
* <li> Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge.
|
||||
* Thereon evaluate the next region R2
|
||||
* <li> Otherwise, R1 is evaluated
|
||||
* Simple implementation of region normalizer. Logic in use:
|
||||
* <ol>
|
||||
* <li>Get all regions of a given table
|
||||
* <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
|
||||
* <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
|
||||
* requested to split. Thereon evaluate the next region R1
|
||||
* <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
|
||||
* evaluate the next region R2
|
||||
* <li>Otherwise, R1 is evaluated
|
||||
* </ol>
|
||||
* <p>
|
||||
* Region sizes are coarse and approximate on the order of megabytes. Additionally,
|
||||
* "empty" regions (less than 1MB, with the previous note) are not merged away. This
|
||||
* is by design to prevent normalization from undoing the pre-splitting of a table.
|
||||
* Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
|
||||
* (less than 1MB, with the previous note) are not merged away. This is by design to prevent
|
||||
* normalization from undoing the pre-splitting of a table.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SimpleRegionNormalizer implements RegionNormalizer {
|
||||
public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
|
||||
private static final int MIN_REGION_COUNT = 3;
|
||||
private MasterServices masterServices;
|
||||
private MasterRpcServices masterRpcServices;
|
||||
private int minRegionCount;
|
||||
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
|
||||
|
||||
/**
|
||||
* Set the master service.
|
||||
* @param masterServices inject instance of MasterServices
|
||||
*/
|
||||
@Override
|
||||
public void setMasterServices(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
|
||||
this.masterRpcServices = masterRpcServices;
|
||||
public SimpleRegionNormalizer() {
|
||||
minRegionCount = HBaseConfiguration.create().getInt("hbase.normalizer.min.region.count", 3);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -115,138 +91,56 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
private Comparator<NormalizationPlan> planComparator = new PlanComparator();
|
||||
|
||||
/**
|
||||
* Computes next most "urgent" normalization action on the table.
|
||||
* Action may be either a split, or a merge, or no action.
|
||||
*
|
||||
* Computes next most "urgent" normalization action on the table. Action may be either a split, or
|
||||
* a merge, or no action.
|
||||
* @param table table to normalize
|
||||
* @return normalization plan to execute
|
||||
*/
|
||||
@Override
|
||||
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
|
||||
if (table == null || table.isSystemTable()) {
|
||||
LOG.debug("Normalization of system table " + table + " isn't allowed");
|
||||
LOG.debug("Normalization of system table {} isn't allowed", table);
|
||||
return null;
|
||||
}
|
||||
boolean splitEnabled = true, mergeEnabled = true;
|
||||
try {
|
||||
splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
|
||||
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
|
||||
LOG.debug("Unable to determine whether split is enabled", e);
|
||||
}
|
||||
try {
|
||||
mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
|
||||
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
|
||||
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
|
||||
LOG.debug("Unable to determine whether merge is enabled", e);
|
||||
}
|
||||
boolean splitEnabled = isSplitEnabled();
|
||||
boolean mergeEnabled = isMergeEnabled();
|
||||
if (!mergeEnabled && !splitEnabled) {
|
||||
LOG.debug("Both split and merge are disabled for table: " + table);
|
||||
LOG.debug("Both split and merge are disabled for table: {}", table);
|
||||
return null;
|
||||
}
|
||||
List<NormalizationPlan> plans = new ArrayList<>();
|
||||
List<RegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
|
||||
getRegionsOfTable(table);
|
||||
List<RegionInfo> tableRegions =
|
||||
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
|
||||
|
||||
//TODO: should we make min number of regions a config param?
|
||||
if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
|
||||
if (tableRegions == null || tableRegions.size() < minRegionCount) {
|
||||
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
|
||||
LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
|
||||
+ " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
|
||||
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run is "
|
||||
+ "{}, not running normalizer",
|
||||
table, nrRegions, minRegionCount);
|
||||
return null;
|
||||
}
|
||||
|
||||
LOG.debug("Computing normalization plan for table: " + table +
|
||||
", number of regions: " + tableRegions.size());
|
||||
LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table,
|
||||
tableRegions.size());
|
||||
|
||||
long totalSizeMb = 0;
|
||||
int acutalRegionCnt = 0;
|
||||
|
||||
for (int i = 0; i < tableRegions.size(); i++) {
|
||||
RegionInfo hri = tableRegions.get(i);
|
||||
long regionSize = getRegionSize(hri);
|
||||
if (regionSize > 0) {
|
||||
acutalRegionCnt++;
|
||||
totalSizeMb += regionSize;
|
||||
if (splitEnabled) {
|
||||
List<NormalizationPlan> splitPlans = getSplitNormalizationPlan(table);
|
||||
if (splitPlans != null) {
|
||||
plans.addAll(splitPlans);
|
||||
}
|
||||
}
|
||||
int targetRegionCount = -1;
|
||||
long targetRegionSize = -1;
|
||||
try {
|
||||
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
|
||||
if(tableDescriptor != null) {
|
||||
targetRegionCount =
|
||||
tableDescriptor.getNormalizerTargetRegionCount();
|
||||
targetRegionSize =
|
||||
tableDescriptor.getNormalizerTargetRegionSize();
|
||||
LOG.debug("Table {}: target region count is {}, target region size is {}", table,
|
||||
targetRegionCount, targetRegionSize);
|
||||
|
||||
if (mergeEnabled) {
|
||||
List<NormalizationPlan> mergePlans = getMergeNormalizationPlan(table);
|
||||
if (mergePlans != null) {
|
||||
plans.addAll(mergePlans);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn(
|
||||
"cannot get the target number and target size of table {}, they will be default value -1.",
|
||||
table);
|
||||
}
|
||||
|
||||
double avgRegionSize;
|
||||
if (targetRegionSize > 0) {
|
||||
avgRegionSize = targetRegionSize;
|
||||
} else if (targetRegionCount > 0) {
|
||||
avgRegionSize = totalSizeMb / (double) targetRegionCount;
|
||||
} else {
|
||||
avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
|
||||
}
|
||||
|
||||
LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
|
||||
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
|
||||
|
||||
int candidateIdx = 0;
|
||||
while (candidateIdx < tableRegions.size()) {
|
||||
RegionInfo hri = tableRegions.get(candidateIdx);
|
||||
long regionSize = getRegionSize(hri);
|
||||
// if the region is > 2 times larger than average, we split it, split
|
||||
// is more high priority normalization action than merge.
|
||||
if (regionSize > 2 * avgRegionSize) {
|
||||
if (splitEnabled) {
|
||||
LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
|
||||
+ regionSize + ", more than twice avg size, splitting");
|
||||
plans.add(new SplitNormalizationPlan(hri, null));
|
||||
}
|
||||
} else {
|
||||
if (candidateIdx == tableRegions.size()-1) {
|
||||
break;
|
||||
}
|
||||
if (mergeEnabled) {
|
||||
RegionInfo hri2 = tableRegions.get(candidateIdx+1);
|
||||
long regionSize2 = getRegionSize(hri2);
|
||||
if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
|
||||
LOG.info("Table " + table + ", small region size: " + regionSize
|
||||
+ " plus its neighbor size: " + regionSize2
|
||||
+ ", less than the avg size " + avgRegionSize + ", merging them");
|
||||
plans.add(new MergeNormalizationPlan(hri, hri2));
|
||||
candidateIdx++;
|
||||
}
|
||||
}
|
||||
}
|
||||
candidateIdx++;
|
||||
}
|
||||
if (plans.isEmpty()) {
|
||||
LOG.debug("No normalization needed, regions look good for table: " + table);
|
||||
LOG.debug("No normalization needed, regions look good for table: {}", table);
|
||||
return null;
|
||||
}
|
||||
Collections.sort(plans, planComparator);
|
||||
return plans;
|
||||
}
|
||||
|
||||
private long getRegionSize(RegionInfo hri) {
|
||||
ServerName sn = masterServices.getAssignmentManager().getRegionStates().
|
||||
getRegionServerOfRegion(hri);
|
||||
RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn).
|
||||
getRegionMetrics().get(hri.getRegionName());
|
||||
if (regionLoad == null) {
|
||||
LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
|
||||
return -1;
|
||||
}
|
||||
return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,270 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
||||
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestMergeNormalizer {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMergeNormalizer.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
|
||||
|
||||
private static RegionNormalizer normalizer;
|
||||
|
||||
// mocks
|
||||
private static MasterServices masterServices;
|
||||
private static MasterRpcServices masterRpcServices;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
normalizer = new MergeNormalizer();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoNormalizationForMetaTable() throws HBaseIOException {
|
||||
TableName testTable = TableName.META_TABLE_NAME;
|
||||
List<RegionInfo> hris = new ArrayList<>();
|
||||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
assertNull(plans);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
|
||||
TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
|
||||
List<RegionInfo> hris = new ArrayList<>();
|
||||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
|
||||
.setEndKey(Bytes.toBytes("bbb")).build();
|
||||
regionSizes.put(hri1.getRegionName(), 10);
|
||||
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
|
||||
.setEndKey(Bytes.toBytes("ccc")).build();
|
||||
hris.add(hri2);
|
||||
regionSizes.put(hri2.getRegionName(), 15);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
assertNull(plans);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
|
||||
TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
|
||||
List<RegionInfo> hris = new ArrayList<>();
|
||||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
|
||||
.setEndKey(Bytes.toBytes("bbb")).build();
|
||||
hris.add(hri1);
|
||||
regionSizes.put(hri1.getRegionName(), 10);
|
||||
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
|
||||
.setEndKey(Bytes.toBytes("ccc")).build();
|
||||
hris.add(hri2);
|
||||
regionSizes.put(hri2.getRegionName(), 15);
|
||||
|
||||
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
|
||||
.setEndKey(Bytes.toBytes("ddd")).build();
|
||||
hris.add(hri3);
|
||||
regionSizes.put(hri3.getRegionName(), 8);
|
||||
|
||||
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
|
||||
.setEndKey(Bytes.toBytes("eee")).build();
|
||||
hris.add(hri4);
|
||||
regionSizes.put(hri4.getRegionName(), 10);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
assertNull(plans);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeOfSmallRegions() throws HBaseIOException {
|
||||
TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
|
||||
List<RegionInfo> hris = new ArrayList<>();
|
||||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||
|
||||
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
|
||||
Timestamp threedaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
|
||||
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
|
||||
.setEndKey(Bytes.toBytes("bbb")).setRegionId(threedaysBefore.getTime()).build();
|
||||
hris.add(hri1);
|
||||
regionSizes.put(hri1.getRegionName(), 15);
|
||||
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
|
||||
.setEndKey(Bytes.toBytes("ccc")).setRegionId(threedaysBefore.getTime()).build();
|
||||
hris.add(hri2);
|
||||
regionSizes.put(hri2.getRegionName(), 5);
|
||||
|
||||
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
|
||||
.setEndKey(Bytes.toBytes("ddd")).setRegionId(threedaysBefore.getTime()).build();
|
||||
hris.add(hri3);
|
||||
regionSizes.put(hri3.getRegionName(), 5);
|
||||
|
||||
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
|
||||
.setEndKey(Bytes.toBytes("eee")).setRegionId(threedaysBefore.getTime()).build();
|
||||
hris.add(hri4);
|
||||
regionSizes.put(hri4.getRegionName(), 15);
|
||||
|
||||
RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
|
||||
.setEndKey(Bytes.toBytes("fff")).build();
|
||||
hris.add(hri5);
|
||||
regionSizes.put(hri5.getRegionName(), 16);
|
||||
|
||||
RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
|
||||
.setEndKey(Bytes.toBytes("ggg")).setRegionId(threedaysBefore.getTime()).build();
|
||||
hris.add(hri6);
|
||||
regionSizes.put(hri6.getRegionName(), 0);
|
||||
|
||||
RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg"))
|
||||
.setEndKey(Bytes.toBytes("hhh")).build();
|
||||
hris.add(hri7);
|
||||
regionSizes.put(hri7.getRegionName(), 0);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
|
||||
NormalizationPlan plan = plans.get(0);
|
||||
assertTrue(plan instanceof MergeNormalizationPlan);
|
||||
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
|
||||
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
|
||||
|
||||
// to check last 0 sized regions are merged
|
||||
plan = plans.get(1);
|
||||
assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion());
|
||||
assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeOfNewSmallRegions() throws HBaseIOException {
|
||||
TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions");
|
||||
List<RegionInfo> hris = new ArrayList<>();
|
||||
Map<byte[], Integer> regionSizes = new HashMap<>();
|
||||
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
|
||||
.setEndKey(Bytes.toBytes("bbb")).build();
|
||||
hris.add(hri1);
|
||||
regionSizes.put(hri1.getRegionName(), 15);
|
||||
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
|
||||
.setEndKey(Bytes.toBytes("ccc")).build();
|
||||
hris.add(hri2);
|
||||
regionSizes.put(hri2.getRegionName(), 5);
|
||||
|
||||
RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
|
||||
.setEndKey(Bytes.toBytes("ddd")).build();
|
||||
hris.add(hri3);
|
||||
regionSizes.put(hri3.getRegionName(), 16);
|
||||
|
||||
RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
|
||||
.setEndKey(Bytes.toBytes("eee")).build();
|
||||
hris.add(hri4);
|
||||
regionSizes.put(hri4.getRegionName(), 15);
|
||||
|
||||
RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
|
||||
.setEndKey(Bytes.toBytes("fff")).build();
|
||||
hris.add(hri4);
|
||||
regionSizes.put(hri5.getRegionName(), 5);
|
||||
|
||||
setupMocksForNormalizer(regionSizes, hris);
|
||||
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
|
||||
|
||||
assertNull(plans);
|
||||
}
|
||||
|
||||
@SuppressWarnings("MockitoCast")
|
||||
protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
|
||||
List<RegionInfo> RegionInfo) {
|
||||
masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
|
||||
masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
|
||||
|
||||
// for simplicity all regions are assumed to be on one server; doesn't matter to us
|
||||
ServerName sn = ServerName.valueOf("localhost", 0, 1L);
|
||||
when(masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(any()))
|
||||
.thenReturn(RegionInfo);
|
||||
when(masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(any()))
|
||||
.thenReturn(sn);
|
||||
|
||||
for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
|
||||
RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
|
||||
when(regionLoad.getRegionName()).thenReturn(region.getKey());
|
||||
when(regionLoad.getStoreFileSize())
|
||||
.thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE));
|
||||
|
||||
// this is possibly broken with jdk9, unclear if false positive or not
|
||||
// suppress it for now, fix it when we get to running tests on 9
|
||||
// see: http://errorprone.info/bugpattern/MockitoCast
|
||||
when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics()
|
||||
.get(region.getKey())).thenReturn(regionLoad);
|
||||
}
|
||||
try {
|
||||
when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn(
|
||||
MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
|
||||
} catch (ServiceException se) {
|
||||
LOG.debug("error setting isSplitOrMergeEnabled switch", se);
|
||||
}
|
||||
|
||||
normalizer.setMasterServices(masterServices);
|
||||
normalizer.setMasterRpcServices(masterRpcServices);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue