HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)

Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Charles Connell 2022-11-22 13:08:01 -05:00 committed by GitHub
parent 9462933179
commit 47996d6c21
7 changed files with 145 additions and 1 deletion
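
For context, the new key introduced by this change is "hbase.normalizer.plans_size_limit.mb" (see RegionNormalizerWorker below); it defaults to Long.MAX_VALUE, which leaves the limit disabled. A minimal sketch of how an operator might cap one normalization round at roughly 10 GB of cumulative region size, assuming standard HBase Configuration plumbing (the 10_000 value is illustrative, not a recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Cap the total region size (in MB) covered by the plans executed in one round.
    conf.setLong("hbase.normalizer.plans_size_limit.mb", 10_000L);

Since the worker observes configuration changes (see onConfigurationChange below), the value can also be adjusted on a running master, e.g. via the shell's update_config command.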

File: org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java

@@ -58,6 +58,15 @@ final class MergeNormalizationPlan implements NormalizationPlan {
return normalizationTargets;
}
@Override
public long getPlanSizeMb() {
long total = 0;
for (NormalizationTarget target : normalizationTargets) {
total += target.getRegionSizeMb();
}
return total;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)

File: org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java

@@ -34,4 +34,6 @@ public interface NormalizationPlan {
/** Returns the type of this plan */
PlanType getType();
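/** Returns the total size in MB of the regions that this plan covers. */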
long getPlanSizeMb();
}

File: org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java

@@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;
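// Long.MAX_VALUE is a sentinel meaning "no limit"; truncateForSize() below skips
// truncation entirely while the configured value still equals this default.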
private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
@@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
private final boolean defaultNormalizerTableLevel;
private long splitPlanCount;
private long mergePlanCount;
private final AtomicLong cumulativePlansSizeLimitMb;
RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
@@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
this.cumulativePlansSizeLimitMb = new AtomicLong(
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
}
private boolean extractDefaultNormalizerValue(final Configuration configuration) {
@@ -96,9 +104,20 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
}
}
private static long logLongConfigurationUpdated(final String key, final long oldValue,
final long newValue) {
if (oldValue != newValue) {
LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
}
return newValue;
}
@Override
public void onConfigurationChange(Configuration conf) {
rateLimiter.setRate(loadRateLimit(conf));
cumulativePlansSizeLimitMb.set(
logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(),
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)));
}
private static RateLimiter loadRateLimiter(final Configuration configuration) {
@@ -207,7 +226,10 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
return Collections.emptyList();
}
-   final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
+   List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
+   plans = truncateForSize(plans);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
@@ -215,6 +237,33 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
return plans;
}
private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
long totalCumulativeSizeMb = 0;
long truncatedCumulativeSizeMb = 0;
for (NormalizationPlan plan : plans) {
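// The running total only grows (plan sizes are non-negative), so once it passes the
// limit every later plan is dropped as well; the size of the kept plans is tracked
// separately so the log message below can report both totals.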
totalCumulativeSizeMb += plan.getPlanSizeMb();
if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) {
truncatedCumulativeSizeMb += plan.getPlanSizeMb();
maybeTruncatedPlans.add(plan);
}
}
if (maybeTruncatedPlans.size() != plans.size()) {
LOG.debug(
"Truncating list of normalization plans that RegionNormalizerWorker will process "
+ "because of {}. Original list had {} plan(s), new list has {} plan(s). "
+ "Original list covered regions with cumulative size {} mb, "
+ "new list covers regions with cumulative size {} mb.",
CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
totalCumulativeSizeMb, truncatedCumulativeSizeMb);
}
return maybeTruncatedPlans;
} else {
return plans;
}
}
private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.

File: org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
import java.time.Instant;
@@ -229,6 +231,14 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
plans.addAll(mergePlans);
}
if (
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
) {
// If we are going to truncate our list of plans, shuffle the split and merge plans together
// so that the merge plans, which are listed last, are not starved out.
shuffleNormalizationPlans(plans);
}
LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
return plans;
@@ -464,6 +474,14 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}
/**
* This very simple method exists so we can verify it was called in a unit test. Visible for
* testing.
*/
void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
Collections.shuffle(plans);
}
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
final Object... args) {
final boolean value = predicate.getAsBoolean();
@@ -484,6 +502,7 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
private final int mergeMinRegionCount;
private final Period mergeMinRegionAge;
private final long mergeMinRegionSizeMb;
private final long cumulativePlansSizeLimitMb;
private NormalizerConfiguration() {
conf = null;
@@ -492,6 +511,7 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
}
private NormalizerConfiguration(final Configuration conf,
@@ -502,6 +522,8 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
mergeMinRegionCount = parseMergeMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
cumulativePlansSizeLimitMb =
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
splitEnabled);
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
@@ -574,6 +596,10 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
}
return mergeMinRegionSizeMb;
}
private long getCumulativePlansSizeLimitMb() {
return cumulativePlansSizeLimitMb;
}
}
/**

File: org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java

@@ -45,6 +45,11 @@ final class SplitNormalizationPlan implements NormalizationPlan {
return splitTarget;
}
@Override
public long getPlanSizeMb() {
return splitTarget.getRegionSizeMb();
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)

File: org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java

@@ -204,6 +204,36 @@ public class TestRegionNormalizerWorker {
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}
@Test
public void testPlansSizeLimit() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor =
TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
.addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
new SplitNormalizationPlan(splitRegionInfo, 1)));
final Configuration conf = testingUtility.getConfiguration();
conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);
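// With the limit at 5 MB: the first split (2 MB) and the merge (1 + 2 = 3 MB) bring the
// running total to exactly 5 MB and are kept; the final split (1 MB) would push the total
// to 6 MB, so it is truncated. Hence the assertions below expect 1 split and 1 merge.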
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
workerPool.submit(worker);
queue.put(tn);
assertThatEventually("worker should process first split plan, but not second",
worker::getSplitPlanCount, comparesEqualTo(1L));
assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
comparesEqualTo(1L));
}
/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.

File: org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.normalizer;
import static java.lang.String.format;
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
@@ -30,13 +31,18 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Instant;
@@ -607,6 +613,23 @@ public class TestSimpleRegionNormalizer {
assertThat(plans, empty());
}
@Test
public void testSizeLimitShufflesPlans() {
conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
final TableName tableName = name.getTableName();
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
normalizer = spy(normalizer);
assertTrue(normalizer.isSplitEnabled());
assertTrue(normalizer.isMergeEnabled());
List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
assertThat(computedPlans, hasSize(4));
verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
}
@SuppressWarnings("MockitoCast")
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> regionInfoList) {