HBASE-25167 Normalizer support for hot config reloading (#2523)

Wire up the `ConfigurationObserver` chain for
`RegionNormalizerManager`. The following configuration keys support
hot-reloading:
 * hbase.normalizer.throughput.max_bytes_per_sec
 * hbase.normalizer.split.enabled
 * hbase.normalizer.merge.enabled
 * hbase.normalizer.min.region.count
 * hbase.normalizer.merge.min_region_age.days
 * hbase.normalizer.merge.min_region_size.mb

Note that support for `hbase.normalizer.period` is not provided
here. Support would need to be implemented generally for the `Chore`
subsystem.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Aman Poonia <aman.poonia.29@gmail.com>
This commit is contained in:
Nick Dimiduk 2020-10-30 10:41:56 -07:00 committed by Nick Dimiduk
parent 5e3fa7df65
commit 1c7d472537
8 changed files with 319 additions and 59 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -25,23 +25,25 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Maintains the set of all the classes which would like to get notified
* when the Configuration is reloaded from the disk in the Online Configuration
* Change mechanism, which lets you update certain configuration properties
* on-the-fly, without having to restart the cluster.
*
* <p>
* If a class has configuration properties which you would like to be able to
* change on-the-fly, do the following:
* 1. Implement the {@link ConfigurationObserver} interface. This would require
* <ol>
* <li>Implement the {@link ConfigurationObserver} interface. This would require
* you to implement the
* {@link ConfigurationObserver#onConfigurationChange(Configuration)}
* method. This is a callback that is used to notify your class' instance
* that the configuration has changed. In this method, you need to check
* if the new values for the properties that are of interest to your class
* are different from the cached values. If yes, update them.
*
* <br />
* However, be careful with this. Certain properties might be trivially
* mutable online, but others might not. Two properties might be trivially
* mutable by themselves, but not when changed together. For example, if a
@ -50,21 +52,23 @@ import org.slf4j.LoggerFactory;
* yet updated "b", it might make a decision on the basis of a new value of
* "a", and an old value of "b". This might introduce subtle bugs. This
* needs to be dealt on a case-by-case basis, and this class does not provide
* any protection from such cases.
* any protection from such cases.</li>
*
* 2. Register the appropriate instance of the class with the
* <li>Register the appropriate instance of the class with the
* {@link ConfigurationManager} instance, using the
* {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
* method. Be careful not to do this in the constructor, as you might cause
* the 'this' reference to escape. Use a factory method, or an initialize()
* method which is called after the construction of the object.
* method which is called after the construction of the object.</li>
*
* 3. Deregister the instance using the
* <li>Deregister the instance using the
* {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
* method when it is going out of scope. In case you are not able to do that
* for any reason, it is still okay, since entries for dead observers are
* automatically collected during GC. But nonetheless, it is still a good
* practice to deregister your observer, whenever possible.
* practice to deregister your observer, whenever possible.</li>
* </ol>
* </p>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -117,8 +121,8 @@ public class ConfigurationManager {
observer.onConfigurationChange(conf);
}
} catch (Throwable t) {
LOG.error("Encountered a throwable while notifying observers: " + " of type : " +
observer.getClass().getCanonicalName() + "(" + observer + ")", t);
LOG.error("Encountered a throwable while notifying observers: of type : {}({})",
observer.getClass().getCanonicalName(), observer, t);
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -24,7 +24,7 @@ import org.apache.yetus.audience.InterfaceStability;
/**
* Every class that wants to observe changes in Configuration properties,
* must implement interface (and also, register itself with the
* <code>ConfigurationManager</code> object.
* {@link ConfigurationManager}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.conf;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -39,9 +38,9 @@ public class TestConfigurationManager {
private static final Logger LOG = LoggerFactory.getLogger(TestConfigurationManager.class);
class DummyConfigurationObserver implements ConfigurationObserver {
static class DummyConfigurationObserver implements ConfigurationObserver {
private boolean notifiedOnChange = false;
private ConfigurationManager cm;
private final ConfigurationManager cm;
public DummyConfigurationObserver(ConfigurationManager cm) {
this.cm = cm;
@ -63,11 +62,11 @@ public class TestConfigurationManager {
}
public void register() {
this.cm.registerObserver(this);
cm.registerObserver(this);
}
public void deregister() {
this.cm.deregisterObserver(this);
cm.deregisterObserver(this);
}
}

View File

@ -771,6 +771,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
this.configurationManager.registerObserver(regionNormalizerManager);
this.regionNormalizerManager.start();
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);

View File

@ -22,8 +22,11 @@ import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@ -35,7 +38,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* This class encapsulates the details of the {@link RegionNormalizer} subsystem.
*/
@InterfaceAudience.Private
public class RegionNormalizerManager {
public class RegionNormalizerManager implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
private final RegionNormalizerTracker regionNormalizerTracker;
@ -48,7 +51,7 @@ public class RegionNormalizerManager {
private boolean started = false;
private boolean stopped = false;
public RegionNormalizerManager(
RegionNormalizerManager(
@NonNull final RegionNormalizerTracker regionNormalizerTracker,
@Nullable final RegionNormalizerChore regionNormalizerChore,
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
@ -67,6 +70,25 @@ public class RegionNormalizerManager {
.build());
}
@Override
public void registerChildren(ConfigurationManager manager) {
if (worker != null) {
manager.registerObserver(worker);
}
}
@Override
public void deregisterChildren(ConfigurationManager manager) {
if (worker != null) {
manager.deregisterObserver(worker);
}
}
@Override
public void onConfigurationChange(Configuration conf) {
// no configuration managed here directly.
}
public void start() {
synchronized (startStopLock) {
if (started) {

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -39,7 +42,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
* and executes the resulting {@link NormalizationPlan}s.
*/
@InterfaceAudience.Private
class RegionNormalizerWorker implements Runnable {
class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
@ -70,7 +73,32 @@ class RegionNormalizerWorker implements Runnable {
this.rateLimiter = loadRateLimiter(configuration);
}
@Override
public void registerChildren(ConfigurationManager manager) {
if (regionNormalizer instanceof ConfigurationObserver) {
final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
manager.registerObserver(observer);
}
}
@Override
public void deregisterChildren(ConfigurationManager manager) {
if (regionNormalizer instanceof ConfigurationObserver) {
final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
manager.deregisterObserver(observer);
}
}
@Override
public void onConfigurationChange(Configuration conf) {
rateLimiter.setRate(loadRateLimit(conf));
}
private static RateLimiter loadRateLimiter(final Configuration configuration) {
return RateLimiter.create(loadRateLimit(configuration));
}
private static long loadRateLimit(final Configuration configuration) {
long rateLimitBytes =
configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
long rateLimitMbs = rateLimitBytes / 1_000_000L;
@ -82,7 +110,7 @@ class RegionNormalizerWorker implements Runnable {
}
LOG.info("Normalizer rate limit set to {}",
rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
return RateLimiter.create(rateLimitMbs);
return rateLimitMbs;
}
/**
@ -116,6 +144,15 @@ class RegionNormalizerWorker implements Runnable {
return mergePlanCount;
}
/**
* Used in test only. This field is exposed to the test, as opposed to tracking the current
* configuration value beside the RateLimiter instance and managing synchronization to keep the
* two in sync.
*/
RateLimiter getRateLimiter() {
return rateLimiter;
}
@Override
public void run() {
while (true) {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
@ -56,7 +57,7 @@ import org.slf4j.LoggerFactory;
* </ol>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
class SimpleRegionNormalizer implements RegionNormalizer {
class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
@ -72,25 +73,17 @@ class SimpleRegionNormalizer implements RegionNormalizer {
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
private Configuration conf;
private MasterServices masterServices;
private boolean splitEnabled;
private boolean mergeEnabled;
private int minRegionCount;
private Period mergeMinRegionAge;
private long mergeMinRegionSizeMb;
private NormalizerConfiguration normalizerConfiguration;
public SimpleRegionNormalizer() {
splitEnabled = DEFAULT_SPLIT_ENABLED;
mergeEnabled = DEFAULT_MERGE_ENABLED;
minRegionCount = DEFAULT_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
masterServices = null;
normalizerConfiguration = new NormalizerConfiguration();
}
@Override
public Configuration getConf() {
return conf;
return normalizerConfiguration.getConf();
}
@Override
@ -98,12 +91,13 @@ class SimpleRegionNormalizer implements RegionNormalizer {
if (conf == null) {
return;
}
this.conf = conf;
splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
minRegionCount = parseMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
normalizerConfiguration = new NormalizerConfiguration(conf, normalizerConfiguration);
}
@Override
public void onConfigurationChange(Configuration conf) {
LOG.debug("Updating configuration parameters according to new configuration instance.");
setConf(conf);
}
private static int parseMinRegionCount(final Configuration conf) {
@ -141,39 +135,46 @@ class SimpleRegionNormalizer implements RegionNormalizer {
key, parsedValue, settledValue);
}
private static <T> void logConfigurationUpdated(final String key, final T oldValue,
final T newValue) {
if (!Objects.equals(oldValue, newValue)) {
LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
}
}
/**
* Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
*/
public boolean isSplitEnabled() {
return splitEnabled;
return normalizerConfiguration.isSplitEnabled();
}
/**
* Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
*/
public boolean isMergeEnabled() {
return mergeEnabled;
return normalizerConfiguration.isMergeEnabled();
}
/**
* Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
*/
public int getMinRegionCount() {
return minRegionCount;
return normalizerConfiguration.getMinRegionCount();
}
/**
* Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
*/
public Period getMergeMinRegionAge() {
return mergeMinRegionAge;
return normalizerConfiguration.getMergeMinRegionAge();
}
/**
* Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
*/
public long getMergeMinRegionSizeMb() {
return mergeMinRegionSizeMb;
return normalizerConfiguration.getMergeMinRegionSizeMb();
}
@Override
@ -292,8 +293,15 @@ class SimpleRegionNormalizer implements RegionNormalizer {
/**
* Determine if a {@link RegionInfo} should be considered for a merge operation.
* </p>
* Callers beware: for safe concurrency, be sure to pass in the local instance of
* {@link NormalizerConfiguration}, don't use {@code this}'s instance.
*/
private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
private boolean skipForMerge(
final NormalizerConfiguration normalizerConfiguration,
final RegionStates regionStates,
final RegionInfo regionInfo
) {
final RegionState state = regionStates.getRegionState(regionInfo);
final String name = regionInfo.getEncodedName();
return
@ -304,10 +312,10 @@ class SimpleRegionNormalizer implements RegionNormalizer {
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
"skipping merge of region {} because it is not open.", name)
|| logTraceReason(
() -> !isOldEnoughForMerge(regionInfo),
() -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
"skipping merge of region {} because it is not old enough.", name)
|| logTraceReason(
() -> !isLargeEnoughForMerge(regionInfo),
() -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
"skipping merge region {} because it is not large enough.", name);
}
@ -316,15 +324,16 @@ class SimpleRegionNormalizer implements RegionNormalizer {
* towards target average or target region count.
*/
private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
if (isEmpty(ctx.getTableRegions()) || ctx.getTableRegions().size() < minRegionCount) {
final NormalizerConfiguration configuration = normalizerConfiguration;
if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
+ " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(),
minRegionCount);
+ " is {}, not computing merge plans.", ctx.getTableName(),
ctx.getTableRegions().size(), configuration.getMinRegionCount());
return Collections.emptyList();
}
final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
if (avgRegionSizeMb < mergeMinRegionSizeMb) {
if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
return Collections.emptyList();
}
LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
@ -347,7 +356,7 @@ class SimpleRegionNormalizer implements RegionNormalizer {
for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
final RegionInfo regionInfo = ctx.getTableRegions().get(current);
final long regionSizeMb = getRegionSizeMB(regionInfo);
if (skipForMerge(ctx.getRegionStates(), regionInfo)) {
if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
// this region cannot participate in a range. resume the outer loop.
rangeStart = Math.max(current, rangeStart + 1);
break;
@ -419,18 +428,28 @@ class SimpleRegionNormalizer implements RegionNormalizer {
* Return {@code true} when {@code regionInfo} has a creation date that is old
* enough to be considered for a merge operation, {@code false} otherwise.
*/
private boolean isOldEnoughForMerge(final RegionInfo regionInfo) {
private static boolean isOldEnoughForMerge(
final NormalizerConfiguration normalizerConfiguration,
final RegionInfo regionInfo
) {
final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge));
return currentTime.isAfter(
regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
}
/**
* Return {@code true} when {@code regionInfo} has a size that is sufficient
* to be considered for a merge operation, {@code false} otherwise.
* </p>
* Callers beware: for safe concurrency, be sure to pass in the local instance of
* {@link NormalizerConfiguration}, don't use {@code this}'s instance.
*/
private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) {
return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
private boolean isLargeEnoughForMerge(
final NormalizerConfiguration normalizerConfiguration,
final RegionInfo regionInfo
) {
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
}
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
@ -442,6 +461,74 @@ class SimpleRegionNormalizer implements RegionNormalizer {
return value;
}
/**
* Holds the configuration values read from {@link Configuration}. Encapsulation in a POJO
* enables atomic hot-reloading of configs without locks.
*/
private static final class NormalizerConfiguration {
private final Configuration conf;
private final boolean splitEnabled;
private final boolean mergeEnabled;
private final int minRegionCount;
private final Period mergeMinRegionAge;
private final long mergeMinRegionSizeMb;
private NormalizerConfiguration() {
conf = null;
splitEnabled = DEFAULT_SPLIT_ENABLED;
mergeEnabled = DEFAULT_MERGE_ENABLED;
minRegionCount = DEFAULT_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
}
private NormalizerConfiguration(
final Configuration conf,
final NormalizerConfiguration currentConfiguration
) {
this.conf = conf;
splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
minRegionCount = parseMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
splitEnabled);
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
mergeEnabled);
logConfigurationUpdated(MIN_REGION_COUNT_KEY, currentConfiguration.getMinRegionCount(),
minRegionCount);
logConfigurationUpdated(MERGE_MIN_REGION_AGE_DAYS_KEY,
currentConfiguration.getMergeMinRegionAge(), mergeMinRegionAge);
logConfigurationUpdated(MERGE_MIN_REGION_SIZE_MB_KEY,
currentConfiguration.getMergeMinRegionSizeMb(), mergeMinRegionSizeMb);
}
public Configuration getConf() {
return conf;
}
public boolean isSplitEnabled() {
return splitEnabled;
}
public boolean isMergeEnabled() {
return mergeEnabled;
}
public int getMinRegionCount() {
return minRegionCount;
}
public Period getMergeMinRegionAge() {
return mergeMinRegionAge;
}
public long getMergeMinRegionSizeMb() {
return mergeMinRegionSizeMb;
}
}
/**
* Inner class caries the state necessary to perform a single invocation of
* {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
/**
* Test that configuration changes are propagated to all children.
*/
@Category({ MasterTests.class, SmallTests.class})
public class TestRegionNormalizerManagerConfigurationObserver {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionNormalizerManagerConfigurationObserver.class);
private static final HBaseTestingUtility testUtil = new HBaseTestingUtility();
private static final Pattern rateLimitPattern =
Pattern.compile("RateLimiter\\[stableRate=(?<rate>.+)qps]");
private Configuration conf;
private SimpleRegionNormalizer normalizer;
@Mock private MasterServices masterServices;
@Mock private RegionNormalizerTracker tracker;
@Mock private RegionNormalizerChore chore;
@Mock private RegionNormalizerWorkQueue<TableName> queue;
private RegionNormalizerWorker worker;
private ConfigurationManager configurationManager;
@Before
public void before() {
MockitoAnnotations.initMocks(this);
conf = testUtil.getConfiguration();
normalizer = new SimpleRegionNormalizer();
worker = new RegionNormalizerWorker(conf, masterServices, normalizer, queue);
final RegionNormalizerManager normalizerManager =
new RegionNormalizerManager(tracker, chore, queue, worker);
configurationManager = new ConfigurationManager();
configurationManager.registerObserver(normalizerManager);
}
@Test
public void test() {
assertTrue(normalizer.isMergeEnabled());
assertEquals(3, normalizer.getMinRegionCount());
assertEquals(1_000_000L, parseConfiguredRateLimit(worker.getRateLimiter()));
final Configuration newConf = new Configuration(conf);
// configs on SimpleRegionNormalizer
newConf.setBoolean("hbase.normalizer.merge.enabled", false);
newConf.setInt("hbase.normalizer.min.region.count", 100);
// config on RegionNormalizerWorker
newConf.set("hbase.normalizer.throughput.max_bytes_per_sec", "12g");
configurationManager.notifyAllObservers(newConf);
assertFalse(normalizer.isMergeEnabled());
assertEquals(100, normalizer.getMinRegionCount());
assertEquals(12_884L, parseConfiguredRateLimit(worker.getRateLimiter()));
}
/**
* The {@link RateLimiter} class does not publicly expose its currently configured rate. It does
* offer this information in the {@link RateLimiter#toString()} method. It's fragile, but parse
* this value. The alternative would be to track the value explicitly in the worker, and the
* associated coordination overhead paid at runtime. See the related note on
* {@link RegionNormalizerWorker#getRateLimiter()}.
*/
private static long parseConfiguredRateLimit(final RateLimiter rateLimiter) {
final String val = rateLimiter.toString();
final Matcher matcher = rateLimitPattern.matcher(val);
assertTrue(matcher.matches());
final String parsedRate = matcher.group("rate");
return (long) Double.parseDouble(parsedRate);
}
}