HBASE-25167 Normalizer support for hot config reloading (#2523)
Wire up the `ConfigurationObserver` chain for `RegionNormalizerManager`. The following configuration keys support hot-reloading: * hbase.normalizer.throughput.max_bytes_per_sec * hbase.normalizer.split.enabled * hbase.normalizer.merge.enabled * hbase.normalizer.min.region.count * hbase.normalizer.merge.min_region_age.days * hbase.normalizer.merge.min_region_size.mb Note that support for `hbase.normalizer.period` is not provided here. Support would need to be implemented generally for the `Chore` subsystem. Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Aman Poonia <aman.poonia.29@gmail.com>
This commit is contained in:
parent
e3beccf1fc
commit
d790bdedde
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -21,28 +21,29 @@ import java.util.Collections;
|
|||
import java.util.Set;
|
||||
import java.util.WeakHashMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Maintains the set of all the classes which would like to get notified
|
||||
* when the Configuration is reloaded from the disk in the Online Configuration
|
||||
* Change mechanism, which lets you update certain configuration properties
|
||||
* on-the-fly, without having to restart the cluster.
|
||||
*
|
||||
* <p>
|
||||
* If a class has configuration properties which you would like to be able to
|
||||
* change on-the-fly, do the following:
|
||||
* 1. Implement the {@link ConfigurationObserver} interface. This would require
|
||||
* <ol>
|
||||
* <li>Implement the {@link ConfigurationObserver} interface. This would require
|
||||
* you to implement the
|
||||
* {@link ConfigurationObserver#onConfigurationChange(Configuration)}
|
||||
* method. This is a callback that is used to notify your class' instance
|
||||
* that the configuration has changed. In this method, you need to check
|
||||
* if the new values for the properties that are of interest to your class
|
||||
* are different from the cached values. If yes, update them.
|
||||
*
|
||||
* <br />
|
||||
* However, be careful with this. Certain properties might be trivially
|
||||
* mutable online, but others might not. Two properties might be trivially
|
||||
* mutable by themselves, but not when changed together. For example, if a
|
||||
|
@ -51,21 +52,23 @@ import org.slf4j.LoggerFactory;
|
|||
* yet updated "b", it might make a decision on the basis of a new value of
|
||||
* "a", and an old value of "b". This might introduce subtle bugs. This
|
||||
* needs to be dealt on a case-by-case basis, and this class does not provide
|
||||
* any protection from such cases.
|
||||
* any protection from such cases.</li>
|
||||
*
|
||||
* 2. Register the appropriate instance of the class with the
|
||||
* <li>Register the appropriate instance of the class with the
|
||||
* {@link ConfigurationManager} instance, using the
|
||||
* {@link ConfigurationManager#registerObserver(ConfigurationObserver)}
|
||||
* method. Be careful not to do this in the constructor, as you might cause
|
||||
* the 'this' reference to escape. Use a factory method, or an initialize()
|
||||
* method which is called after the construction of the object.
|
||||
* method which is called after the construction of the object.</li>
|
||||
*
|
||||
* 3. Deregister the instance using the
|
||||
* <li>Deregister the instance using the
|
||||
* {@link ConfigurationManager#deregisterObserver(ConfigurationObserver)}
|
||||
* method when it is going out of scope. In case you are not able to do that
|
||||
* for any reason, it is still okay, since entries for dead observers are
|
||||
* automatically collected during GC. But nonetheless, it is still a good
|
||||
* practice to deregister your observer, whenever possible.
|
||||
* practice to deregister your observer, whenever possible.</li>
|
||||
* </ol>
|
||||
* </p>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
|
@ -118,8 +121,8 @@ public class ConfigurationManager {
|
|||
observer.onConfigurationChange(conf);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Encountered a throwable while notifying observers: " + " of type : " +
|
||||
observer.getClass().getCanonicalName() + "(" + observer + ")", t);
|
||||
LOG.error("Encountered a throwable while notifying observers: of type : {}({})",
|
||||
observer.getClass().getCanonicalName(), observer, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -24,7 +24,7 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
/**
|
||||
* Every class that wants to observe changes in Configuration properties,
|
||||
* must implement interface (and also, register itself with the
|
||||
* <code>ConfigurationManager</code> object.
|
||||
* {@link ConfigurationManager}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.conf;
|
|||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
|
@ -39,9 +38,9 @@ public class TestConfigurationManager {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestConfigurationManager.class);
|
||||
|
||||
class DummyConfigurationObserver implements ConfigurationObserver {
|
||||
static class DummyConfigurationObserver implements ConfigurationObserver {
|
||||
private boolean notifiedOnChange = false;
|
||||
private ConfigurationManager cm;
|
||||
private final ConfigurationManager cm;
|
||||
|
||||
public DummyConfigurationObserver(ConfigurationManager cm) {
|
||||
this.cm = cm;
|
||||
|
@ -63,11 +62,11 @@ public class TestConfigurationManager {
|
|||
}
|
||||
|
||||
public void register() {
|
||||
this.cm.registerObserver(this);
|
||||
cm.registerObserver(this);
|
||||
}
|
||||
|
||||
public void deregister() {
|
||||
this.cm.deregisterObserver(this);
|
||||
cm.deregisterObserver(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -785,6 +785,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
this.regionNormalizerManager =
|
||||
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
|
||||
this.configurationManager.registerObserver(regionNormalizerManager);
|
||||
this.regionNormalizerManager.start();
|
||||
|
||||
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
|
||||
|
|
|
@ -22,8 +22,11 @@ import edu.umd.cs.findbugs.annotations.Nullable;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -35,7 +38,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
|
|||
* This class encapsulates the details of the {@link RegionNormalizer} subsystem.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionNormalizerManager {
|
||||
public class RegionNormalizerManager implements PropagatingConfigurationObserver {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
|
||||
|
||||
private final RegionNormalizerTracker regionNormalizerTracker;
|
||||
|
@ -48,7 +51,7 @@ public class RegionNormalizerManager {
|
|||
private boolean started = false;
|
||||
private boolean stopped = false;
|
||||
|
||||
public RegionNormalizerManager(
|
||||
RegionNormalizerManager(
|
||||
@NonNull final RegionNormalizerTracker regionNormalizerTracker,
|
||||
@Nullable final RegionNormalizerChore regionNormalizerChore,
|
||||
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
|
||||
|
@ -67,6 +70,25 @@ public class RegionNormalizerManager {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerChildren(ConfigurationManager manager) {
|
||||
if (worker != null) {
|
||||
manager.registerObserver(worker);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deregisterChildren(ConfigurationManager manager) {
|
||||
if (worker != null) {
|
||||
manager.deregisterObserver(worker);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
// no configuration managed here directly.
|
||||
}
|
||||
|
||||
public void start() {
|
||||
synchronized (startStopLock) {
|
||||
if (started) {
|
||||
|
|
|
@ -26,6 +26,9 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -39,7 +42,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
|
|||
* and executes the resulting {@link NormalizationPlan}s.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionNormalizerWorker implements Runnable {
|
||||
class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
|
||||
|
||||
static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
|
||||
|
@ -70,7 +73,32 @@ class RegionNormalizerWorker implements Runnable {
|
|||
this.rateLimiter = loadRateLimiter(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerChildren(ConfigurationManager manager) {
|
||||
if (regionNormalizer instanceof ConfigurationObserver) {
|
||||
final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
|
||||
manager.registerObserver(observer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deregisterChildren(ConfigurationManager manager) {
|
||||
if (regionNormalizer instanceof ConfigurationObserver) {
|
||||
final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
|
||||
manager.deregisterObserver(observer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
rateLimiter.setRate(loadRateLimit(conf));
|
||||
}
|
||||
|
||||
private static RateLimiter loadRateLimiter(final Configuration configuration) {
|
||||
return RateLimiter.create(loadRateLimit(configuration));
|
||||
}
|
||||
|
||||
private static long loadRateLimit(final Configuration configuration) {
|
||||
long rateLimitBytes =
|
||||
configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
|
||||
long rateLimitMbs = rateLimitBytes / 1_000_000L;
|
||||
|
@ -82,7 +110,7 @@ class RegionNormalizerWorker implements Runnable {
|
|||
}
|
||||
LOG.info("Normalizer rate limit set to {}",
|
||||
rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
|
||||
return RateLimiter.create(rateLimitMbs);
|
||||
return rateLimitMbs;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,6 +144,15 @@ class RegionNormalizerWorker implements Runnable {
|
|||
return mergePlanCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in test only. This field is exposed to the test, as opposed to tracking the current
|
||||
* configuration value beside the RateLimiter instance and managing synchronization to keep the
|
||||
* two in sync.
|
||||
*/
|
||||
RateLimiter getRateLimiter() {
|
||||
return rateLimiter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.MasterSwitchType;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||
|
@ -56,7 +57,7 @@ import org.slf4j.LoggerFactory;
|
|||
* </ol>
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
class SimpleRegionNormalizer implements RegionNormalizer {
|
||||
class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
|
||||
|
||||
static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
|
||||
|
@ -72,25 +73,17 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
|
||||
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
|
||||
|
||||
private Configuration conf;
|
||||
private MasterServices masterServices;
|
||||
private boolean splitEnabled;
|
||||
private boolean mergeEnabled;
|
||||
private int minRegionCount;
|
||||
private Period mergeMinRegionAge;
|
||||
private long mergeMinRegionSizeMb;
|
||||
private NormalizerConfiguration normalizerConfiguration;
|
||||
|
||||
public SimpleRegionNormalizer() {
|
||||
splitEnabled = DEFAULT_SPLIT_ENABLED;
|
||||
mergeEnabled = DEFAULT_MERGE_ENABLED;
|
||||
minRegionCount = DEFAULT_MIN_REGION_COUNT;
|
||||
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
|
||||
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
|
||||
masterServices = null;
|
||||
normalizerConfiguration = new NormalizerConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
return normalizerConfiguration.getConf();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,12 +91,13 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
if (conf == null) {
|
||||
return;
|
||||
}
|
||||
this.conf = conf;
|
||||
splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
|
||||
mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
|
||||
minRegionCount = parseMinRegionCount(conf);
|
||||
mergeMinRegionAge = parseMergeMinRegionAge(conf);
|
||||
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
|
||||
normalizerConfiguration = new NormalizerConfiguration(conf, normalizerConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
LOG.debug("Updating configuration parameters according to new configuration instance.");
|
||||
setConf(conf);
|
||||
}
|
||||
|
||||
private static int parseMinRegionCount(final Configuration conf) {
|
||||
|
@ -141,39 +135,46 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
key, parsedValue, settledValue);
|
||||
}
|
||||
|
||||
private static <T> void logConfigurationUpdated(final String key, final T oldValue,
|
||||
final T newValue) {
|
||||
if (!Objects.equals(oldValue, newValue)) {
|
||||
LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
|
||||
*/
|
||||
public boolean isSplitEnabled() {
|
||||
return splitEnabled;
|
||||
return normalizerConfiguration.isSplitEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
|
||||
*/
|
||||
public boolean isMergeEnabled() {
|
||||
return mergeEnabled;
|
||||
return normalizerConfiguration.isMergeEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
|
||||
*/
|
||||
public int getMinRegionCount() {
|
||||
return minRegionCount;
|
||||
return normalizerConfiguration.getMinRegionCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
|
||||
*/
|
||||
public Period getMergeMinRegionAge() {
|
||||
return mergeMinRegionAge;
|
||||
return normalizerConfiguration.getMergeMinRegionAge();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
|
||||
*/
|
||||
public long getMergeMinRegionSizeMb() {
|
||||
return mergeMinRegionSizeMb;
|
||||
return normalizerConfiguration.getMergeMinRegionSizeMb();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -292,8 +293,15 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
|
||||
/**
|
||||
* Determine if a {@link RegionInfo} should be considered for a merge operation.
|
||||
* </p>
|
||||
* Callers beware: for safe concurrency, be sure to pass in the local instance of
|
||||
* {@link NormalizerConfiguration}, don't use {@code this}'s instance.
|
||||
*/
|
||||
private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
|
||||
private boolean skipForMerge(
|
||||
final NormalizerConfiguration normalizerConfiguration,
|
||||
final RegionStates regionStates,
|
||||
final RegionInfo regionInfo
|
||||
) {
|
||||
final RegionState state = regionStates.getRegionState(regionInfo);
|
||||
final String name = regionInfo.getEncodedName();
|
||||
return
|
||||
|
@ -304,10 +312,10 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
|
||||
"skipping merge of region {} because it is not open.", name)
|
||||
|| logTraceReason(
|
||||
() -> !isOldEnoughForMerge(regionInfo),
|
||||
() -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
|
||||
"skipping merge of region {} because it is not old enough.", name)
|
||||
|| logTraceReason(
|
||||
() -> !isLargeEnoughForMerge(regionInfo),
|
||||
() -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
|
||||
"skipping merge region {} because it is not large enough.", name);
|
||||
}
|
||||
|
||||
|
@ -316,15 +324,16 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
* towards target average or target region count.
|
||||
*/
|
||||
private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
|
||||
if (isEmpty(ctx.getTableRegions()) || ctx.getTableRegions().size() < minRegionCount) {
|
||||
final NormalizerConfiguration configuration = normalizerConfiguration;
|
||||
if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
|
||||
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
|
||||
+ " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(),
|
||||
minRegionCount);
|
||||
+ " is {}, not computing merge plans.", ctx.getTableName(),
|
||||
ctx.getTableRegions().size(), configuration.getMinRegionCount());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
|
||||
if (avgRegionSizeMb < mergeMinRegionSizeMb) {
|
||||
if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
|
||||
|
@ -347,7 +356,7 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
|
||||
final RegionInfo regionInfo = ctx.getTableRegions().get(current);
|
||||
final long regionSizeMb = getRegionSizeMB(regionInfo);
|
||||
if (skipForMerge(ctx.getRegionStates(), regionInfo)) {
|
||||
if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
|
||||
// this region cannot participate in a range. resume the outer loop.
|
||||
rangeStart = Math.max(current, rangeStart + 1);
|
||||
break;
|
||||
|
@ -419,18 +428,28 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
* Return {@code true} when {@code regionInfo} has a creation date that is old
|
||||
* enough to be considered for a merge operation, {@code false} otherwise.
|
||||
*/
|
||||
private boolean isOldEnoughForMerge(final RegionInfo regionInfo) {
|
||||
private static boolean isOldEnoughForMerge(
|
||||
final NormalizerConfiguration normalizerConfiguration,
|
||||
final RegionInfo regionInfo
|
||||
) {
|
||||
final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
|
||||
final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
|
||||
return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge));
|
||||
return currentTime.isAfter(
|
||||
regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} when {@code regionInfo} has a size that is sufficient
|
||||
* to be considered for a merge operation, {@code false} otherwise.
|
||||
* </p>
|
||||
* Callers beware: for safe concurrency, be sure to pass in the local instance of
|
||||
* {@link NormalizerConfiguration}, don't use {@code this}'s instance.
|
||||
*/
|
||||
private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) {
|
||||
return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
|
||||
private boolean isLargeEnoughForMerge(
|
||||
final NormalizerConfiguration normalizerConfiguration,
|
||||
final RegionInfo regionInfo
|
||||
) {
|
||||
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
|
||||
}
|
||||
|
||||
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
|
||||
|
@ -442,6 +461,74 @@ class SimpleRegionNormalizer implements RegionNormalizer {
|
|||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds the configuration values read from {@link Configuration}. Encapsulation in a POJO
|
||||
* enables atomic hot-reloading of configs without locks.
|
||||
*/
|
||||
private static final class NormalizerConfiguration {
|
||||
private final Configuration conf;
|
||||
private final boolean splitEnabled;
|
||||
private final boolean mergeEnabled;
|
||||
private final int minRegionCount;
|
||||
private final Period mergeMinRegionAge;
|
||||
private final long mergeMinRegionSizeMb;
|
||||
|
||||
private NormalizerConfiguration() {
|
||||
conf = null;
|
||||
splitEnabled = DEFAULT_SPLIT_ENABLED;
|
||||
mergeEnabled = DEFAULT_MERGE_ENABLED;
|
||||
minRegionCount = DEFAULT_MIN_REGION_COUNT;
|
||||
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
|
||||
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
|
||||
}
|
||||
|
||||
private NormalizerConfiguration(
|
||||
final Configuration conf,
|
||||
final NormalizerConfiguration currentConfiguration
|
||||
) {
|
||||
this.conf = conf;
|
||||
splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
|
||||
mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
|
||||
minRegionCount = parseMinRegionCount(conf);
|
||||
mergeMinRegionAge = parseMergeMinRegionAge(conf);
|
||||
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
|
||||
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
|
||||
splitEnabled);
|
||||
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
|
||||
mergeEnabled);
|
||||
logConfigurationUpdated(MIN_REGION_COUNT_KEY, currentConfiguration.getMinRegionCount(),
|
||||
minRegionCount);
|
||||
logConfigurationUpdated(MERGE_MIN_REGION_AGE_DAYS_KEY,
|
||||
currentConfiguration.getMergeMinRegionAge(), mergeMinRegionAge);
|
||||
logConfigurationUpdated(MERGE_MIN_REGION_SIZE_MB_KEY,
|
||||
currentConfiguration.getMergeMinRegionSizeMb(), mergeMinRegionSizeMb);
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
public boolean isSplitEnabled() {
|
||||
return splitEnabled;
|
||||
}
|
||||
|
||||
public boolean isMergeEnabled() {
|
||||
return mergeEnabled;
|
||||
}
|
||||
|
||||
public int getMinRegionCount() {
|
||||
return minRegionCount;
|
||||
}
|
||||
|
||||
public Period getMergeMinRegionAge() {
|
||||
return mergeMinRegionAge;
|
||||
}
|
||||
|
||||
public long getMergeMinRegionSizeMb() {
|
||||
return mergeMinRegionSizeMb;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inner class caries the state necessary to perform a single invocation of
|
||||
* {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.normalizer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
|
||||
|
||||
/**
|
||||
* Test that configuration changes are propagated to all children.
|
||||
*/
|
||||
@Category({ MasterTests.class, SmallTests.class})
|
||||
public class TestRegionNormalizerManagerConfigurationObserver {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionNormalizerManagerConfigurationObserver.class);
|
||||
|
||||
private static final HBaseTestingUtility testUtil = new HBaseTestingUtility();
|
||||
private static final Pattern rateLimitPattern =
|
||||
Pattern.compile("RateLimiter\\[stableRate=(?<rate>.+)qps]");
|
||||
|
||||
private Configuration conf;
|
||||
private SimpleRegionNormalizer normalizer;
|
||||
@Mock private MasterServices masterServices;
|
||||
@Mock private RegionNormalizerTracker tracker;
|
||||
@Mock private RegionNormalizerChore chore;
|
||||
@Mock private RegionNormalizerWorkQueue<TableName> queue;
|
||||
private RegionNormalizerWorker worker;
|
||||
private ConfigurationManager configurationManager;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
conf = testUtil.getConfiguration();
|
||||
normalizer = new SimpleRegionNormalizer();
|
||||
worker = new RegionNormalizerWorker(conf, masterServices, normalizer, queue);
|
||||
final RegionNormalizerManager normalizerManager =
|
||||
new RegionNormalizerManager(tracker, chore, queue, worker);
|
||||
configurationManager = new ConfigurationManager();
|
||||
configurationManager.registerObserver(normalizerManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
assertTrue(normalizer.isMergeEnabled());
|
||||
assertEquals(3, normalizer.getMinRegionCount());
|
||||
assertEquals(1_000_000L, parseConfiguredRateLimit(worker.getRateLimiter()));
|
||||
|
||||
final Configuration newConf = new Configuration(conf);
|
||||
// configs on SimpleRegionNormalizer
|
||||
newConf.setBoolean("hbase.normalizer.merge.enabled", false);
|
||||
newConf.setInt("hbase.normalizer.min.region.count", 100);
|
||||
// config on RegionNormalizerWorker
|
||||
newConf.set("hbase.normalizer.throughput.max_bytes_per_sec", "12g");
|
||||
|
||||
configurationManager.notifyAllObservers(newConf);
|
||||
assertFalse(normalizer.isMergeEnabled());
|
||||
assertEquals(100, normalizer.getMinRegionCount());
|
||||
assertEquals(12_884L, parseConfiguredRateLimit(worker.getRateLimiter()));
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link RateLimiter} class does not publicly expose its currently configured rate. It does
|
||||
* offer this information in the {@link RateLimiter#toString()} method. It's fragile, but parse
|
||||
* this value. The alternative would be to track the value explicitly in the worker, and the
|
||||
* associated coordination overhead paid at runtime. See the related note on
|
||||
* {@link RegionNormalizerWorker#getRateLimiter()}.
|
||||
*/
|
||||
private static long parseConfiguredRateLimit(final RateLimiter rateLimiter) {
|
||||
final String val = rateLimiter.toString();
|
||||
final Matcher matcher = rateLimitPattern.matcher(val);
|
||||
assertTrue(matcher.matches());
|
||||
final String parsedRate = matcher.group("rate");
|
||||
return (long) Double.parseDouble(parsedRate);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue